Index: webrtc/base/physicalsocketserver.cc |
diff --git a/webrtc/base/physicalsocketserver.cc b/webrtc/base/physicalsocketserver.cc |
index a412703015e77f903b82c6126edd21a4c00a023d..6354cd63b6f4c8b3c8e8532d9a39d7cd9c31fc3c 100644 |
--- a/webrtc/base/physicalsocketserver.cc |
+++ b/webrtc/base/physicalsocketserver.cc |
@@ -80,6 +80,13 @@ int64_t GetSocketRecvTimestamp(int socket) { |
typedef char* SockOptArg; |
#endif |
+#if defined(WEBRTC_USE_EPOLL) |
+// EPOLLRDHUP is only defined starting with Linux 2.6.17. |
+#if !defined(EPOLLRDHUP) |
+#define EPOLLRDHUP 0x2000 |
+#endif |
+#endif |
+ |
namespace rtc { |
std::unique_ptr<SocketServer> SocketServer::CreateDefault() { |
@@ -765,6 +772,11 @@ void SocketDispatcher::OnEvent(uint32_t ff, int err) { |
#elif defined(WEBRTC_POSIX) |
void SocketDispatcher::OnEvent(uint32_t ff, int err) { |
+#if defined(WEBRTC_USE_EPOLL) |
+ // Remember currently enabled events so we can combine multiple changes |
+ // into one update call later. |
+ SaveEnabledEvents(); |
+#endif |
// Make sure we deliver connect/accept first. Otherwise, consumers may see |
// something like a READ followed by a CONNECT, which would be odd. |
if ((ff & DE_CONNECT) != 0) { |
@@ -788,10 +800,53 @@ void SocketDispatcher::OnEvent(uint32_t ff, int err) { |
SetEnabledEvents(0); |
SignalCloseEvent(this, err); |
} |
+#if defined(WEBRTC_USE_EPOLL) |
+ RestoreEnabledEvents(); |
Taylor Brandstetter
2017/05/17 23:19:02
This name isn't very intuitive; it sounds like it
joachim
2017/05/18 12:41:49
Keeping a bitmask of events to disable won't work,
Taylor Brandstetter
2017/05/18 21:59:56
Ah, that's what I was missing. Can you mention it
joachim
2017/05/19 20:46:23
Done.
|
+#endif |
} |
#endif // WEBRTC_POSIX |
+#if defined(WEBRTC_USE_EPOLL) |
+ |
+void SocketDispatcher::SaveEnabledEvents() { |
+ RTC_DCHECK_EQ(saved_enabled_events_, -1); |
+ saved_enabled_events_ = enabled_events(); |
+} |
+ |
+void SocketDispatcher::RestoreEnabledEvents() { |
+ RTC_DCHECK_NE(saved_enabled_events_, -1); |
+ uint8_t old_events = static_cast<uint8_t>(saved_enabled_events_); |
+ saved_enabled_events_ = -1; |
+ MaybeUpdateDispatcher(old_events); |
+} |
+ |
+void SocketDispatcher::MaybeUpdateDispatcher(uint8_t old_events) { |
+ if (enabled_events() != old_events && saved_enabled_events_ == -1) { |
Taylor Brandstetter
2017/05/17 23:19:02
Since DE_READ and DE_ACCEPT both map to EPOLLIN: s
joachim
2017/05/18 12:41:49
Right, thanks for spotting.
|
+ ss_->Update(this); |
+ } |
+} |
+ |
+void SocketDispatcher::SetEnabledEvents(uint8_t events) { |
+ uint8_t old_events = enabled_events(); |
+ PhysicalSocket::SetEnabledEvents(events); |
+ MaybeUpdateDispatcher(old_events); |
+} |
+ |
+void SocketDispatcher::EnableEvents(uint8_t events) { |
+ uint8_t old_events = enabled_events(); |
+ PhysicalSocket::EnableEvents(events); |
+ MaybeUpdateDispatcher(old_events); |
+} |
+ |
+void SocketDispatcher::DisableEvents(uint8_t events) { |
+ uint8_t old_events = enabled_events(); |
+ PhysicalSocket::DisableEvents(events); |
+ MaybeUpdateDispatcher(old_events); |
+} |
+ |
+#endif // WEBRTC_USE_EPOLL |
+ |
int SocketDispatcher::Close() { |
if (s_ == INVALID_SOCKET) |
return 0; |
@@ -1120,6 +1175,17 @@ class Signaler : public EventDispatcher { |
PhysicalSocketServer::PhysicalSocketServer() |
: fWait_(false) { |
+#if defined(WEBRTC_USE_EPOLL) |
+ // Since Linux 2.6.8, the size argument is ignored, but must be greater than |
+ // zero. Before that the size served as hint to the kernel for the amount of |
+ // space to initially allocate in internal data structures. |
+ epoll_fd_ = epoll_create(FD_SETSIZE); |
+ if (epoll_fd_ == -1) { |
+ // Not an error, will fall back to "select" below. |
+ LOG_E(LS_WARNING, EN, errno) << "epoll_create"; |
+ epoll_fd_ = INVALID_SOCKET; |
+ } |
+#endif |
signal_wakeup_ = new Signaler(this, &fWait_); |
#if defined(WEBRTC_WIN) |
socket_ev_ = WSACreateEvent(); |
@@ -1134,6 +1200,11 @@ PhysicalSocketServer::~PhysicalSocketServer() { |
signal_dispatcher_.reset(); |
#endif |
delete signal_wakeup_; |
+#if defined(WEBRTC_USE_EPOLL) |
+ if (epoll_fd_ != INVALID_SOCKET) { |
+ close(epoll_fd_); |
+ } |
+#endif |
RTC_DCHECK(dispatchers_.empty()); |
} |
@@ -1181,6 +1252,7 @@ AsyncSocket* PhysicalSocketServer::WrapSocket(SOCKET s) { |
void PhysicalSocketServer::Add(Dispatcher *pdispatcher) { |
CritScope cs(&crit_); |
+#if defined(WEBRTC_WIN) |
// Prevent duplicates. This can cause dead dispatchers to stick around. |
DispatcherList::iterator pos = std::find(dispatchers_.begin(), |
dispatchers_.end(), |
@@ -1188,10 +1260,19 @@ void PhysicalSocketServer::Add(Dispatcher *pdispatcher) { |
if (pos != dispatchers_.end()) |
return; |
dispatchers_.push_back(pdispatcher); |
+#else |
+ dispatchers_.insert(pdispatcher); |
+#endif // WEBRTC_WIN |
+#if defined(WEBRTC_USE_EPOLL) |
+ if (epoll_fd_ != INVALID_SOCKET) { |
+ AddEpoll(pdispatcher); |
+ } |
+#endif // WEBRTC_USE_EPOLL |
} |
void PhysicalSocketServer::Remove(Dispatcher *pdispatcher) { |
CritScope cs(&crit_); |
+#if defined(WEBRTC_WIN) |
DispatcherList::iterator pos = std::find(dispatchers_.begin(), |
dispatchers_.end(), |
pdispatcher); |
@@ -1211,10 +1292,50 @@ void PhysicalSocketServer::Remove(Dispatcher *pdispatcher) { |
--**it; |
} |
} |
+#else |
+ if (!dispatchers_.erase(pdispatcher)) { |
+ LOG(LS_WARNING) << "PhysicalSocketServer asked to remove a unknown " |
+ << "dispatcher, potentially from a duplicate call to Add."; |
+ return; |
+ } |
+#endif // WEBRTC_WIN |
+#if defined(WEBRTC_USE_EPOLL) |
+ if (epoll_fd_ != INVALID_SOCKET) { |
+ RemoveEpoll(pdispatcher); |
+ } |
+#endif // WEBRTC_USE_EPOLL |
+} |
+ |
+void PhysicalSocketServer::Update(Dispatcher *pdispatcher) { |
+#if defined(WEBRTC_USE_EPOLL) |
+ if (epoll_fd_ == INVALID_SOCKET) { |
+ return; |
+ } |
+ |
+ CritScope cs(&crit_); |
+ if (dispatchers_.find(pdispatcher) == dispatchers_.end()) { |
+ return; |
+ } |
+ |
+ UpdateEpoll(pdispatcher); |
+#endif |
} |
#if defined(WEBRTC_POSIX) |
+ |
bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) { |
+#if defined(WEBRTC_USE_EPOLL) |
+ // We don't keep a dedicated epoll descriptor containing only non-IO (i.e. |
+ // signaling) dispatchers, so the default "select" loop will be used in that |
+ // case. |
+ if (epoll_fd_ != INVALID_SOCKET && process_io) { |
+ return WaitEpoll(cmsWait); |
+ } |
+#endif |
+ return WaitSelect(cmsWait, process_io); |
+} |
zhaoyanfeng
2017/05/18 07:46:17
If process_io == false, still use select?
That see
joachim
2017/05/18 12:41:49
The wakeup dispatcher is created early during cons
zhaoyanfeng
2017/05/18 14:08:37
I have tested it in our server but failed, our ser
joachim
2017/05/18 19:31:53
I changed it now to use "poll" to wait for the sig
|
+ |
+bool PhysicalSocketServer::WaitSelect(int cmsWait, bool process_io) { |
// Calculate timing information |
struct timeval* ptvWait = nullptr; |
@@ -1257,13 +1378,15 @@ bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) { |
int fdmax = -1; |
{ |
CritScope cr(&crit_); |
- for (size_t i = 0; i < dispatchers_.size(); ++i) { |
+ for (Dispatcher* pdispatcher : dispatchers_) { |
// Query dispatchers for read and write wait state |
- Dispatcher *pdispatcher = dispatchers_[i]; |
RTC_DCHECK(pdispatcher); |
if (!process_io && (pdispatcher != signal_wakeup_)) |
continue; |
int fd = pdispatcher->GetDescriptor(); |
+ // "select"ing a file descriptor that is equal to or larger than |
+ // FD_SETSIZE will result in undefined behavior. |
+ RTC_DCHECK_LT(fd, FD_SETSIZE); |
if (fd > fdmax) |
fdmax = fd; |
@@ -1297,8 +1420,7 @@ bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) { |
} else { |
// We have signaled descriptors |
CritScope cr(&crit_); |
- for (size_t i = 0; i < dispatchers_.size(); ++i) { |
- Dispatcher *pdispatcher = dispatchers_[i]; |
+ for (Dispatcher *pdispatcher : dispatchers_) { |
int fd = pdispatcher->GetDescriptor(); |
uint32_t ff = 0; |
int errcode = 0; |
@@ -1372,6 +1494,197 @@ bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) { |
return true; |
} |
+#if defined(WEBRTC_USE_EPOLL) |
+ |
+// Initial number of events to process with one call to "epoll_wait". |
+static const size_t kInitialEpollEvents = 128; |
+ |
+// Maximum number of events to process with one call to "epoll_wait". |
+static const size_t kMaxEpollEvents = 8192; |
+ |
+static int GetEpollEvents(Dispatcher* pdispatcher) { |
+ int events = 0; |
+ uint32_t ff = pdispatcher->GetRequestedEvents(); |
+ if (ff & (DE_READ | DE_ACCEPT)) { |
+ events |= EPOLLIN; |
+ } |
+ if (ff & (DE_WRITE | DE_CONNECT)) { |
+ events |= EPOLLOUT; |
+ } |
+ return events; |
+} |
+ |
+void PhysicalSocketServer::AddEpoll(Dispatcher* pdispatcher) { |
+ RTC_DCHECK(epoll_fd_ != INVALID_SOCKET); |
+ int fd = pdispatcher->GetDescriptor(); |
+ RTC_DCHECK(fd != INVALID_SOCKET); |
+ if (fd == INVALID_SOCKET) { |
+ return; |
+ } |
+ |
+ struct epoll_event event = {0}; |
+ event.events = GetEpollEvents(pdispatcher); |
+ event.data.ptr = pdispatcher; |
+ int err = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event); |
+ RTC_DCHECK_EQ(err, 0); |
+ if (err == -1) { |
+ LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_ADD"; |
+ } |
+} |
+ |
+void PhysicalSocketServer::RemoveEpoll(Dispatcher* pdispatcher) { |
+ RTC_DCHECK(epoll_fd_ != INVALID_SOCKET); |
+ int fd = pdispatcher->GetDescriptor(); |
+ RTC_DCHECK(fd != INVALID_SOCKET); |
+ if (fd == INVALID_SOCKET) { |
+ return; |
+ } |
+ |
+ struct epoll_event event = {0}; |
+ int err = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, &event); |
+ RTC_DCHECK(err == 0 || errno == ENOENT); |
+ if (err == -1) { |
+ if (errno == ENOENT) { |
+ // Socket has already been closed. |
+ LOG_E(LS_VERBOSE, EN, errno) << "epoll_ctl EPOLL_CTL_DEL"; |
+ } else { |
+ LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_DEL"; |
+ } |
+ } |
+} |
+ |
+void PhysicalSocketServer::UpdateEpoll(Dispatcher* pdispatcher) { |
+ RTC_DCHECK(epoll_fd_ != INVALID_SOCKET); |
+ int fd = pdispatcher->GetDescriptor(); |
+ RTC_DCHECK(fd != INVALID_SOCKET); |
+ if (fd == INVALID_SOCKET) { |
+ return; |
+ } |
+ |
+ struct epoll_event event = {0}; |
+ event.events = GetEpollEvents(pdispatcher); |
+ event.data.ptr = pdispatcher; |
+ int err = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd, &event); |
+ RTC_DCHECK_EQ(err, 0); |
+ if (err == -1) { |
+ LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_MOD"; |
+ } |
+} |
+ |
+bool PhysicalSocketServer::WaitEpoll(int cmsWait) { |
+ RTC_DCHECK(epoll_fd_ != INVALID_SOCKET); |
+ int64_t tvWait = -1; |
+ int64_t tvStop = -1; |
+ if (cmsWait != kForever) { |
+ tvWait = cmsWait; |
+ tvStop = TimeAfter(cmsWait); |
+ } |
+ |
+ if (epoll_events_.empty()) { |
+ // The initial space to receive events is created only if epoll is used. |
+ epoll_events_.resize(kInitialEpollEvents); |
+ } |
+ |
+ fWait_ = true; |
+ |
+ while (fWait_) { |
+ // Wait then call handlers as appropriate |
+ // < 0 means error |
+ // 0 means timeout |
+ // > 0 means count of descriptors ready |
+ int n = epoll_wait(epoll_fd_, &epoll_events_[0], |
+ static_cast<int>(epoll_events_.size()), static_cast<int>(tvWait)); |
+ if (n < 0) { |
+ if (errno != EINTR) { |
+ LOG_E(LS_ERROR, EN, errno) << "epoll"; |
+ return false; |
+ } |
+ // Else ignore the error and keep going. If this EINTR was for one of the |
+ // signals managed by this PhysicalSocketServer, the |
+ // PosixSignalDeliveryDispatcher will be in the signaled state in the next |
+ // iteration. |
+ } else if (n == 0) { |
+ // If timeout, return success |
+ return true; |
+ } else { |
+ // We have signaled descriptors |
+ CritScope cr(&crit_); |
+ for (int i = 0; i < n; ++i) { |
+ const epoll_event& event = epoll_events_[i]; |
+ Dispatcher* pdispatcher = static_cast<Dispatcher*>(event.data.ptr); |
+ if (dispatchers_.find(pdispatcher) == dispatchers_.end()) { |
+ // The dispatcher for this socket no longer exists. |
+ continue; |
+ } |
+ |
+ int fd = pdispatcher->GetDescriptor(); |
+ uint32_t ff = 0; |
+ int errcode = 0; |
+ |
+ // Reap any error code. |
+ // TODO(pthatcher): Should we set errcode if getsockopt fails? |
+ if (event.events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP)) { |
+ socklen_t len = sizeof(errcode); |
+ ::getsockopt(fd, SOL_SOCKET, SO_ERROR, &errcode, &len); |
+ } |
+ |
+ // Check readable descriptors. If we're waiting on an accept, signal |
+ // that. Otherwise we're waiting for data, check to see if we're |
+ // readable or really closed. |
+ // TODO(pthatcher): Only peek at TCP descriptors. |
+ if ((event.events & (EPOLLIN | EPOLLPRI))) { |
+ if (pdispatcher->GetRequestedEvents() & DE_ACCEPT) { |
+ ff |= DE_ACCEPT; |
+ } else if (errcode || pdispatcher->IsDescriptorClosed()) { |
+ ff |= DE_CLOSE; |
+ } else { |
+ ff |= DE_READ; |
+ } |
+ } |
+ |
+ // Check writable descriptors. If we're waiting on a connect, detect |
+ // success versus failure by the reaped error code. |
+ if ((event.events & EPOLLOUT)) { |
+ if (pdispatcher->GetRequestedEvents() & DE_CONNECT) { |
+ if (!errcode) { |
+ ff |= DE_CONNECT; |
+ } else { |
+ ff |= DE_CLOSE; |
+ } |
+ } else { |
+ ff |= DE_WRITE; |
+ } |
+ } |
+ |
+ // Tell the descriptor about the event. |
+ if (ff != 0) { |
+ pdispatcher->OnPreEvent(ff); |
+ pdispatcher->OnEvent(ff, errcode); |
+ } |
+ } |
+ } |
+ |
+ if (static_cast<size_t>(n) == epoll_events_.size() && |
+ epoll_events_.size() < kMaxEpollEvents) { |
+ // We used the complete space to receive events, increase size for future |
+ // iterations. |
+ epoll_events_.resize(std::max(epoll_events_.size() * 2, kMaxEpollEvents)); |
+ } |
+ |
+ if (cmsWait != kForever) { |
+ tvWait = TimeDiff(tvStop, TimeMillis()); |
+ if (tvWait < 0) { |
+ // Return success on timeout. |
+ return true; |
+ } |
+ } |
+ } |
+ |
+ return true; |
+} |
+ |
+#endif // WEBRTC_USE_EPOLL |
+ |
static void GlobalSignalHandler(int signum) { |
PosixSignalHandler::Instance()->OnPosixSignalReceived(signum); |
} |