Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(82)

Unified Diff: webrtc/base/physicalsocketserver.cc

Issue 2880923002: Support epoll in PhysicalSocketServer. (Closed)
Patch Set: Win: Prevent updates to dispatcher in loop before waiting. Created 3 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « webrtc/base/physicalsocketserver.h ('k') | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: webrtc/base/physicalsocketserver.cc
diff --git a/webrtc/base/physicalsocketserver.cc b/webrtc/base/physicalsocketserver.cc
index a412703015e77f903b82c6126edd21a4c00a023d..91e890cc0dd5229d396de1ffbe0028481d6ad78d 100644
--- a/webrtc/base/physicalsocketserver.cc
+++ b/webrtc/base/physicalsocketserver.cc
@@ -21,6 +21,10 @@
#include <string.h>
#include <errno.h>
#include <fcntl.h>
+#if defined(WEBRTC_USE_EPOLL)
+// "poll" will be used to wait for the signal dispatcher.
+#include <poll.h>
+#endif
#include <sys/ioctl.h>
#include <sys/time.h>
#include <sys/select.h>
@@ -80,6 +84,16 @@ int64_t GetSocketRecvTimestamp(int socket) {
typedef char* SockOptArg;
#endif
+#if defined(WEBRTC_USE_EPOLL)
+// POLLRDHUP / EPOLLRDHUP are only defined starting with Linux 2.6.17.
+#if !defined(POLLRDHUP)
+#define POLLRDHUP 0x2000
+#endif
+#if !defined(EPOLLRDHUP)
+#define EPOLLRDHUP 0x2000
+#endif
+#endif
+
namespace rtc {
std::unique_ptr<SocketServer> SocketServer::CreateDefault() {
@@ -765,6 +779,14 @@ void SocketDispatcher::OnEvent(uint32_t ff, int err) {
#elif defined(WEBRTC_POSIX)
void SocketDispatcher::OnEvent(uint32_t ff, int err) {
+#if defined(WEBRTC_USE_EPOLL)
+ // Remember currently enabled events so we can combine multiple changes
+ // into one update call later.
+ // The signal handlers might re-enable events disabled here, so we can't
+ // keep a list of events to disable at the end of the method. This list
+ // would not be updated with the events enabled by the signal handlers.
+ StartBatchedEventUpdates();
+#endif
// Make sure we deliver connect/accept first. Otherwise, consumers may see
// something like a READ followed by a CONNECT, which would be odd.
if ((ff & DE_CONNECT) != 0) {
@@ -788,10 +810,65 @@ void SocketDispatcher::OnEvent(uint32_t ff, int err) {
SetEnabledEvents(0);
SignalCloseEvent(this, err);
}
+#if defined(WEBRTC_USE_EPOLL)
+ FinishBatchedEventUpdates();
+#endif
}
#endif // WEBRTC_POSIX
+#if defined(WEBRTC_USE_EPOLL)
+
+static int GetEpollEvents(uint32_t ff) {
+ int events = 0;
+ if (ff & (DE_READ | DE_ACCEPT)) {
+ events |= EPOLLIN;
+ }
+ if (ff & (DE_WRITE | DE_CONNECT)) {
+ events |= EPOLLOUT;
+ }
+ return events;
+}
+
+void SocketDispatcher::StartBatchedEventUpdates() {
+ RTC_DCHECK_EQ(saved_enabled_events_, -1);
+ saved_enabled_events_ = enabled_events();
+}
+
+void SocketDispatcher::FinishBatchedEventUpdates() {
+ RTC_DCHECK_NE(saved_enabled_events_, -1);
+ uint8_t old_events = static_cast<uint8_t>(saved_enabled_events_);
+ saved_enabled_events_ = -1;
+ MaybeUpdateDispatcher(old_events);
+}
+
+void SocketDispatcher::MaybeUpdateDispatcher(uint8_t old_events) {
+ if (GetEpollEvents(enabled_events()) != GetEpollEvents(old_events) &&
+ saved_enabled_events_ == -1) {
+ ss_->Update(this);
+ }
+}
+
+void SocketDispatcher::SetEnabledEvents(uint8_t events) {
+ uint8_t old_events = enabled_events();
+ PhysicalSocket::SetEnabledEvents(events);
+ MaybeUpdateDispatcher(old_events);
+}
+
+void SocketDispatcher::EnableEvents(uint8_t events) {
+ uint8_t old_events = enabled_events();
+ PhysicalSocket::EnableEvents(events);
+ MaybeUpdateDispatcher(old_events);
+}
+
+void SocketDispatcher::DisableEvents(uint8_t events) {
+ uint8_t old_events = enabled_events();
+ PhysicalSocket::DisableEvents(events);
+ MaybeUpdateDispatcher(old_events);
+}
+
+#endif // WEBRTC_USE_EPOLL
+
int SocketDispatcher::Close() {
if (s_ == INVALID_SOCKET)
return 0;
@@ -1120,6 +1197,17 @@ class Signaler : public EventDispatcher {
PhysicalSocketServer::PhysicalSocketServer()
: fWait_(false) {
+#if defined(WEBRTC_USE_EPOLL)
+ // Since Linux 2.6.8, the size argument is ignored, but must be greater than
+ // zero. Before that the size served as hint to the kernel for the amount of
+ // space to initially allocate in internal data structures.
+ epoll_fd_ = epoll_create(FD_SETSIZE);
+ if (epoll_fd_ == -1) {
+ // Not an error, will fall back to "select" below.
+ LOG_E(LS_WARNING, EN, errno) << "epoll_create";
+ epoll_fd_ = INVALID_SOCKET;
+ }
+#endif
signal_wakeup_ = new Signaler(this, &fWait_);
#if defined(WEBRTC_WIN)
socket_ev_ = WSACreateEvent();
@@ -1134,6 +1222,11 @@ PhysicalSocketServer::~PhysicalSocketServer() {
signal_dispatcher_.reset();
#endif
delete signal_wakeup_;
+#if defined(WEBRTC_USE_EPOLL)
+ if (epoll_fd_ != INVALID_SOCKET) {
+ close(epoll_fd_);
+ }
+#endif
RTC_DCHECK(dispatchers_.empty());
}
@@ -1181,40 +1274,148 @@ AsyncSocket* PhysicalSocketServer::WrapSocket(SOCKET s) {
void PhysicalSocketServer::Add(Dispatcher *pdispatcher) {
CritScope cs(&crit_);
- // Prevent duplicates. This can cause dead dispatchers to stick around.
- DispatcherList::iterator pos = std::find(dispatchers_.begin(),
- dispatchers_.end(),
- pdispatcher);
- if (pos != dispatchers_.end())
- return;
- dispatchers_.push_back(pdispatcher);
+ if (processing_dispatchers_) {
+ // A dispatcher is being added while a "Wait" call is processing the
+ // list of socket events.
+ // Defer adding to "dispatchers_" set until processing is done to avoid
+ // invalidating the iterator in "Wait".
+ pending_remove_dispatchers_.erase(pdispatcher);
+ pending_add_dispatchers_.insert(pdispatcher);
+ } else {
+ dispatchers_.insert(pdispatcher);
+ }
+#if defined(WEBRTC_USE_EPOLL)
+ if (epoll_fd_ != INVALID_SOCKET) {
+ AddEpoll(pdispatcher);
+ }
+#endif // WEBRTC_USE_EPOLL
}
void PhysicalSocketServer::Remove(Dispatcher *pdispatcher) {
CritScope cs(&crit_);
- DispatcherList::iterator pos = std::find(dispatchers_.begin(),
- dispatchers_.end(),
- pdispatcher);
- // We silently ignore duplicate calls to Add, so we should silently ignore
- // the (expected) symmetric calls to Remove. Note that this may still hide
- // a real issue, so we at least log a warning about it.
- if (pos == dispatchers_.end()) {
+ if (processing_dispatchers_) {
+ // A dispatcher is being removed while a "Wait" call is processing the
+ // list of socket events.
+ // Defer removal from "dispatchers_" set until processing is done to avoid
+ // invalidating the iterator in "Wait".
+ if (!pending_add_dispatchers_.erase(pdispatcher) &&
+ dispatchers_.find(pdispatcher) == dispatchers_.end()) {
+ LOG(LS_WARNING) << "PhysicalSocketServer asked to remove a unknown "
+ << "dispatcher, potentially from a duplicate call to "
+ << "Add.";
+ return;
+ }
+
+ pending_remove_dispatchers_.insert(pdispatcher);
+ } else if (!dispatchers_.erase(pdispatcher)) {
LOG(LS_WARNING) << "PhysicalSocketServer asked to remove a unknown "
<< "dispatcher, potentially from a duplicate call to Add.";
return;
}
- size_t index = pos - dispatchers_.begin();
- dispatchers_.erase(pos);
- for (IteratorList::iterator it = iterators_.begin(); it != iterators_.end();
- ++it) {
- if (index < **it) {
- --**it;
+#if defined(WEBRTC_USE_EPOLL)
+ if (epoll_fd_ != INVALID_SOCKET) {
+ RemoveEpoll(pdispatcher);
+ }
+#endif // WEBRTC_USE_EPOLL
+}
+
+void PhysicalSocketServer::Update(Dispatcher* pdispatcher) {
+#if defined(WEBRTC_USE_EPOLL)
+ if (epoll_fd_ == INVALID_SOCKET) {
+ return;
+ }
+
+ CritScope cs(&crit_);
+ if (dispatchers_.find(pdispatcher) == dispatchers_.end()) {
+ return;
+ }
+
+ UpdateEpoll(pdispatcher);
+#endif
+}
+
+void PhysicalSocketServer::AddRemovePendingDispatchers() {
+ if (!pending_add_dispatchers_.empty()) {
+ for (Dispatcher* pdispatcher : pending_add_dispatchers_) {
+ dispatchers_.insert(pdispatcher);
+ }
+ pending_add_dispatchers_.clear();
+ }
+
+ if (!pending_remove_dispatchers_.empty()) {
+ for (Dispatcher* pdispatcher : pending_remove_dispatchers_) {
+ dispatchers_.erase(pdispatcher);
}
+ pending_remove_dispatchers_.clear();
}
}
#if defined(WEBRTC_POSIX)
+
bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
+#if defined(WEBRTC_USE_EPOLL)
+ // We don't keep a dedicated "epoll" descriptor containing only the non-IO
+ // (i.e. signaling) dispatcher, so "poll" will be used instead of the default
+ // "select" to support sockets larger than FD_SETSIZE.
+ if (!process_io) {
+ return WaitPoll(cmsWait, signal_wakeup_);
+ } else if (epoll_fd_ != INVALID_SOCKET) {
+ return WaitEpoll(cmsWait);
+ }
+#endif
+ return WaitSelect(cmsWait, process_io);
+}
+
+static void ProcessEvents(Dispatcher* dispatcher,
+ bool readable,
+ bool writable,
+ bool check_error) {
+ int errcode = 0;
+ // TODO(pthatcher): Should we set errcode if getsockopt fails?
+ if (check_error) {
+ socklen_t len = sizeof(errcode);
+ ::getsockopt(dispatcher->GetDescriptor(), SOL_SOCKET, SO_ERROR, &errcode,
+ &len);
+ }
+
+ uint32_t ff = 0;
+
+ // Check readable descriptors. If we're waiting on an accept, signal
+ // that. Otherwise we're waiting for data, check to see if we're
+ // readable or really closed.
+ // TODO(pthatcher): Only peek at TCP descriptors.
+ if (readable) {
+ if (dispatcher->GetRequestedEvents() & DE_ACCEPT) {
+ ff |= DE_ACCEPT;
+ } else if (errcode || dispatcher->IsDescriptorClosed()) {
+ ff |= DE_CLOSE;
+ } else {
+ ff |= DE_READ;
+ }
+ }
+
+ // Check writable descriptors. If we're waiting on a connect, detect
+ // success versus failure by the reaped error code.
+ if (writable) {
+ if (dispatcher->GetRequestedEvents() & DE_CONNECT) {
+ if (!errcode) {
+ ff |= DE_CONNECT;
+ } else {
+ ff |= DE_CLOSE;
+ }
+ } else {
+ ff |= DE_WRITE;
+ }
+ }
+
+ // Tell the descriptor about the event.
+ if (ff != 0) {
+ dispatcher->OnPreEvent(ff);
+ dispatcher->OnEvent(ff, errcode);
+ }
+}
+
+bool PhysicalSocketServer::WaitSelect(int cmsWait, bool process_io) {
// Calculate timing information
struct timeval* ptvWait = nullptr;
@@ -1257,13 +1458,17 @@ bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
int fdmax = -1;
{
CritScope cr(&crit_);
- for (size_t i = 0; i < dispatchers_.size(); ++i) {
+ // TODO(jbauch): Support re-entrant waiting.
+ RTC_DCHECK(!processing_dispatchers_);
+ for (Dispatcher* pdispatcher : dispatchers_) {
// Query dispatchers for read and write wait state
- Dispatcher *pdispatcher = dispatchers_[i];
RTC_DCHECK(pdispatcher);
if (!process_io && (pdispatcher != signal_wakeup_))
continue;
int fd = pdispatcher->GetDescriptor();
+ // "select"ing a file descriptor that is equal to or larger than
+ // FD_SETSIZE will result in undefined behavior.
+ RTC_DCHECK_LT(fd, FD_SETSIZE);
if (fd > fdmax)
fdmax = fd;
@@ -1297,55 +1502,28 @@ bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
} else {
// We have signaled descriptors
CritScope cr(&crit_);
- for (size_t i = 0; i < dispatchers_.size(); ++i) {
- Dispatcher *pdispatcher = dispatchers_[i];
+ processing_dispatchers_ = true;
+ for (Dispatcher* pdispatcher : dispatchers_) {
int fd = pdispatcher->GetDescriptor();
- uint32_t ff = 0;
- int errcode = 0;
-
- // Reap any error code, which can be signaled through reads or writes.
- // TODO(pthatcher): Should we set errcode if getsockopt fails?
- if (FD_ISSET(fd, &fdsRead) || FD_ISSET(fd, &fdsWrite)) {
- socklen_t len = sizeof(errcode);
- ::getsockopt(fd, SOL_SOCKET, SO_ERROR, &errcode, &len);
- }
- // Check readable descriptors. If we're waiting on an accept, signal
- // that. Otherwise we're waiting for data, check to see if we're
- // readable or really closed.
- // TODO(pthatcher): Only peek at TCP descriptors.
- if (FD_ISSET(fd, &fdsRead)) {
+ bool readable = FD_ISSET(fd, &fdsRead);
+ if (readable) {
FD_CLR(fd, &fdsRead);
- if (pdispatcher->GetRequestedEvents() & DE_ACCEPT) {
- ff |= DE_ACCEPT;
- } else if (errcode || pdispatcher->IsDescriptorClosed()) {
- ff |= DE_CLOSE;
- } else {
- ff |= DE_READ;
- }
}
- // Check writable descriptors. If we're waiting on a connect, detect
- // success versus failure by the reaped error code.
- if (FD_ISSET(fd, &fdsWrite)) {
+ bool writable = FD_ISSET(fd, &fdsWrite);
+ if (writable) {
FD_CLR(fd, &fdsWrite);
- if (pdispatcher->GetRequestedEvents() & DE_CONNECT) {
- if (!errcode) {
- ff |= DE_CONNECT;
- } else {
- ff |= DE_CLOSE;
- }
- } else {
- ff |= DE_WRITE;
- }
}
- // Tell the descriptor about the event.
- if (ff != 0) {
- pdispatcher->OnPreEvent(ff);
- pdispatcher->OnEvent(ff, errcode);
- }
+ // The error code can be signaled through reads or writes.
+ ProcessEvents(pdispatcher, readable, writable, readable || writable);
}
+
+ processing_dispatchers_ = false;
+ // Process deferred dispatchers that have been added/removed while the
+ // events were handled above.
+ AddRemovePendingDispatchers();
}
// Recalc the time remaining to wait. Doing it here means it doesn't get
@@ -1372,6 +1550,214 @@ bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
return true;
}
+#if defined(WEBRTC_USE_EPOLL)
+
+// Initial number of events to process with one call to "epoll_wait".
+static const size_t kInitialEpollEvents = 128;
+
+// Maximum number of events to process with one call to "epoll_wait".
+static const size_t kMaxEpollEvents = 8192;
+
+void PhysicalSocketServer::AddEpoll(Dispatcher* pdispatcher) {
+ RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
+ int fd = pdispatcher->GetDescriptor();
+ RTC_DCHECK(fd != INVALID_SOCKET);
+ if (fd == INVALID_SOCKET) {
+ return;
+ }
+
+ struct epoll_event event = {0};
+ event.events = GetEpollEvents(pdispatcher->GetRequestedEvents());
+ event.data.ptr = pdispatcher;
+ int err = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event);
+ RTC_DCHECK_EQ(err, 0);
+ if (err == -1) {
+ LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_ADD";
+ }
+}
+
+void PhysicalSocketServer::RemoveEpoll(Dispatcher* pdispatcher) {
+ RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
+ int fd = pdispatcher->GetDescriptor();
+ RTC_DCHECK(fd != INVALID_SOCKET);
+ if (fd == INVALID_SOCKET) {
+ return;
+ }
+
+ struct epoll_event event = {0};
+ int err = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, &event);
+ RTC_DCHECK(err == 0 || errno == ENOENT);
+ if (err == -1) {
+ if (errno == ENOENT) {
+ // Socket has already been closed.
+ LOG_E(LS_VERBOSE, EN, errno) << "epoll_ctl EPOLL_CTL_DEL";
+ } else {
+ LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_DEL";
+ }
+ }
+}
+
+void PhysicalSocketServer::UpdateEpoll(Dispatcher* pdispatcher) {
+ RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
+ int fd = pdispatcher->GetDescriptor();
+ RTC_DCHECK(fd != INVALID_SOCKET);
+ if (fd == INVALID_SOCKET) {
+ return;
+ }
+
+ struct epoll_event event = {0};
+ event.events = GetEpollEvents(pdispatcher->GetRequestedEvents());
+ event.data.ptr = pdispatcher;
+ int err = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd, &event);
+ RTC_DCHECK_EQ(err, 0);
+ if (err == -1) {
+ LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_MOD";
+ }
+}
+
+bool PhysicalSocketServer::WaitEpoll(int cmsWait) {
+ RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
+ int64_t tvWait = -1;
+ int64_t tvStop = -1;
+ if (cmsWait != kForever) {
+ tvWait = cmsWait;
+ tvStop = TimeAfter(cmsWait);
+ }
+
+ if (epoll_events_.empty()) {
+ // The initial space to receive events is created only if epoll is used.
+ epoll_events_.resize(kInitialEpollEvents);
+ }
+
+ fWait_ = true;
+
+ while (fWait_) {
+ // Wait then call handlers as appropriate
+ // < 0 means error
+ // 0 means timeout
+ // > 0 means count of descriptors ready
+ int n = epoll_wait(epoll_fd_, &epoll_events_[0],
+ static_cast<int>(epoll_events_.size()),
+ static_cast<int>(tvWait));
+ if (n < 0) {
+ if (errno != EINTR) {
+ LOG_E(LS_ERROR, EN, errno) << "epoll";
+ return false;
+ }
+ // Else ignore the error and keep going. If this EINTR was for one of the
+ // signals managed by this PhysicalSocketServer, the
+ // PosixSignalDeliveryDispatcher will be in the signaled state in the next
+ // iteration.
+ } else if (n == 0) {
+ // If timeout, return success
+ return true;
+ } else {
+ // We have signaled descriptors
+ CritScope cr(&crit_);
+ for (int i = 0; i < n; ++i) {
+ const epoll_event& event = epoll_events_[i];
+ Dispatcher* pdispatcher = static_cast<Dispatcher*>(event.data.ptr);
+ if (dispatchers_.find(pdispatcher) == dispatchers_.end()) {
+ // The dispatcher for this socket no longer exists.
+ continue;
+ }
+
+ bool readable = (event.events & (EPOLLIN | EPOLLPRI));
+ bool writable = (event.events & EPOLLOUT);
+ bool check_error = (event.events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP));
+
+ ProcessEvents(pdispatcher, readable, writable, check_error);
+ }
+ }
+
+ if (static_cast<size_t>(n) == epoll_events_.size() &&
+ epoll_events_.size() < kMaxEpollEvents) {
+ // We used the complete space to receive events, increase size for future
+ // iterations.
+ epoll_events_.resize(std::max(epoll_events_.size() * 2, kMaxEpollEvents));
+ }
+
+ if (cmsWait != kForever) {
+ tvWait = TimeDiff(tvStop, TimeMillis());
+ if (tvWait < 0) {
+ // Return success on timeout.
+ return true;
+ }
+ }
+ }
+
+ return true;
+}
+
+bool PhysicalSocketServer::WaitPoll(int cmsWait, Dispatcher* dispatcher) {
+ RTC_DCHECK(dispatcher);
+ int64_t tvWait = -1;
+ int64_t tvStop = -1;
+ if (cmsWait != kForever) {
+ tvWait = cmsWait;
+ tvStop = TimeAfter(cmsWait);
+ }
+
+ fWait_ = true;
+
+ struct pollfd fds = {0};
+ int fd = dispatcher->GetDescriptor();
+ fds.fd = fd;
+
+ while (fWait_) {
+ uint32_t ff = dispatcher->GetRequestedEvents();
+ fds.events = 0;
+ if (ff & (DE_READ | DE_ACCEPT)) {
+ fds.events |= POLLIN;
+ }
+ if (ff & (DE_WRITE | DE_CONNECT)) {
+ fds.events |= POLLOUT;
+ }
+ fds.revents = 0;
+
+ // Wait then call handlers as appropriate
+ // < 0 means error
+ // 0 means timeout
+ // > 0 means count of descriptors ready
+ int n = poll(&fds, 1, static_cast<int>(tvWait));
+ if (n < 0) {
+ if (errno != EINTR) {
+ LOG_E(LS_ERROR, EN, errno) << "poll";
+ return false;
+ }
+ // Else ignore the error and keep going. If this EINTR was for one of the
+ // signals managed by this PhysicalSocketServer, the
+ // PosixSignalDeliveryDispatcher will be in the signaled state in the next
+ // iteration.
+ } else if (n == 0) {
+ // If timeout, return success
+ return true;
+ } else {
+ // We have signaled descriptors (should only be the passed dispatcher).
+ RTC_DCHECK_EQ(n, 1);
+ RTC_DCHECK_EQ(fds.fd, fd);
+
+ bool readable = (fds.revents & (POLLIN | POLLPRI));
+ bool writable = (fds.revents & POLLOUT);
+ bool check_error = (fds.revents & (POLLRDHUP | POLLERR | POLLHUP));
+
+ ProcessEvents(dispatcher, readable, writable, check_error);
+ }
+
+ if (cmsWait != kForever) {
+ tvWait = TimeDiff(tvStop, TimeMillis());
+ if (tvWait < 0) {
+ // Return success on timeout.
+ return true;
+ }
+ }
+ }
+
+ return true;
+}
+
+#endif // WEBRTC_USE_EPOLL
+
static void GlobalSignalHandler(int signum) {
PosixSignalHandler::Instance()->OnPosixSignalReceived(signum);
}
@@ -1445,12 +1831,13 @@ bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
{
CritScope cr(&crit_);
- size_t i = 0;
- iterators_.push_back(&i);
- // Don't track dispatchers_.size(), because we want to pick up any new
- // dispatchers that were added while processing the loop.
- while (i < dispatchers_.size()) {
- Dispatcher* disp = dispatchers_[i++];
+ // TODO(jbauch): Support re-entrant waiting.
+ RTC_DCHECK(!processing_dispatchers_);
+
+ // Calling "CheckSignalClose" might remove a closed dispatcher from the
+ // set. This must be deferred to prevent invalidating the iterator.
+ processing_dispatchers_ = true;
+ for (Dispatcher* disp : dispatchers_) {
if (!process_io && (disp != signal_wakeup_))
continue;
SOCKET s = disp->GetSocket();
@@ -1465,8 +1852,11 @@ bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
event_owners.push_back(disp);
}
}
- RTC_DCHECK(iterators_.back() == &i);
- iterators_.pop_back();
+
+ processing_dispatchers_ = false;
+ // Process deferred dispatchers that have been added/removed while the
+ // events were handled above.
+ AddRemovePendingDispatchers();
}
// Which is shorter, the delay wait or the asked wait?
@@ -1500,14 +1890,15 @@ bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
int index = dw - WSA_WAIT_EVENT_0;
if (index > 0) {
--index; // The first event is the socket event
- event_owners[index]->OnPreEvent(0);
- event_owners[index]->OnEvent(0, 0);
+ Dispatcher* disp = event_owners[index];
+ // The dispatcher could have been removed while waiting for events.
+ if (dispatchers_.find(disp) != dispatchers_.end()) {
+ disp->OnPreEvent(0);
+ disp->OnEvent(0, 0);
+ }
} else if (process_io) {
- size_t i = 0, end = dispatchers_.size();
- iterators_.push_back(&i);
- iterators_.push_back(&end); // Don't iterate over new dispatchers.
- while (i < end) {
- Dispatcher* disp = dispatchers_[i++];
+ processing_dispatchers_ = true;
+ for (Dispatcher* disp : dispatchers_) {
SOCKET s = disp->GetSocket();
if (s == INVALID_SOCKET)
continue;
@@ -1568,10 +1959,11 @@ bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
}
}
}
- RTC_DCHECK(iterators_.back() == &end);
- iterators_.pop_back();
- RTC_DCHECK(iterators_.back() == &i);
- iterators_.pop_back();
+
+ processing_dispatchers_ = false;
+ // Process deferred dispatchers that have been added/removed while the
+ // events were handled above.
+ AddRemovePendingDispatchers();
}
// Reset the network event until new activity occurs
« no previous file with comments | « webrtc/base/physicalsocketserver.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698