Index: webrtc/base/physicalsocketserver.cc |
diff --git a/webrtc/base/physicalsocketserver.cc b/webrtc/base/physicalsocketserver.cc |
index a412703015e77f903b82c6126edd21a4c00a023d..91e890cc0dd5229d396de1ffbe0028481d6ad78d 100644 |
--- a/webrtc/base/physicalsocketserver.cc |
+++ b/webrtc/base/physicalsocketserver.cc |
@@ -21,6 +21,10 @@ |
#include <string.h> |
#include <errno.h> |
#include <fcntl.h> |
+#if defined(WEBRTC_USE_EPOLL) |
+// "poll" will be used to wait for the signal dispatcher. |
+#include <poll.h> |
+#endif |
#include <sys/ioctl.h> |
#include <sys/time.h> |
#include <sys/select.h> |
@@ -80,6 +84,16 @@ int64_t GetSocketRecvTimestamp(int socket) { |
typedef char* SockOptArg; |
#endif |
+#if defined(WEBRTC_USE_EPOLL) |
+// POLLRDHUP / EPOLLRDHUP are only defined starting with Linux 2.6.17. |
+#if !defined(POLLRDHUP) |
+#define POLLRDHUP 0x2000 |
+#endif |
+#if !defined(EPOLLRDHUP) |
+#define EPOLLRDHUP 0x2000 |
+#endif |
+#endif |
+ |
namespace rtc { |
std::unique_ptr<SocketServer> SocketServer::CreateDefault() { |
@@ -765,6 +779,14 @@ void SocketDispatcher::OnEvent(uint32_t ff, int err) { |
#elif defined(WEBRTC_POSIX) |
void SocketDispatcher::OnEvent(uint32_t ff, int err) { |
+#if defined(WEBRTC_USE_EPOLL) |
+ // Remember currently enabled events so we can combine multiple changes |
+ // into one update call later. |
+ // The signal handlers might re-enable events disabled here, so we can't |
+ // keep a list of events to disable at the end of the method. This list |
+ // would not be updated with the events enabled by the signal handlers. |
+ StartBatchedEventUpdates(); |
+#endif |
// Make sure we deliver connect/accept first. Otherwise, consumers may see |
// something like a READ followed by a CONNECT, which would be odd. |
if ((ff & DE_CONNECT) != 0) { |
@@ -788,10 +810,65 @@ void SocketDispatcher::OnEvent(uint32_t ff, int err) { |
SetEnabledEvents(0); |
SignalCloseEvent(this, err); |
} |
+#if defined(WEBRTC_USE_EPOLL) |
+ FinishBatchedEventUpdates(); |
+#endif |
} |
#endif // WEBRTC_POSIX |
+#if defined(WEBRTC_USE_EPOLL) |
+ |
+static int GetEpollEvents(uint32_t ff) { |
+ int events = 0; |
+ if (ff & (DE_READ | DE_ACCEPT)) { |
+ events |= EPOLLIN; |
+ } |
+ if (ff & (DE_WRITE | DE_CONNECT)) { |
+ events |= EPOLLOUT; |
+ } |
+ return events; |
+} |
+ |
+void SocketDispatcher::StartBatchedEventUpdates() { |
+ RTC_DCHECK_EQ(saved_enabled_events_, -1); |
+ saved_enabled_events_ = enabled_events(); |
+} |
+ |
+void SocketDispatcher::FinishBatchedEventUpdates() { |
+ RTC_DCHECK_NE(saved_enabled_events_, -1); |
+ uint8_t old_events = static_cast<uint8_t>(saved_enabled_events_); |
+ saved_enabled_events_ = -1; |
+ MaybeUpdateDispatcher(old_events); |
+} |
+ |
+void SocketDispatcher::MaybeUpdateDispatcher(uint8_t old_events) { |
+ if (GetEpollEvents(enabled_events()) != GetEpollEvents(old_events) && |
+ saved_enabled_events_ == -1) { |
+ ss_->Update(this); |
+ } |
+} |
+ |
+void SocketDispatcher::SetEnabledEvents(uint8_t events) { |
+ uint8_t old_events = enabled_events(); |
+ PhysicalSocket::SetEnabledEvents(events); |
+ MaybeUpdateDispatcher(old_events); |
+} |
+ |
+void SocketDispatcher::EnableEvents(uint8_t events) { |
+ uint8_t old_events = enabled_events(); |
+ PhysicalSocket::EnableEvents(events); |
+ MaybeUpdateDispatcher(old_events); |
+} |
+ |
+void SocketDispatcher::DisableEvents(uint8_t events) { |
+ uint8_t old_events = enabled_events(); |
+ PhysicalSocket::DisableEvents(events); |
+ MaybeUpdateDispatcher(old_events); |
+} |
+ |
+#endif // WEBRTC_USE_EPOLL |
+ |
int SocketDispatcher::Close() { |
if (s_ == INVALID_SOCKET) |
return 0; |
@@ -1120,6 +1197,17 @@ class Signaler : public EventDispatcher { |
PhysicalSocketServer::PhysicalSocketServer() |
: fWait_(false) { |
+#if defined(WEBRTC_USE_EPOLL) |
+ // Since Linux 2.6.8, the size argument is ignored, but must be greater than |
+ // zero. Before that the size served as hint to the kernel for the amount of |
+ // space to initially allocate in internal data structures. |
+ epoll_fd_ = epoll_create(FD_SETSIZE); |
+ if (epoll_fd_ == -1) { |
+ // Not an error, will fall back to "select" below. |
+ LOG_E(LS_WARNING, EN, errno) << "epoll_create"; |
+ epoll_fd_ = INVALID_SOCKET; |
+ } |
+#endif |
signal_wakeup_ = new Signaler(this, &fWait_); |
#if defined(WEBRTC_WIN) |
socket_ev_ = WSACreateEvent(); |
@@ -1134,6 +1222,11 @@ PhysicalSocketServer::~PhysicalSocketServer() { |
signal_dispatcher_.reset(); |
#endif |
delete signal_wakeup_; |
+#if defined(WEBRTC_USE_EPOLL) |
+ if (epoll_fd_ != INVALID_SOCKET) { |
+ close(epoll_fd_); |
+ } |
+#endif |
RTC_DCHECK(dispatchers_.empty()); |
} |
@@ -1181,40 +1274,148 @@ AsyncSocket* PhysicalSocketServer::WrapSocket(SOCKET s) { |
void PhysicalSocketServer::Add(Dispatcher *pdispatcher) { |
CritScope cs(&crit_); |
- // Prevent duplicates. This can cause dead dispatchers to stick around. |
- DispatcherList::iterator pos = std::find(dispatchers_.begin(), |
- dispatchers_.end(), |
- pdispatcher); |
- if (pos != dispatchers_.end()) |
- return; |
- dispatchers_.push_back(pdispatcher); |
+ if (processing_dispatchers_) { |
+ // A dispatcher is being added while a "Wait" call is processing the |
+ // list of socket events. |
+ // Defer adding to "dispatchers_" set until processing is done to avoid |
+ // invalidating the iterator in "Wait". |
+ pending_remove_dispatchers_.erase(pdispatcher); |
+ pending_add_dispatchers_.insert(pdispatcher); |
+ } else { |
+ dispatchers_.insert(pdispatcher); |
+ } |
+#if defined(WEBRTC_USE_EPOLL) |
+ if (epoll_fd_ != INVALID_SOCKET) { |
+ AddEpoll(pdispatcher); |
+ } |
+#endif // WEBRTC_USE_EPOLL |
} |
void PhysicalSocketServer::Remove(Dispatcher *pdispatcher) { |
CritScope cs(&crit_); |
- DispatcherList::iterator pos = std::find(dispatchers_.begin(), |
- dispatchers_.end(), |
- pdispatcher); |
- // We silently ignore duplicate calls to Add, so we should silently ignore |
- // the (expected) symmetric calls to Remove. Note that this may still hide |
- // a real issue, so we at least log a warning about it. |
- if (pos == dispatchers_.end()) { |
+ if (processing_dispatchers_) { |
+ // A dispatcher is being removed while a "Wait" call is processing the |
+ // list of socket events. |
+ // Defer removal from "dispatchers_" set until processing is done to avoid |
+ // invalidating the iterator in "Wait". |
+ if (!pending_add_dispatchers_.erase(pdispatcher) && |
+ dispatchers_.find(pdispatcher) == dispatchers_.end()) { |
+ LOG(LS_WARNING) << "PhysicalSocketServer asked to remove a unknown " |
+ << "dispatcher, potentially from a duplicate call to " |
+ << "Add."; |
+ return; |
+ } |
+ |
+ pending_remove_dispatchers_.insert(pdispatcher); |
+ } else if (!dispatchers_.erase(pdispatcher)) { |
LOG(LS_WARNING) << "PhysicalSocketServer asked to remove a unknown " |
<< "dispatcher, potentially from a duplicate call to Add."; |
return; |
} |
- size_t index = pos - dispatchers_.begin(); |
- dispatchers_.erase(pos); |
- for (IteratorList::iterator it = iterators_.begin(); it != iterators_.end(); |
- ++it) { |
- if (index < **it) { |
- --**it; |
+#if defined(WEBRTC_USE_EPOLL) |
+ if (epoll_fd_ != INVALID_SOCKET) { |
+ RemoveEpoll(pdispatcher); |
+ } |
+#endif // WEBRTC_USE_EPOLL |
+} |
+ |
+void PhysicalSocketServer::Update(Dispatcher* pdispatcher) { |
+#if defined(WEBRTC_USE_EPOLL) |
+ if (epoll_fd_ == INVALID_SOCKET) { |
+ return; |
+ } |
+ |
+ CritScope cs(&crit_); |
+ if (dispatchers_.find(pdispatcher) == dispatchers_.end()) { |
+ return; |
+ } |
+ |
+ UpdateEpoll(pdispatcher); |
+#endif |
+} |
+ |
+void PhysicalSocketServer::AddRemovePendingDispatchers() { |
+ if (!pending_add_dispatchers_.empty()) { |
+ for (Dispatcher* pdispatcher : pending_add_dispatchers_) { |
+ dispatchers_.insert(pdispatcher); |
+ } |
+ pending_add_dispatchers_.clear(); |
+ } |
+ |
+ if (!pending_remove_dispatchers_.empty()) { |
+ for (Dispatcher* pdispatcher : pending_remove_dispatchers_) { |
+ dispatchers_.erase(pdispatcher); |
} |
+ pending_remove_dispatchers_.clear(); |
} |
} |
#if defined(WEBRTC_POSIX) |
+ |
bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) { |
+#if defined(WEBRTC_USE_EPOLL) |
+ // We don't keep a dedicated "epoll" descriptor containing only the non-IO |
+ // (i.e. signaling) dispatcher, so "poll" will be used instead of the default |
+ // "select" to support sockets larger than FD_SETSIZE. |
+ if (!process_io) { |
+ return WaitPoll(cmsWait, signal_wakeup_); |
+ } else if (epoll_fd_ != INVALID_SOCKET) { |
+ return WaitEpoll(cmsWait); |
+ } |
+#endif |
+ return WaitSelect(cmsWait, process_io); |
+} |
+ |
+static void ProcessEvents(Dispatcher* dispatcher, |
+ bool readable, |
+ bool writable, |
+ bool check_error) { |
+ int errcode = 0; |
+ // TODO(pthatcher): Should we set errcode if getsockopt fails? |
+ if (check_error) { |
+ socklen_t len = sizeof(errcode); |
+ ::getsockopt(dispatcher->GetDescriptor(), SOL_SOCKET, SO_ERROR, &errcode, |
+ &len); |
+ } |
+ |
+ uint32_t ff = 0; |
+ |
+ // Check readable descriptors. If we're waiting on an accept, signal |
+ // that. Otherwise we're waiting for data, check to see if we're |
+ // readable or really closed. |
+ // TODO(pthatcher): Only peek at TCP descriptors. |
+ if (readable) { |
+ if (dispatcher->GetRequestedEvents() & DE_ACCEPT) { |
+ ff |= DE_ACCEPT; |
+ } else if (errcode || dispatcher->IsDescriptorClosed()) { |
+ ff |= DE_CLOSE; |
+ } else { |
+ ff |= DE_READ; |
+ } |
+ } |
+ |
+ // Check writable descriptors. If we're waiting on a connect, detect |
+ // success versus failure by the reaped error code. |
+ if (writable) { |
+ if (dispatcher->GetRequestedEvents() & DE_CONNECT) { |
+ if (!errcode) { |
+ ff |= DE_CONNECT; |
+ } else { |
+ ff |= DE_CLOSE; |
+ } |
+ } else { |
+ ff |= DE_WRITE; |
+ } |
+ } |
+ |
+ // Tell the descriptor about the event. |
+ if (ff != 0) { |
+ dispatcher->OnPreEvent(ff); |
+ dispatcher->OnEvent(ff, errcode); |
+ } |
+} |
+ |
+bool PhysicalSocketServer::WaitSelect(int cmsWait, bool process_io) { |
// Calculate timing information |
struct timeval* ptvWait = nullptr; |
@@ -1257,13 +1458,17 @@ bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) { |
int fdmax = -1; |
{ |
CritScope cr(&crit_); |
- for (size_t i = 0; i < dispatchers_.size(); ++i) { |
+ // TODO(jbauch): Support re-entrant waiting. |
+ RTC_DCHECK(!processing_dispatchers_); |
+ for (Dispatcher* pdispatcher : dispatchers_) { |
// Query dispatchers for read and write wait state |
- Dispatcher *pdispatcher = dispatchers_[i]; |
RTC_DCHECK(pdispatcher); |
if (!process_io && (pdispatcher != signal_wakeup_)) |
continue; |
int fd = pdispatcher->GetDescriptor(); |
+ // "select"ing a file descriptor that is equal to or larger than |
+ // FD_SETSIZE will result in undefined behavior. |
+ RTC_DCHECK_LT(fd, FD_SETSIZE); |
if (fd > fdmax) |
fdmax = fd; |
@@ -1297,55 +1502,28 @@ bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) { |
} else { |
// We have signaled descriptors |
CritScope cr(&crit_); |
- for (size_t i = 0; i < dispatchers_.size(); ++i) { |
- Dispatcher *pdispatcher = dispatchers_[i]; |
+ processing_dispatchers_ = true; |
+ for (Dispatcher* pdispatcher : dispatchers_) { |
int fd = pdispatcher->GetDescriptor(); |
- uint32_t ff = 0; |
- int errcode = 0; |
- |
- // Reap any error code, which can be signaled through reads or writes. |
- // TODO(pthatcher): Should we set errcode if getsockopt fails? |
- if (FD_ISSET(fd, &fdsRead) || FD_ISSET(fd, &fdsWrite)) { |
- socklen_t len = sizeof(errcode); |
- ::getsockopt(fd, SOL_SOCKET, SO_ERROR, &errcode, &len); |
- } |
- // Check readable descriptors. If we're waiting on an accept, signal |
- // that. Otherwise we're waiting for data, check to see if we're |
- // readable or really closed. |
- // TODO(pthatcher): Only peek at TCP descriptors. |
- if (FD_ISSET(fd, &fdsRead)) { |
+ bool readable = FD_ISSET(fd, &fdsRead); |
+ if (readable) { |
FD_CLR(fd, &fdsRead); |
- if (pdispatcher->GetRequestedEvents() & DE_ACCEPT) { |
- ff |= DE_ACCEPT; |
- } else if (errcode || pdispatcher->IsDescriptorClosed()) { |
- ff |= DE_CLOSE; |
- } else { |
- ff |= DE_READ; |
- } |
} |
- // Check writable descriptors. If we're waiting on a connect, detect |
- // success versus failure by the reaped error code. |
- if (FD_ISSET(fd, &fdsWrite)) { |
+ bool writable = FD_ISSET(fd, &fdsWrite); |
+ if (writable) { |
FD_CLR(fd, &fdsWrite); |
- if (pdispatcher->GetRequestedEvents() & DE_CONNECT) { |
- if (!errcode) { |
- ff |= DE_CONNECT; |
- } else { |
- ff |= DE_CLOSE; |
- } |
- } else { |
- ff |= DE_WRITE; |
- } |
} |
- // Tell the descriptor about the event. |
- if (ff != 0) { |
- pdispatcher->OnPreEvent(ff); |
- pdispatcher->OnEvent(ff, errcode); |
- } |
+ // The error code can be signaled through reads or writes. |
+ ProcessEvents(pdispatcher, readable, writable, readable || writable); |
} |
+ |
+ processing_dispatchers_ = false; |
+ // Process deferred dispatchers that have been added/removed while the |
+ // events were handled above. |
+ AddRemovePendingDispatchers(); |
} |
// Recalc the time remaining to wait. Doing it here means it doesn't get |
@@ -1372,6 +1550,214 @@ bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) { |
return true; |
} |
+#if defined(WEBRTC_USE_EPOLL) |
+ |
+// Initial number of events to process with one call to "epoll_wait". |
+static const size_t kInitialEpollEvents = 128; |
+ |
+// Maximum number of events to process with one call to "epoll_wait". |
+static const size_t kMaxEpollEvents = 8192; |
+ |
+void PhysicalSocketServer::AddEpoll(Dispatcher* pdispatcher) { |
+ RTC_DCHECK(epoll_fd_ != INVALID_SOCKET); |
+ int fd = pdispatcher->GetDescriptor(); |
+ RTC_DCHECK(fd != INVALID_SOCKET); |
+ if (fd == INVALID_SOCKET) { |
+ return; |
+ } |
+ |
+ struct epoll_event event = {0}; |
+ event.events = GetEpollEvents(pdispatcher->GetRequestedEvents()); |
+ event.data.ptr = pdispatcher; |
+ int err = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event); |
+ RTC_DCHECK_EQ(err, 0); |
+ if (err == -1) { |
+ LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_ADD"; |
+ } |
+} |
+ |
+void PhysicalSocketServer::RemoveEpoll(Dispatcher* pdispatcher) { |
+ RTC_DCHECK(epoll_fd_ != INVALID_SOCKET); |
+ int fd = pdispatcher->GetDescriptor(); |
+ RTC_DCHECK(fd != INVALID_SOCKET); |
+ if (fd == INVALID_SOCKET) { |
+ return; |
+ } |
+ |
+ struct epoll_event event = {0}; |
+ int err = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, &event); |
+ RTC_DCHECK(err == 0 || errno == ENOENT); |
+ if (err == -1) { |
+ if (errno == ENOENT) { |
+ // Socket has already been closed. |
+ LOG_E(LS_VERBOSE, EN, errno) << "epoll_ctl EPOLL_CTL_DEL"; |
+ } else { |
+ LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_DEL"; |
+ } |
+ } |
+} |
+ |
+void PhysicalSocketServer::UpdateEpoll(Dispatcher* pdispatcher) { |
+ RTC_DCHECK(epoll_fd_ != INVALID_SOCKET); |
+ int fd = pdispatcher->GetDescriptor(); |
+ RTC_DCHECK(fd != INVALID_SOCKET); |
+ if (fd == INVALID_SOCKET) { |
+ return; |
+ } |
+ |
+ struct epoll_event event = {0}; |
+ event.events = GetEpollEvents(pdispatcher->GetRequestedEvents()); |
+ event.data.ptr = pdispatcher; |
+ int err = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd, &event); |
+ RTC_DCHECK_EQ(err, 0); |
+ if (err == -1) { |
+ LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_MOD"; |
+ } |
+} |
+ |
+bool PhysicalSocketServer::WaitEpoll(int cmsWait) { |
+ RTC_DCHECK(epoll_fd_ != INVALID_SOCKET); |
+ int64_t tvWait = -1; |
+ int64_t tvStop = -1; |
+ if (cmsWait != kForever) { |
+ tvWait = cmsWait; |
+ tvStop = TimeAfter(cmsWait); |
+ } |
+ |
+ if (epoll_events_.empty()) { |
+ // The initial space to receive events is created only if epoll is used. |
+ epoll_events_.resize(kInitialEpollEvents); |
+ } |
+ |
+ fWait_ = true; |
+ |
+ while (fWait_) { |
+ // Wait then call handlers as appropriate |
+ // < 0 means error |
+ // 0 means timeout |
+ // > 0 means count of descriptors ready |
+ int n = epoll_wait(epoll_fd_, &epoll_events_[0], |
+ static_cast<int>(epoll_events_.size()), |
+ static_cast<int>(tvWait)); |
+ if (n < 0) { |
+ if (errno != EINTR) { |
+ LOG_E(LS_ERROR, EN, errno) << "epoll"; |
+ return false; |
+ } |
+ // Else ignore the error and keep going. If this EINTR was for one of the |
+ // signals managed by this PhysicalSocketServer, the |
+ // PosixSignalDeliveryDispatcher will be in the signaled state in the next |
+ // iteration. |
+ } else if (n == 0) { |
+ // If timeout, return success |
+ return true; |
+ } else { |
+ // We have signaled descriptors |
+ CritScope cr(&crit_); |
+ for (int i = 0; i < n; ++i) { |
+ const epoll_event& event = epoll_events_[i]; |
+ Dispatcher* pdispatcher = static_cast<Dispatcher*>(event.data.ptr); |
+ if (dispatchers_.find(pdispatcher) == dispatchers_.end()) { |
+ // The dispatcher for this socket no longer exists. |
+ continue; |
+ } |
+ |
+ bool readable = (event.events & (EPOLLIN | EPOLLPRI)); |
+ bool writable = (event.events & EPOLLOUT); |
+ bool check_error = (event.events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP)); |
+ |
+ ProcessEvents(pdispatcher, readable, writable, check_error); |
+ } |
+ } |
+ |
+ if (static_cast<size_t>(n) == epoll_events_.size() && |
+ epoll_events_.size() < kMaxEpollEvents) { |
+ // We used the complete space to receive events, increase size for future |
+ // iterations. |
+ epoll_events_.resize(std::max(epoll_events_.size() * 2, kMaxEpollEvents)); |
+ } |
+ |
+ if (cmsWait != kForever) { |
+ tvWait = TimeDiff(tvStop, TimeMillis()); |
+ if (tvWait < 0) { |
+ // Return success on timeout. |
+ return true; |
+ } |
+ } |
+ } |
+ |
+ return true; |
+} |
+ |
+bool PhysicalSocketServer::WaitPoll(int cmsWait, Dispatcher* dispatcher) { |
+ RTC_DCHECK(dispatcher); |
+ int64_t tvWait = -1; |
+ int64_t tvStop = -1; |
+ if (cmsWait != kForever) { |
+ tvWait = cmsWait; |
+ tvStop = TimeAfter(cmsWait); |
+ } |
+ |
+ fWait_ = true; |
+ |
+ struct pollfd fds = {0}; |
+ int fd = dispatcher->GetDescriptor(); |
+ fds.fd = fd; |
+ |
+ while (fWait_) { |
+ uint32_t ff = dispatcher->GetRequestedEvents(); |
+ fds.events = 0; |
+ if (ff & (DE_READ | DE_ACCEPT)) { |
+ fds.events |= POLLIN; |
+ } |
+ if (ff & (DE_WRITE | DE_CONNECT)) { |
+ fds.events |= POLLOUT; |
+ } |
+ fds.revents = 0; |
+ |
+ // Wait then call handlers as appropriate |
+ // < 0 means error |
+ // 0 means timeout |
+ // > 0 means count of descriptors ready |
+ int n = poll(&fds, 1, static_cast<int>(tvWait)); |
+ if (n < 0) { |
+ if (errno != EINTR) { |
+ LOG_E(LS_ERROR, EN, errno) << "poll"; |
+ return false; |
+ } |
+ // Else ignore the error and keep going. If this EINTR was for one of the |
+ // signals managed by this PhysicalSocketServer, the |
+ // PosixSignalDeliveryDispatcher will be in the signaled state in the next |
+ // iteration. |
+ } else if (n == 0) { |
+ // If timeout, return success |
+ return true; |
+ } else { |
+ // We have signaled descriptors (should only be the passed dispatcher). |
+ RTC_DCHECK_EQ(n, 1); |
+ RTC_DCHECK_EQ(fds.fd, fd); |
+ |
+ bool readable = (fds.revents & (POLLIN | POLLPRI)); |
+ bool writable = (fds.revents & POLLOUT); |
+ bool check_error = (fds.revents & (POLLRDHUP | POLLERR | POLLHUP)); |
+ |
+ ProcessEvents(dispatcher, readable, writable, check_error); |
+ } |
+ |
+ if (cmsWait != kForever) { |
+ tvWait = TimeDiff(tvStop, TimeMillis()); |
+ if (tvWait < 0) { |
+ // Return success on timeout. |
+ return true; |
+ } |
+ } |
+ } |
+ |
+ return true; |
+} |
+ |
+#endif // WEBRTC_USE_EPOLL |
+ |
static void GlobalSignalHandler(int signum) { |
PosixSignalHandler::Instance()->OnPosixSignalReceived(signum); |
} |
@@ -1445,12 +1831,13 @@ bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) { |
{ |
CritScope cr(&crit_); |
- size_t i = 0; |
- iterators_.push_back(&i); |
- // Don't track dispatchers_.size(), because we want to pick up any new |
- // dispatchers that were added while processing the loop. |
- while (i < dispatchers_.size()) { |
- Dispatcher* disp = dispatchers_[i++]; |
+ // TODO(jbauch): Support re-entrant waiting. |
+ RTC_DCHECK(!processing_dispatchers_); |
+ |
+ // Calling "CheckSignalClose" might remove a closed dispatcher from the |
+ // set. This must be deferred to prevent invalidating the iterator. |
+ processing_dispatchers_ = true; |
+ for (Dispatcher* disp : dispatchers_) { |
if (!process_io && (disp != signal_wakeup_)) |
continue; |
SOCKET s = disp->GetSocket(); |
@@ -1465,8 +1852,11 @@ bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) { |
event_owners.push_back(disp); |
} |
} |
- RTC_DCHECK(iterators_.back() == &i); |
- iterators_.pop_back(); |
+ |
+ processing_dispatchers_ = false; |
+ // Process deferred dispatchers that have been added/removed while the |
+ // events were handled above. |
+ AddRemovePendingDispatchers(); |
} |
// Which is shorter, the delay wait or the asked wait? |
@@ -1500,14 +1890,15 @@ bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) { |
int index = dw - WSA_WAIT_EVENT_0; |
if (index > 0) { |
--index; // The first event is the socket event |
- event_owners[index]->OnPreEvent(0); |
- event_owners[index]->OnEvent(0, 0); |
+ Dispatcher* disp = event_owners[index]; |
+ // The dispatcher could have been removed while waiting for events. |
+ if (dispatchers_.find(disp) != dispatchers_.end()) { |
+ disp->OnPreEvent(0); |
+ disp->OnEvent(0, 0); |
+ } |
} else if (process_io) { |
- size_t i = 0, end = dispatchers_.size(); |
- iterators_.push_back(&i); |
- iterators_.push_back(&end); // Don't iterate over new dispatchers. |
- while (i < end) { |
- Dispatcher* disp = dispatchers_[i++]; |
+ processing_dispatchers_ = true; |
+ for (Dispatcher* disp : dispatchers_) { |
SOCKET s = disp->GetSocket(); |
if (s == INVALID_SOCKET) |
continue; |
@@ -1568,10 +1959,11 @@ bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) { |
} |
} |
} |
- RTC_DCHECK(iterators_.back() == &end); |
- iterators_.pop_back(); |
- RTC_DCHECK(iterators_.back() == &i); |
- iterators_.pop_back(); |
+ |
+ processing_dispatchers_ = false; |
+ // Process deferred dispatchers that have been added/removed while the |
+ // events were handled above. |
+ AddRemovePendingDispatchers(); |
} |
// Reset the network event until new activity occurs |