Index: webrtc/base/physicalsocketserver.cc |
diff --git a/webrtc/base/physicalsocketserver.cc b/webrtc/base/physicalsocketserver.cc |
index 0d6cfa14f7206d314ff581489836f74f7b678277..f50a2270ac84fe1073c5fae3aa46a4a6d6ddece9 100644 |
--- a/webrtc/base/physicalsocketserver.cc |
+++ b/webrtc/base/physicalsocketserver.cc |
@@ -37,6 +37,7 @@ |
#endif |
#include <algorithm> |
+#include <limits> |
#include <map> |
#include "webrtc/base/arraysize.h" |
@@ -81,6 +82,13 @@ int64_t GetSocketRecvTimestamp(int socket) { |
typedef char* SockOptArg; |
#endif |
+#if defined(WEBRTC_POSIX) && defined(WEBRTC_LINUX) |
+// EPOLLRDHUP is only defined starting with Linux 2.6.17. |
+#if !defined(EPOLLRDHUP) |
+#define EPOLLRDHUP 0x2000 |
+#endif |
+#endif |
+ |
namespace rtc { |
std::unique_ptr<SocketServer> SocketServer::CreateDefault() { |
@@ -134,7 +142,7 @@ PhysicalSocket::PhysicalSocket(PhysicalSocketServer* ss, SOCKET s) |
EnsureWinsockInit(); |
#endif |
if (s_ != INVALID_SOCKET) { |
- enabled_events_ = DE_READ | DE_WRITE; |
+ SetEnabledEvents(DE_READ | DE_WRITE); |
Taylor Brandstetter
2017/05/17 07:20:16
nit: The "SetEnabledEvents" etc. change seems like
joachim
2017/05/17 21:17:02
Moved to https://codereview.webrtc.org/2893723002/
|
int type = SOCK_STREAM; |
socklen_t len = sizeof(type); |
@@ -154,8 +162,9 @@ bool PhysicalSocket::Create(int family, int type) { |
s_ = ::socket(family, type, 0); |
udp_ = (SOCK_DGRAM == type); |
UpdateLastError(); |
- if (udp_) |
- enabled_events_ = DE_READ | DE_WRITE; |
+ if (udp_) { |
+ SetEnabledEvents(DE_READ | DE_WRITE); |
+ } |
return s_ != INVALID_SOCKET; |
} |
@@ -267,16 +276,17 @@ int PhysicalSocket::DoConnect(const SocketAddress& connect_addr) { |
sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); |
int err = ::connect(s_, addr, static_cast<int>(len)); |
UpdateLastError(); |
+ uint8_t events = DE_READ | DE_WRITE; |
if (err == 0) { |
state_ = CS_CONNECTED; |
} else if (IsBlockingError(GetError())) { |
state_ = CS_CONNECTING; |
- enabled_events_ |= DE_CONNECT; |
+ events |= DE_CONNECT; |
} else { |
return SOCKET_ERROR; |
} |
- enabled_events_ |= DE_READ | DE_WRITE; |
+ EnableEvents(events); |
return 0; |
} |
@@ -342,7 +352,7 @@ int PhysicalSocket::Send(const void* pv, size_t cb) { |
RTC_DCHECK(sent <= static_cast<int>(cb)); |
if ((sent > 0 && sent < static_cast<int>(cb)) || |
(sent < 0 && IsBlockingError(GetError()))) { |
- enabled_events_ |= DE_WRITE; |
+ EnableEvents(DE_WRITE); |
} |
return sent; |
} |
@@ -367,7 +377,7 @@ int PhysicalSocket::SendTo(const void* buffer, |
RTC_DCHECK(sent <= static_cast<int>(length)); |
if ((sent > 0 && sent < static_cast<int>(length)) || |
(sent < 0 && IsBlockingError(GetError()))) { |
- enabled_events_ |= DE_WRITE; |
+ EnableEvents(DE_WRITE); |
} |
return sent; |
} |
@@ -382,7 +392,7 @@ int PhysicalSocket::Recv(void* buffer, size_t length, int64_t* timestamp) { |
LOG(LS_WARNING) << "EOF from socket; deferring close event"; |
// Must turn this back on so that the select() loop will notice the close |
// event. |
- enabled_events_ |= DE_READ; |
+ EnableEvents(DE_READ); |
SetError(EWOULDBLOCK); |
return SOCKET_ERROR; |
} |
@@ -393,7 +403,7 @@ int PhysicalSocket::Recv(void* buffer, size_t length, int64_t* timestamp) { |
int error = GetError(); |
bool success = (received >= 0) || IsBlockingError(error); |
if (udp_ || success) { |
- enabled_events_ |= DE_READ; |
+ EnableEvents(DE_READ); |
} |
if (!success) { |
LOG_F(LS_VERBOSE) << "Error = " << error; |
@@ -419,7 +429,7 @@ int PhysicalSocket::RecvFrom(void* buffer, |
int error = GetError(); |
bool success = (received >= 0) || IsBlockingError(error); |
if (udp_ || success) { |
- enabled_events_ |= DE_READ; |
+ EnableEvents(DE_READ); |
} |
if (!success) { |
LOG_F(LS_VERBOSE) << "Error = " << error; |
@@ -432,7 +442,7 @@ int PhysicalSocket::Listen(int backlog) { |
UpdateLastError(); |
if (err == 0) { |
state_ = CS_CONNECTING; |
- enabled_events_ |= DE_ACCEPT; |
+ EnableEvents(DE_ACCEPT); |
#if !defined(NDEBUG) |
dbg_addr_ = "Listening @ "; |
dbg_addr_.append(GetLocalAddress().ToString()); |
@@ -444,7 +454,7 @@ int PhysicalSocket::Listen(int backlog) { |
AsyncSocket* PhysicalSocket::Accept(SocketAddress* out_addr) { |
// Always re-subscribe DE_ACCEPT to make sure new incoming connections will |
// trigger an event even if DoAccept returns an error here. |
- enabled_events_ |= DE_ACCEPT; |
+ EnableEvents(DE_ACCEPT); |
sockaddr_storage addr_storage; |
socklen_t addr_len = sizeof(addr_storage); |
sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); |
@@ -464,7 +474,7 @@ int PhysicalSocket::Close() { |
UpdateLastError(); |
s_ = INVALID_SOCKET; |
state_ = CS_CLOSED; |
- enabled_events_ = 0; |
+ SetEnabledEvents(0); |
if (resolver_) { |
resolver_->Destroy(false); |
resolver_ = nullptr; |
@@ -590,6 +600,18 @@ void PhysicalSocket::MaybeRemapSendError() { |
#endif |
} |
+void PhysicalSocket::SetEnabledEvents(uint8_t events) { |
+ enabled_events_ = events; |
+} |
+ |
+void PhysicalSocket::EnableEvents(uint8_t events) { |
+ enabled_events_ |= events; |
+} |
+ |
+void PhysicalSocket::DisableEvents(uint8_t events) { |
+ enabled_events_ &= ~events; |
+} |
+ |
int PhysicalSocket::TranslateOption(Option opt, int* slevel, int* sopt) { |
switch (opt) { |
case OPT_DONTFRAGMENT: |
@@ -788,7 +810,7 @@ void SocketDispatcher::OnEvent(uint32_t ff, int err) { |
if (((ff & DE_CONNECT) != 0) && (id_ == cache_id)) { |
if (ff != DE_CONNECT) |
LOG(LS_VERBOSE) << "Signalled with DE_CONNECT: " << ff; |
- enabled_events_ &= ~DE_CONNECT; |
+ DisableEvents(DE_CONNECT); |
#if !defined(NDEBUG) |
dbg_addr_ = "Connected @ "; |
dbg_addr_.append(GetRemoteAddress().ToString()); |
@@ -796,15 +818,15 @@ void SocketDispatcher::OnEvent(uint32_t ff, int err) { |
SignalConnectEvent(this); |
} |
if (((ff & DE_ACCEPT) != 0) && (id_ == cache_id)) { |
- enabled_events_ &= ~DE_ACCEPT; |
+ DisableEvents(DE_ACCEPT); |
SignalReadEvent(this); |
} |
if ((ff & DE_READ) != 0) { |
- enabled_events_ &= ~DE_READ; |
+ DisableEvents(DE_READ); |
SignalReadEvent(this); |
} |
if (((ff & DE_WRITE) != 0) && (id_ == cache_id)) { |
- enabled_events_ &= ~DE_WRITE; |
+ DisableEvents(DE_WRITE); |
SignalWriteEvent(this); |
} |
if (((ff & DE_CLOSE) != 0) && (id_ == cache_id)) { |
@@ -816,31 +838,78 @@ void SocketDispatcher::OnEvent(uint32_t ff, int err) { |
#elif defined(WEBRTC_POSIX) |
void SocketDispatcher::OnEvent(uint32_t ff, int err) { |
+#if defined(WEBRTC_LINUX) |
Taylor Brandstetter
2017/05/17 07:20:16
nit: It may be cleaner to do "#ifdef WEBRTC_LINUX
joachim
2017/05/17 21:17:02
Done (named it "WEBRTC_USE_EPOLL").
|
+ // Remember currently enabled events so we can combine multiple changes |
+ // into one update call later. |
+ PushEnabledEvents(); |
+#endif |
// Make sure we deliver connect/accept first. Otherwise, consumers may see |
// something like a READ followed by a CONNECT, which would be odd. |
if ((ff & DE_CONNECT) != 0) { |
- enabled_events_ &= ~DE_CONNECT; |
+ DisableEvents(DE_CONNECT); |
SignalConnectEvent(this); |
} |
if ((ff & DE_ACCEPT) != 0) { |
- enabled_events_ &= ~DE_ACCEPT; |
+ DisableEvents(DE_ACCEPT); |
SignalReadEvent(this); |
} |
if ((ff & DE_READ) != 0) { |
- enabled_events_ &= ~DE_READ; |
+ DisableEvents(DE_READ); |
SignalReadEvent(this); |
} |
if ((ff & DE_WRITE) != 0) { |
- enabled_events_ &= ~DE_WRITE; |
+ DisableEvents(DE_WRITE); |
SignalWriteEvent(this); |
} |
if ((ff & DE_CLOSE) != 0) { |
// The socket is now dead to us, so stop checking it. |
- enabled_events_ = 0; |
+ SetEnabledEvents(0); |
SignalCloseEvent(this, err); |
} |
+#if defined(WEBRTC_LINUX) |
+ PopEnabledEvents(); |
+#endif |
+} |
+ |
+#if defined(WEBRTC_LINUX) |
+ |
+void SocketDispatcher::PushEnabledEvents() { |
+ enabled_events_stack_.push_back(enabled_events_); |
Taylor Brandstetter
2017/05/17 07:20:16
Will the stack ever have a size > 1? If so, can yo
joachim
2017/05/17 21:17:02
Currently it will only be one element. I'll change
|
} |
+void SocketDispatcher::PopEnabledEvents() { |
+ RTC_DCHECK(!enabled_events_stack_.empty()); |
+ uint8_t old_events = enabled_events_stack_.back(); |
+ enabled_events_stack_.pop_back(); |
+ MaybeUpdateDispatcher(old_events); |
+} |
+ |
+void SocketDispatcher::MaybeUpdateDispatcher(uint8_t old_events) { |
+ if (enabled_events_ != old_events && enabled_events_stack_.empty()) { |
+ ss_->Update(this); |
+ } |
+} |
+ |
+void SocketDispatcher::SetEnabledEvents(uint8_t events) { |
+ uint8_t old_events = enabled_events_; |
+ PhysicalSocket::SetEnabledEvents(events); |
+ MaybeUpdateDispatcher(old_events); |
+} |
+ |
+void SocketDispatcher::EnableEvents(uint8_t events) { |
+ uint8_t old_events = enabled_events_; |
+ PhysicalSocket::EnableEvents(events); |
+ MaybeUpdateDispatcher(old_events); |
+} |
+ |
+void SocketDispatcher::DisableEvents(uint8_t events) { |
+ uint8_t old_events = enabled_events_; |
+ PhysicalSocket::DisableEvents(events); |
+ MaybeUpdateDispatcher(old_events); |
+} |
+ |
+#endif // WEBRTC_LINUX |
+ |
#endif // WEBRTC_POSIX |
int SocketDispatcher::Close() { |
@@ -1171,6 +1240,16 @@ class Signaler : public EventDispatcher { |
PhysicalSocketServer::PhysicalSocketServer() |
: fWait_(false) { |
+#if defined(WEBRTC_POSIX) && defined(WEBRTC_LINUX) |
+ // Since Linux 2.6.8, the size argument is ignored, but must be greater than |
+ // zero. Before that the size served as hint to the kernel for the amount of |
+ // space to initially allocate in internal data structures. |
+ epoll_fd_ = epoll_create(FD_SETSIZE); |
+ if (epoll_fd_ == -1) { |
+ LOG_E(LS_WARNING, EN, errno) << "epoll_create"; |
+ epoll_fd_ = INVALID_SOCKET; |
Taylor Brandstetter
2017/05/17 07:20:16
Could you leave a comment mentioning that the code
joachim
2017/05/17 21:17:02
Done.
|
+ } |
+#endif |
signal_wakeup_ = new Signaler(this, &fWait_); |
#if defined(WEBRTC_WIN) |
socket_ev_ = WSACreateEvent(); |
@@ -1185,6 +1264,11 @@ PhysicalSocketServer::~PhysicalSocketServer() { |
signal_dispatcher_.reset(); |
#endif |
delete signal_wakeup_; |
+#if defined(WEBRTC_POSIX) && defined(WEBRTC_LINUX) |
+ if (epoll_fd_ != INVALID_SOCKET) { |
+ close(epoll_fd_); |
+ } |
+#endif |
RTC_DCHECK(dispatchers_.empty()); |
} |
@@ -1232,6 +1316,7 @@ AsyncSocket* PhysicalSocketServer::WrapSocket(SOCKET s) { |
void PhysicalSocketServer::Add(Dispatcher *pdispatcher) { |
CritScope cs(&crit_); |
+#if defined(WEBRTC_WIN) |
// Prevent duplicates. This can cause dead dispatchers to stick around. |
DispatcherList::iterator pos = std::find(dispatchers_.begin(), |
dispatchers_.end(), |
@@ -1239,10 +1324,19 @@ void PhysicalSocketServer::Add(Dispatcher *pdispatcher) { |
if (pos != dispatchers_.end()) |
return; |
dispatchers_.push_back(pdispatcher); |
+#else |
Taylor Brandstetter
2017/05/17 07:20:16
Is there a reason the set can't be used on Windows
joachim
2017/05/17 21:17:02
The Windows code has some special handling to inva
|
+ dispatchers_.insert(pdispatcher); |
+#if defined(WEBRTC_POSIX) && defined(WEBRTC_LINUX) |
+ if (epoll_fd_ != INVALID_SOCKET) { |
+ AddEpoll(pdispatcher); |
+ } |
+#endif // WEBRTC_POSIX && WEBRTC_LINUX |
+#endif // WEBRTC_WIN |
} |
void PhysicalSocketServer::Remove(Dispatcher *pdispatcher) { |
CritScope cs(&crit_); |
+#if defined(WEBRTC_WIN) |
DispatcherList::iterator pos = std::find(dispatchers_.begin(), |
dispatchers_.end(), |
pdispatcher); |
@@ -1262,10 +1356,50 @@ void PhysicalSocketServer::Remove(Dispatcher *pdispatcher) { |
--**it; |
} |
} |
+#else |
+ if (!dispatchers_.erase(pdispatcher)) { |
+ LOG(LS_WARNING) << "PhysicalSocketServer asked to remove a unknown " |
+ << "dispatcher, potentially from a duplicate call to Add."; |
+ return; |
+ } |
+#if defined(WEBRTC_POSIX) && defined(WEBRTC_LINUX) |
+ if (epoll_fd_ != INVALID_SOCKET) { |
+ RemoveEpoll(pdispatcher); |
+ } |
+#endif // WEBRTC_POSIX && WEBRTC_LINUX |
+#endif // WEBRTC_WIN |
+} |
+ |
+void PhysicalSocketServer::Update(Dispatcher *pdispatcher) { |
Taylor Brandstetter
2017/05/17 07:20:16
It seems like the code would be more readable if t
joachim
2017/05/17 21:17:02
The call site for this is the SocketDispatcher. I
Taylor Brandstetter
2017/05/17 23:19:02
I see now; in that case this is fine.
|
+#if defined(WEBRTC_POSIX) && defined(WEBRTC_LINUX) |
+ if (epoll_fd_ == INVALID_SOCKET) { |
+ return; |
+ } |
+ |
+ CritScope cs(&crit_); |
+ if (dispatchers_.find(pdispatcher) == dispatchers_.end()) { |
+ return; |
+ } |
+ |
+ UpdateEpoll(pdispatcher); |
+#endif |
} |
#if defined(WEBRTC_POSIX) |
+ |
bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) { |
+#if defined(WEBRTC_LINUX) |
+ // We don't keep a dedicated epoll descriptor containing only non-IO (i.e. |
+ // signaling) dispatchers, so the default "select" loop will be used in that |
+ // case. |
+ if (epoll_fd_ != INVALID_SOCKET && process_io) { |
+ return WaitEpoll(cmsWait); |
+ } |
+#endif |
+ return WaitSelect(cmsWait, process_io); |
+} |
+ |
+bool PhysicalSocketServer::WaitSelect(int cmsWait, bool process_io) { |
// Calculate timing information |
struct timeval* ptvWait = nullptr; |
@@ -1308,13 +1442,15 @@ bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) { |
int fdmax = -1; |
{ |
CritScope cr(&crit_); |
- for (size_t i = 0; i < dispatchers_.size(); ++i) { |
+ for (Dispatcher* pdispatcher : dispatchers_) { |
// Query dispatchers for read and write wait state |
- Dispatcher *pdispatcher = dispatchers_[i]; |
RTC_DCHECK(pdispatcher); |
if (!process_io && (pdispatcher != signal_wakeup_)) |
continue; |
int fd = pdispatcher->GetDescriptor(); |
+ // "select"ing a file descriptor that is equal to or larger than |
+ // FD_SETSIZE will result in undefined behavior. |
+ RTC_DCHECK_LT(fd, FD_SETSIZE); |
if (fd > fdmax) |
fdmax = fd; |
@@ -1348,8 +1484,7 @@ bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) { |
} else { |
// We have signaled descriptors |
CritScope cr(&crit_); |
- for (size_t i = 0; i < dispatchers_.size(); ++i) { |
- Dispatcher *pdispatcher = dispatchers_[i]; |
+ for (Dispatcher *pdispatcher : dispatchers_) { |
int fd = pdispatcher->GetDescriptor(); |
uint32_t ff = 0; |
int errcode = 0; |
@@ -1423,6 +1558,197 @@ bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) { |
return true; |
} |
+#if defined(WEBRTC_LINUX) |
+ |
+// Initial number of events to process with one call to "epoll_wait". |
+static const size_t kInitialEpollEvents = 128; |
+ |
+// Maximum number of events to process with one call to "epoll_wait". |
+static const size_t kMaxEpollEvents = 8192; |
+ |
+static int GetEpollEvents(Dispatcher* pdispatcher) { |
+ int events = 0; |
+ uint32_t ff = pdispatcher->GetRequestedEvents(); |
+ if (ff & (DE_READ | DE_ACCEPT)) { |
+ events |= EPOLLIN; |
+ } |
+ if (ff & (DE_WRITE | DE_CONNECT)) { |
+ events |= EPOLLOUT; |
+ } |
+ return events; |
+} |
+ |
+void PhysicalSocketServer::AddEpoll(Dispatcher* pdispatcher) { |
+ RTC_DCHECK(epoll_fd_ != INVALID_SOCKET); |
+ int fd = pdispatcher->GetDescriptor(); |
+ RTC_DCHECK(fd != INVALID_SOCKET); |
+ if (fd == INVALID_SOCKET) { |
+ return; |
+ } |
+ |
+ struct epoll_event event = {0}; |
+ event.events = GetEpollEvents(pdispatcher); |
+ event.data.ptr = pdispatcher; |
+ int err = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event); |
+ RTC_DCHECK_EQ(err, 0); |
+ if (err == -1) { |
+ LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_ADD"; |
+ } |
+} |
+ |
+void PhysicalSocketServer::RemoveEpoll(Dispatcher* pdispatcher) { |
+ RTC_DCHECK(epoll_fd_ != INVALID_SOCKET); |
+ int fd = pdispatcher->GetDescriptor(); |
+ RTC_DCHECK(fd != INVALID_SOCKET); |
+ if (fd == INVALID_SOCKET) { |
+ return; |
+ } |
+ |
+ struct epoll_event event = {0}; |
+ int err = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, &event); |
+ RTC_DCHECK(err == 0 || errno == ENOENT); |
+ if (err == -1) { |
+ if (errno == ENOENT) { |
+ // Socket has already been closed. |
+ LOG_E(LS_VERBOSE, EN, errno) << "epoll_ctl EPOLL_CTL_DEL"; |
+ } else { |
+ LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_DEL"; |
+ } |
+ } |
+} |
+ |
+void PhysicalSocketServer::UpdateEpoll(Dispatcher* pdispatcher) { |
+ RTC_DCHECK(epoll_fd_ != INVALID_SOCKET); |
+ int fd = pdispatcher->GetDescriptor(); |
+ RTC_DCHECK(fd != INVALID_SOCKET); |
+ if (fd == INVALID_SOCKET) { |
+ return; |
+ } |
+ |
+ struct epoll_event event = {0}; |
+ event.events = GetEpollEvents(pdispatcher); |
+ event.data.ptr = pdispatcher; |
+ int err = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd, &event); |
+ RTC_DCHECK_EQ(err, 0); |
+ if (err == -1) { |
+ LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_MOD"; |
+ } |
+} |
+ |
+bool PhysicalSocketServer::WaitEpoll(int cmsWait) { |
+ RTC_DCHECK(epoll_fd_ != INVALID_SOCKET); |
+ int64_t tvWait = -1; |
+ int64_t tvStop = -1; |
+ if (cmsWait != kForever) { |
+ tvWait = cmsWait; |
+ tvStop = TimeAfter(cmsWait); |
+ } |
+ |
+ if (epoll_events_.empty()) { |
+ // The initial space to receive events is created only if epoll is used. |
+ epoll_events_.resize(kInitialEpollEvents); |
+ } |
+ |
+ fWait_ = true; |
+ |
+ while (fWait_) { |
+ // Wait then call handlers as appropriate |
+ // < 0 means error |
+ // 0 means timeout |
+ // > 0 means count of descriptors ready |
+ int n = epoll_wait(epoll_fd_, &epoll_events_[0], |
+ static_cast<int>(epoll_events_.size()), static_cast<int>(tvWait)); |
+ if (n < 0) { |
+ if (errno != EINTR) { |
+ LOG_E(LS_ERROR, EN, errno) << "epoll"; |
+ return false; |
+ } |
+ // Else ignore the error and keep going. If this EINTR was for one of the |
+ // signals managed by this PhysicalSocketServer, the |
+ // PosixSignalDeliveryDispatcher will be in the signaled state in the next |
+ // iteration. |
+ } else if (n == 0) { |
+ // If timeout, return success |
+ return true; |
+ } else { |
+ // We have signaled descriptors |
+ CritScope cr(&crit_); |
+ for (int i = 0; i < n; ++i) { |
+ const epoll_event& event = epoll_events_[i]; |
+ Dispatcher* pdispatcher = static_cast<Dispatcher*>(event.data.ptr); |
+ if (dispatchers_.find(pdispatcher) == dispatchers_.end()) { |
+ // The dispatcher for this socket no longer exists. |
+ continue; |
+ } |
+ |
+ int fd = pdispatcher->GetDescriptor(); |
+ uint32_t ff = 0; |
+ int errcode = 0; |
+ |
+ // Reap any error code. |
+ // TODO(pthatcher): Should we set errcode if getsockopt fails? |
+ if (event.events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP)) { |
+ socklen_t len = sizeof(errcode); |
+ ::getsockopt(fd, SOL_SOCKET, SO_ERROR, &errcode, &len); |
+ } |
+ |
+ // Check readable descriptors. If we're waiting on an accept, signal |
+ // that. Otherwise we're waiting for data, check to see if we're |
+ // readable or really closed. |
+ // TODO(pthatcher): Only peek at TCP descriptors. |
+ if ((event.events & (EPOLLIN | EPOLLPRI))) { |
+ if (pdispatcher->GetRequestedEvents() & DE_ACCEPT) { |
+ ff |= DE_ACCEPT; |
+ } else if (errcode || pdispatcher->IsDescriptorClosed()) { |
+ ff |= DE_CLOSE; |
+ } else { |
+ ff |= DE_READ; |
+ } |
+ } |
+ |
+ // Check writable descriptors. If we're waiting on a connect, detect |
+ // success versus failure by the reaped error code. |
+ if ((event.events & EPOLLOUT)) { |
+ if (pdispatcher->GetRequestedEvents() & DE_CONNECT) { |
+ if (!errcode) { |
+ ff |= DE_CONNECT; |
+ } else { |
+ ff |= DE_CLOSE; |
+ } |
+ } else { |
+ ff |= DE_WRITE; |
+ } |
+ } |
+ |
+ // Tell the descriptor about the event. |
+ if (ff != 0) { |
+ pdispatcher->OnPreEvent(ff); |
+ pdispatcher->OnEvent(ff, errcode); |
+ } |
+ } |
+ } |
+ |
+ if (static_cast<size_t>(n) == epoll_events_.size() && |
+ epoll_events_.size() < kMaxEpollEvents) { |
+ // We used the complete space to receive events, increase size for future |
+ // iterations. |
+ epoll_events_.resize(std::max(epoll_events_.size() * 2, kMaxEpollEvents)); |
+ } |
+ |
+ if (cmsWait != kForever) { |
+ tvWait = TimeDiff(tvStop, TimeMillis()); |
+ if (tvWait < 0) { |
+ // Return success on timeout. |
+ return true; |
+ } |
+ } |
+ } |
+ |
+ return true; |
+} |
+ |
+#endif // WEBRTC_LINUX |
+ |
static void GlobalSignalHandler(int signum) { |
PosixSignalHandler::Instance()->OnPosixSignalReceived(signum); |
} |