| Index: webrtc/base/physicalsocketserver.cc
|
| diff --git a/webrtc/base/physicalsocketserver.cc b/webrtc/base/physicalsocketserver.cc
|
| index a412703015e77f903b82c6126edd21a4c00a023d..91e890cc0dd5229d396de1ffbe0028481d6ad78d 100644
|
| --- a/webrtc/base/physicalsocketserver.cc
|
| +++ b/webrtc/base/physicalsocketserver.cc
|
| @@ -21,6 +21,10 @@
|
| #include <string.h>
|
| #include <errno.h>
|
| #include <fcntl.h>
|
| +#if defined(WEBRTC_USE_EPOLL)
|
| +// "poll" will be used to wait for the signal dispatcher.
|
| +#include <poll.h>
|
| +#endif
|
| #include <sys/ioctl.h>
|
| #include <sys/time.h>
|
| #include <sys/select.h>
|
| @@ -80,6 +84,16 @@ int64_t GetSocketRecvTimestamp(int socket) {
|
| typedef char* SockOptArg;
|
| #endif
|
|
|
| +#if defined(WEBRTC_USE_EPOLL)
|
| +// POLLRDHUP / EPOLLRDHUP are only defined starting with Linux 2.6.17.
|
| +#if !defined(POLLRDHUP)
|
| +#define POLLRDHUP 0x2000
|
| +#endif
|
| +#if !defined(EPOLLRDHUP)
|
| +#define EPOLLRDHUP 0x2000
|
| +#endif
|
| +#endif
|
| +
|
| namespace rtc {
|
|
|
| std::unique_ptr<SocketServer> SocketServer::CreateDefault() {
|
| @@ -765,6 +779,14 @@ void SocketDispatcher::OnEvent(uint32_t ff, int err) {
|
| #elif defined(WEBRTC_POSIX)
|
|
|
| void SocketDispatcher::OnEvent(uint32_t ff, int err) {
|
| +#if defined(WEBRTC_USE_EPOLL)
|
| + // Remember currently enabled events so we can combine multiple changes
|
| + // into one update call later.
|
| + // The signal handlers might re-enable events disabled here, so we can't
|
| + // keep a list of events to disable at the end of the method. This list
|
| + // would not be updated with the events enabled by the signal handlers.
|
| + StartBatchedEventUpdates();
|
| +#endif
|
| // Make sure we deliver connect/accept first. Otherwise, consumers may see
|
| // something like a READ followed by a CONNECT, which would be odd.
|
| if ((ff & DE_CONNECT) != 0) {
|
| @@ -788,10 +810,65 @@ void SocketDispatcher::OnEvent(uint32_t ff, int err) {
|
| SetEnabledEvents(0);
|
| SignalCloseEvent(this, err);
|
| }
|
| +#if defined(WEBRTC_USE_EPOLL)
|
| + FinishBatchedEventUpdates();
|
| +#endif
|
| }
|
|
|
| #endif // WEBRTC_POSIX
|
|
|
| +#if defined(WEBRTC_USE_EPOLL)
|
| +
|
| +static int GetEpollEvents(uint32_t ff) {
|
| + int events = 0;
|
| + if (ff & (DE_READ | DE_ACCEPT)) {
|
| + events |= EPOLLIN;
|
| + }
|
| + if (ff & (DE_WRITE | DE_CONNECT)) {
|
| + events |= EPOLLOUT;
|
| + }
|
| + return events;
|
| +}
|
| +
|
| +void SocketDispatcher::StartBatchedEventUpdates() {
|
| + RTC_DCHECK_EQ(saved_enabled_events_, -1);
|
| + saved_enabled_events_ = enabled_events();
|
| +}
|
| +
|
| +void SocketDispatcher::FinishBatchedEventUpdates() {
|
| + RTC_DCHECK_NE(saved_enabled_events_, -1);
|
| + uint8_t old_events = static_cast<uint8_t>(saved_enabled_events_);
|
| + saved_enabled_events_ = -1;
|
| + MaybeUpdateDispatcher(old_events);
|
| +}
|
| +
|
| +void SocketDispatcher::MaybeUpdateDispatcher(uint8_t old_events) {
|
| + if (GetEpollEvents(enabled_events()) != GetEpollEvents(old_events) &&
|
| + saved_enabled_events_ == -1) {
|
| + ss_->Update(this);
|
| + }
|
| +}
|
| +
|
| +void SocketDispatcher::SetEnabledEvents(uint8_t events) {
|
| + uint8_t old_events = enabled_events();
|
| + PhysicalSocket::SetEnabledEvents(events);
|
| + MaybeUpdateDispatcher(old_events);
|
| +}
|
| +
|
| +void SocketDispatcher::EnableEvents(uint8_t events) {
|
| + uint8_t old_events = enabled_events();
|
| + PhysicalSocket::EnableEvents(events);
|
| + MaybeUpdateDispatcher(old_events);
|
| +}
|
| +
|
| +void SocketDispatcher::DisableEvents(uint8_t events) {
|
| + uint8_t old_events = enabled_events();
|
| + PhysicalSocket::DisableEvents(events);
|
| + MaybeUpdateDispatcher(old_events);
|
| +}
|
| +
|
| +#endif // WEBRTC_USE_EPOLL
|
| +
|
| int SocketDispatcher::Close() {
|
| if (s_ == INVALID_SOCKET)
|
| return 0;
|
| @@ -1120,6 +1197,17 @@ class Signaler : public EventDispatcher {
|
|
|
| PhysicalSocketServer::PhysicalSocketServer()
|
| : fWait_(false) {
|
| +#if defined(WEBRTC_USE_EPOLL)
|
| + // Since Linux 2.6.8, the size argument is ignored, but must be greater than
|
| + // zero. Before that the size served as hint to the kernel for the amount of
|
| + // space to initially allocate in internal data structures.
|
| + epoll_fd_ = epoll_create(FD_SETSIZE);
|
| + if (epoll_fd_ == -1) {
|
| + // Not an error, will fall back to "select" below.
|
| + LOG_E(LS_WARNING, EN, errno) << "epoll_create";
|
| + epoll_fd_ = INVALID_SOCKET;
|
| + }
|
| +#endif
|
| signal_wakeup_ = new Signaler(this, &fWait_);
|
| #if defined(WEBRTC_WIN)
|
| socket_ev_ = WSACreateEvent();
|
| @@ -1134,6 +1222,11 @@ PhysicalSocketServer::~PhysicalSocketServer() {
|
| signal_dispatcher_.reset();
|
| #endif
|
| delete signal_wakeup_;
|
| +#if defined(WEBRTC_USE_EPOLL)
|
| + if (epoll_fd_ != INVALID_SOCKET) {
|
| + close(epoll_fd_);
|
| + }
|
| +#endif
|
| RTC_DCHECK(dispatchers_.empty());
|
| }
|
|
|
| @@ -1181,40 +1274,148 @@ AsyncSocket* PhysicalSocketServer::WrapSocket(SOCKET s) {
|
|
|
| void PhysicalSocketServer::Add(Dispatcher *pdispatcher) {
|
| CritScope cs(&crit_);
|
| - // Prevent duplicates. This can cause dead dispatchers to stick around.
|
| - DispatcherList::iterator pos = std::find(dispatchers_.begin(),
|
| - dispatchers_.end(),
|
| - pdispatcher);
|
| - if (pos != dispatchers_.end())
|
| - return;
|
| - dispatchers_.push_back(pdispatcher);
|
| + if (processing_dispatchers_) {
|
| + // A dispatcher is being added while a "Wait" call is processing the
|
| + // list of socket events.
|
| + // Defer adding to "dispatchers_" set until processing is done to avoid
|
| + // invalidating the iterator in "Wait".
|
| + pending_remove_dispatchers_.erase(pdispatcher);
|
| + pending_add_dispatchers_.insert(pdispatcher);
|
| + } else {
|
| + dispatchers_.insert(pdispatcher);
|
| + }
|
| +#if defined(WEBRTC_USE_EPOLL)
|
| + if (epoll_fd_ != INVALID_SOCKET) {
|
| + AddEpoll(pdispatcher);
|
| + }
|
| +#endif // WEBRTC_USE_EPOLL
|
| }
|
|
|
| void PhysicalSocketServer::Remove(Dispatcher *pdispatcher) {
|
| CritScope cs(&crit_);
|
| - DispatcherList::iterator pos = std::find(dispatchers_.begin(),
|
| - dispatchers_.end(),
|
| - pdispatcher);
|
| - // We silently ignore duplicate calls to Add, so we should silently ignore
|
| - // the (expected) symmetric calls to Remove. Note that this may still hide
|
| - // a real issue, so we at least log a warning about it.
|
| - if (pos == dispatchers_.end()) {
|
| + if (processing_dispatchers_) {
|
| + // A dispatcher is being removed while a "Wait" call is processing the
|
| + // list of socket events.
|
| + // Defer removal from "dispatchers_" set until processing is done to avoid
|
| + // invalidating the iterator in "Wait".
|
| + if (!pending_add_dispatchers_.erase(pdispatcher) &&
|
| + dispatchers_.find(pdispatcher) == dispatchers_.end()) {
|
| + LOG(LS_WARNING) << "PhysicalSocketServer asked to remove a unknown "
|
| + << "dispatcher, potentially from a duplicate call to "
|
| + << "Add.";
|
| + return;
|
| + }
|
| +
|
| + pending_remove_dispatchers_.insert(pdispatcher);
|
| + } else if (!dispatchers_.erase(pdispatcher)) {
|
| LOG(LS_WARNING) << "PhysicalSocketServer asked to remove a unknown "
|
| << "dispatcher, potentially from a duplicate call to Add.";
|
| return;
|
| }
|
| - size_t index = pos - dispatchers_.begin();
|
| - dispatchers_.erase(pos);
|
| - for (IteratorList::iterator it = iterators_.begin(); it != iterators_.end();
|
| - ++it) {
|
| - if (index < **it) {
|
| - --**it;
|
| +#if defined(WEBRTC_USE_EPOLL)
|
| + if (epoll_fd_ != INVALID_SOCKET) {
|
| + RemoveEpoll(pdispatcher);
|
| + }
|
| +#endif // WEBRTC_USE_EPOLL
|
| +}
|
| +
|
| +void PhysicalSocketServer::Update(Dispatcher* pdispatcher) {
|
| +#if defined(WEBRTC_USE_EPOLL)
|
| + if (epoll_fd_ == INVALID_SOCKET) {
|
| + return;
|
| + }
|
| +
|
| + CritScope cs(&crit_);
|
| + if (dispatchers_.find(pdispatcher) == dispatchers_.end()) {
|
| + return;
|
| + }
|
| +
|
| + UpdateEpoll(pdispatcher);
|
| +#endif
|
| +}
|
| +
|
| +void PhysicalSocketServer::AddRemovePendingDispatchers() {
|
| + if (!pending_add_dispatchers_.empty()) {
|
| + for (Dispatcher* pdispatcher : pending_add_dispatchers_) {
|
| + dispatchers_.insert(pdispatcher);
|
| + }
|
| + pending_add_dispatchers_.clear();
|
| + }
|
| +
|
| + if (!pending_remove_dispatchers_.empty()) {
|
| + for (Dispatcher* pdispatcher : pending_remove_dispatchers_) {
|
| + dispatchers_.erase(pdispatcher);
|
| }
|
| + pending_remove_dispatchers_.clear();
|
| }
|
| }
|
|
|
| #if defined(WEBRTC_POSIX)
|
| +
|
| bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
|
| +#if defined(WEBRTC_USE_EPOLL)
|
| + // We don't keep a dedicated "epoll" descriptor containing only the non-IO
|
| + // (i.e. signaling) dispatcher, so "poll" will be used instead of the default
|
| + // "select" to support sockets larger than FD_SETSIZE.
|
| + if (!process_io) {
|
| + return WaitPoll(cmsWait, signal_wakeup_);
|
| + } else if (epoll_fd_ != INVALID_SOCKET) {
|
| + return WaitEpoll(cmsWait);
|
| + }
|
| +#endif
|
| + return WaitSelect(cmsWait, process_io);
|
| +}
|
| +
|
| +static void ProcessEvents(Dispatcher* dispatcher,
|
| + bool readable,
|
| + bool writable,
|
| + bool check_error) {
|
| + int errcode = 0;
|
| + // TODO(pthatcher): Should we set errcode if getsockopt fails?
|
| + if (check_error) {
|
| + socklen_t len = sizeof(errcode);
|
| + ::getsockopt(dispatcher->GetDescriptor(), SOL_SOCKET, SO_ERROR, &errcode,
|
| + &len);
|
| + }
|
| +
|
| + uint32_t ff = 0;
|
| +
|
| + // Check readable descriptors. If we're waiting on an accept, signal
|
| + // that. Otherwise we're waiting for data, check to see if we're
|
| + // readable or really closed.
|
| + // TODO(pthatcher): Only peek at TCP descriptors.
|
| + if (readable) {
|
| + if (dispatcher->GetRequestedEvents() & DE_ACCEPT) {
|
| + ff |= DE_ACCEPT;
|
| + } else if (errcode || dispatcher->IsDescriptorClosed()) {
|
| + ff |= DE_CLOSE;
|
| + } else {
|
| + ff |= DE_READ;
|
| + }
|
| + }
|
| +
|
| + // Check writable descriptors. If we're waiting on a connect, detect
|
| + // success versus failure by the reaped error code.
|
| + if (writable) {
|
| + if (dispatcher->GetRequestedEvents() & DE_CONNECT) {
|
| + if (!errcode) {
|
| + ff |= DE_CONNECT;
|
| + } else {
|
| + ff |= DE_CLOSE;
|
| + }
|
| + } else {
|
| + ff |= DE_WRITE;
|
| + }
|
| + }
|
| +
|
| + // Tell the descriptor about the event.
|
| + if (ff != 0) {
|
| + dispatcher->OnPreEvent(ff);
|
| + dispatcher->OnEvent(ff, errcode);
|
| + }
|
| +}
|
| +
|
| +bool PhysicalSocketServer::WaitSelect(int cmsWait, bool process_io) {
|
| // Calculate timing information
|
|
|
| struct timeval* ptvWait = nullptr;
|
| @@ -1257,13 +1458,17 @@ bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
|
| int fdmax = -1;
|
| {
|
| CritScope cr(&crit_);
|
| - for (size_t i = 0; i < dispatchers_.size(); ++i) {
|
| + // TODO(jbauch): Support re-entrant waiting.
|
| + RTC_DCHECK(!processing_dispatchers_);
|
| + for (Dispatcher* pdispatcher : dispatchers_) {
|
| // Query dispatchers for read and write wait state
|
| - Dispatcher *pdispatcher = dispatchers_[i];
|
| RTC_DCHECK(pdispatcher);
|
| if (!process_io && (pdispatcher != signal_wakeup_))
|
| continue;
|
| int fd = pdispatcher->GetDescriptor();
|
| + // "select"ing a file descriptor that is equal to or larger than
|
| + // FD_SETSIZE will result in undefined behavior.
|
| + RTC_DCHECK_LT(fd, FD_SETSIZE);
|
| if (fd > fdmax)
|
| fdmax = fd;
|
|
|
| @@ -1297,55 +1502,28 @@ bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
|
| } else {
|
| // We have signaled descriptors
|
| CritScope cr(&crit_);
|
| - for (size_t i = 0; i < dispatchers_.size(); ++i) {
|
| - Dispatcher *pdispatcher = dispatchers_[i];
|
| + processing_dispatchers_ = true;
|
| + for (Dispatcher* pdispatcher : dispatchers_) {
|
| int fd = pdispatcher->GetDescriptor();
|
| - uint32_t ff = 0;
|
| - int errcode = 0;
|
| -
|
| - // Reap any error code, which can be signaled through reads or writes.
|
| - // TODO(pthatcher): Should we set errcode if getsockopt fails?
|
| - if (FD_ISSET(fd, &fdsRead) || FD_ISSET(fd, &fdsWrite)) {
|
| - socklen_t len = sizeof(errcode);
|
| - ::getsockopt(fd, SOL_SOCKET, SO_ERROR, &errcode, &len);
|
| - }
|
|
|
| - // Check readable descriptors. If we're waiting on an accept, signal
|
| - // that. Otherwise we're waiting for data, check to see if we're
|
| - // readable or really closed.
|
| - // TODO(pthatcher): Only peek at TCP descriptors.
|
| - if (FD_ISSET(fd, &fdsRead)) {
|
| + bool readable = FD_ISSET(fd, &fdsRead);
|
| + if (readable) {
|
| FD_CLR(fd, &fdsRead);
|
| - if (pdispatcher->GetRequestedEvents() & DE_ACCEPT) {
|
| - ff |= DE_ACCEPT;
|
| - } else if (errcode || pdispatcher->IsDescriptorClosed()) {
|
| - ff |= DE_CLOSE;
|
| - } else {
|
| - ff |= DE_READ;
|
| - }
|
| }
|
|
|
| - // Check writable descriptors. If we're waiting on a connect, detect
|
| - // success versus failure by the reaped error code.
|
| - if (FD_ISSET(fd, &fdsWrite)) {
|
| + bool writable = FD_ISSET(fd, &fdsWrite);
|
| + if (writable) {
|
| FD_CLR(fd, &fdsWrite);
|
| - if (pdispatcher->GetRequestedEvents() & DE_CONNECT) {
|
| - if (!errcode) {
|
| - ff |= DE_CONNECT;
|
| - } else {
|
| - ff |= DE_CLOSE;
|
| - }
|
| - } else {
|
| - ff |= DE_WRITE;
|
| - }
|
| }
|
|
|
| - // Tell the descriptor about the event.
|
| - if (ff != 0) {
|
| - pdispatcher->OnPreEvent(ff);
|
| - pdispatcher->OnEvent(ff, errcode);
|
| - }
|
| + // The error code can be signaled through reads or writes.
|
| + ProcessEvents(pdispatcher, readable, writable, readable || writable);
|
| }
|
| +
|
| + processing_dispatchers_ = false;
|
| + // Process deferred dispatchers that have been added/removed while the
|
| + // events were handled above.
|
| + AddRemovePendingDispatchers();
|
| }
|
|
|
| // Recalc the time remaining to wait. Doing it here means it doesn't get
|
| @@ -1372,6 +1550,214 @@ bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
|
| return true;
|
| }
|
|
|
| +#if defined(WEBRTC_USE_EPOLL)
|
| +
|
| +// Initial number of events to process with one call to "epoll_wait".
|
| +static const size_t kInitialEpollEvents = 128;
|
| +
|
| +// Maximum number of events to process with one call to "epoll_wait".
|
| +static const size_t kMaxEpollEvents = 8192;
|
| +
|
| +void PhysicalSocketServer::AddEpoll(Dispatcher* pdispatcher) {
|
| + RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
|
| + int fd = pdispatcher->GetDescriptor();
|
| + RTC_DCHECK(fd != INVALID_SOCKET);
|
| + if (fd == INVALID_SOCKET) {
|
| + return;
|
| + }
|
| +
|
| + struct epoll_event event = {0};
|
| + event.events = GetEpollEvents(pdispatcher->GetRequestedEvents());
|
| + event.data.ptr = pdispatcher;
|
| + int err = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event);
|
| + RTC_DCHECK_EQ(err, 0);
|
| + if (err == -1) {
|
| + LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_ADD";
|
| + }
|
| +}
|
| +
|
| +void PhysicalSocketServer::RemoveEpoll(Dispatcher* pdispatcher) {
|
| + RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
|
| + int fd = pdispatcher->GetDescriptor();
|
| + RTC_DCHECK(fd != INVALID_SOCKET);
|
| + if (fd == INVALID_SOCKET) {
|
| + return;
|
| + }
|
| +
|
| + struct epoll_event event = {0};
|
| + int err = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, &event);
|
| + RTC_DCHECK(err == 0 || errno == ENOENT);
|
| + if (err == -1) {
|
| + if (errno == ENOENT) {
|
| + // Socket has already been closed.
|
| + LOG_E(LS_VERBOSE, EN, errno) << "epoll_ctl EPOLL_CTL_DEL";
|
| + } else {
|
| + LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_DEL";
|
| + }
|
| + }
|
| +}
|
| +
|
| +void PhysicalSocketServer::UpdateEpoll(Dispatcher* pdispatcher) {
|
| + RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
|
| + int fd = pdispatcher->GetDescriptor();
|
| + RTC_DCHECK(fd != INVALID_SOCKET);
|
| + if (fd == INVALID_SOCKET) {
|
| + return;
|
| + }
|
| +
|
| + struct epoll_event event = {0};
|
| + event.events = GetEpollEvents(pdispatcher->GetRequestedEvents());
|
| + event.data.ptr = pdispatcher;
|
| + int err = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd, &event);
|
| + RTC_DCHECK_EQ(err, 0);
|
| + if (err == -1) {
|
| + LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_MOD";
|
| + }
|
| +}
|
| +
|
| +bool PhysicalSocketServer::WaitEpoll(int cmsWait) {
|
| + RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
|
| + int64_t tvWait = -1;
|
| + int64_t tvStop = -1;
|
| + if (cmsWait != kForever) {
|
| + tvWait = cmsWait;
|
| + tvStop = TimeAfter(cmsWait);
|
| + }
|
| +
|
| + if (epoll_events_.empty()) {
|
| + // The initial space to receive events is created only if epoll is used.
|
| + epoll_events_.resize(kInitialEpollEvents);
|
| + }
|
| +
|
| + fWait_ = true;
|
| +
|
| + while (fWait_) {
|
| + // Wait then call handlers as appropriate
|
| + // < 0 means error
|
| + // 0 means timeout
|
| + // > 0 means count of descriptors ready
|
| + int n = epoll_wait(epoll_fd_, &epoll_events_[0],
|
| + static_cast<int>(epoll_events_.size()),
|
| + static_cast<int>(tvWait));
|
| + if (n < 0) {
|
| + if (errno != EINTR) {
|
| + LOG_E(LS_ERROR, EN, errno) << "epoll";
|
| + return false;
|
| + }
|
| + // Else ignore the error and keep going. If this EINTR was for one of the
|
| + // signals managed by this PhysicalSocketServer, the
|
| + // PosixSignalDeliveryDispatcher will be in the signaled state in the next
|
| + // iteration.
|
| + } else if (n == 0) {
|
| + // If timeout, return success
|
| + return true;
|
| + } else {
|
| + // We have signaled descriptors
|
| + CritScope cr(&crit_);
|
| + for (int i = 0; i < n; ++i) {
|
| + const epoll_event& event = epoll_events_[i];
|
| + Dispatcher* pdispatcher = static_cast<Dispatcher*>(event.data.ptr);
|
| + if (dispatchers_.find(pdispatcher) == dispatchers_.end()) {
|
| + // The dispatcher for this socket no longer exists.
|
| + continue;
|
| + }
|
| +
|
| + bool readable = (event.events & (EPOLLIN | EPOLLPRI));
|
| + bool writable = (event.events & EPOLLOUT);
|
| + bool check_error = (event.events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP));
|
| +
|
| + ProcessEvents(pdispatcher, readable, writable, check_error);
|
| + }
|
| + }
|
| +
|
| + if (static_cast<size_t>(n) == epoll_events_.size() &&
|
| + epoll_events_.size() < kMaxEpollEvents) {
|
| + // We used the complete space to receive events, increase size for future
|
| + // iterations.
|
| + epoll_events_.resize(std::max(epoll_events_.size() * 2, kMaxEpollEvents));
|
| + }
|
| +
|
| + if (cmsWait != kForever) {
|
| + tvWait = TimeDiff(tvStop, TimeMillis());
|
| + if (tvWait < 0) {
|
| + // Return success on timeout.
|
| + return true;
|
| + }
|
| + }
|
| + }
|
| +
|
| + return true;
|
| +}
|
| +
|
| +bool PhysicalSocketServer::WaitPoll(int cmsWait, Dispatcher* dispatcher) {
|
| + RTC_DCHECK(dispatcher);
|
| + int64_t tvWait = -1;
|
| + int64_t tvStop = -1;
|
| + if (cmsWait != kForever) {
|
| + tvWait = cmsWait;
|
| + tvStop = TimeAfter(cmsWait);
|
| + }
|
| +
|
| + fWait_ = true;
|
| +
|
| + struct pollfd fds = {0};
|
| + int fd = dispatcher->GetDescriptor();
|
| + fds.fd = fd;
|
| +
|
| + while (fWait_) {
|
| + uint32_t ff = dispatcher->GetRequestedEvents();
|
| + fds.events = 0;
|
| + if (ff & (DE_READ | DE_ACCEPT)) {
|
| + fds.events |= POLLIN;
|
| + }
|
| + if (ff & (DE_WRITE | DE_CONNECT)) {
|
| + fds.events |= POLLOUT;
|
| + }
|
| + fds.revents = 0;
|
| +
|
| + // Wait then call handlers as appropriate
|
| + // < 0 means error
|
| + // 0 means timeout
|
| + // > 0 means count of descriptors ready
|
| + int n = poll(&fds, 1, static_cast<int>(tvWait));
|
| + if (n < 0) {
|
| + if (errno != EINTR) {
|
| + LOG_E(LS_ERROR, EN, errno) << "poll";
|
| + return false;
|
| + }
|
| + // Else ignore the error and keep going. If this EINTR was for one of the
|
| + // signals managed by this PhysicalSocketServer, the
|
| + // PosixSignalDeliveryDispatcher will be in the signaled state in the next
|
| + // iteration.
|
| + } else if (n == 0) {
|
| + // If timeout, return success
|
| + return true;
|
| + } else {
|
| + // We have signaled descriptors (should only be the passed dispatcher).
|
| + RTC_DCHECK_EQ(n, 1);
|
| + RTC_DCHECK_EQ(fds.fd, fd);
|
| +
|
| + bool readable = (fds.revents & (POLLIN | POLLPRI));
|
| + bool writable = (fds.revents & POLLOUT);
|
| + bool check_error = (fds.revents & (POLLRDHUP | POLLERR | POLLHUP));
|
| +
|
| + ProcessEvents(dispatcher, readable, writable, check_error);
|
| + }
|
| +
|
| + if (cmsWait != kForever) {
|
| + tvWait = TimeDiff(tvStop, TimeMillis());
|
| + if (tvWait < 0) {
|
| + // Return success on timeout.
|
| + return true;
|
| + }
|
| + }
|
| + }
|
| +
|
| + return true;
|
| +}
|
| +
|
| +#endif // WEBRTC_USE_EPOLL
|
| +
|
| static void GlobalSignalHandler(int signum) {
|
| PosixSignalHandler::Instance()->OnPosixSignalReceived(signum);
|
| }
|
| @@ -1445,12 +1831,13 @@ bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
|
|
|
| {
|
| CritScope cr(&crit_);
|
| - size_t i = 0;
|
| - iterators_.push_back(&i);
|
| - // Don't track dispatchers_.size(), because we want to pick up any new
|
| - // dispatchers that were added while processing the loop.
|
| - while (i < dispatchers_.size()) {
|
| - Dispatcher* disp = dispatchers_[i++];
|
| + // TODO(jbauch): Support re-entrant waiting.
|
| + RTC_DCHECK(!processing_dispatchers_);
|
| +
|
| + // Calling "CheckSignalClose" might remove a closed dispatcher from the
|
| + // set. This must be deferred to prevent invalidating the iterator.
|
| + processing_dispatchers_ = true;
|
| + for (Dispatcher* disp : dispatchers_) {
|
| if (!process_io && (disp != signal_wakeup_))
|
| continue;
|
| SOCKET s = disp->GetSocket();
|
| @@ -1465,8 +1852,11 @@ bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
|
| event_owners.push_back(disp);
|
| }
|
| }
|
| - RTC_DCHECK(iterators_.back() == &i);
|
| - iterators_.pop_back();
|
| +
|
| + processing_dispatchers_ = false;
|
| + // Process deferred dispatchers that have been added/removed while the
|
| + // events were handled above.
|
| + AddRemovePendingDispatchers();
|
| }
|
|
|
| // Which is shorter, the delay wait or the asked wait?
|
| @@ -1500,14 +1890,15 @@ bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
|
| int index = dw - WSA_WAIT_EVENT_0;
|
| if (index > 0) {
|
| --index; // The first event is the socket event
|
| - event_owners[index]->OnPreEvent(0);
|
| - event_owners[index]->OnEvent(0, 0);
|
| + Dispatcher* disp = event_owners[index];
|
| + // The dispatcher could have been removed while waiting for events.
|
| + if (dispatchers_.find(disp) != dispatchers_.end()) {
|
| + disp->OnPreEvent(0);
|
| + disp->OnEvent(0, 0);
|
| + }
|
| } else if (process_io) {
|
| - size_t i = 0, end = dispatchers_.size();
|
| - iterators_.push_back(&i);
|
| - iterators_.push_back(&end); // Don't iterate over new dispatchers.
|
| - while (i < end) {
|
| - Dispatcher* disp = dispatchers_[i++];
|
| + processing_dispatchers_ = true;
|
| + for (Dispatcher* disp : dispatchers_) {
|
| SOCKET s = disp->GetSocket();
|
| if (s == INVALID_SOCKET)
|
| continue;
|
| @@ -1568,10 +1959,11 @@ bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
|
| }
|
| }
|
| }
|
| - RTC_DCHECK(iterators_.back() == &end);
|
| - iterators_.pop_back();
|
| - RTC_DCHECK(iterators_.back() == &i);
|
| - iterators_.pop_back();
|
| +
|
| + processing_dispatchers_ = false;
|
| + // Process deferred dispatchers that have been added/removed while the
|
| + // events were handled above.
|
| + AddRemovePendingDispatchers();
|
| }
|
|
|
| // Reset the network event until new activity occurs
|
|
|