Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(201)

Side by Side Diff: webrtc/base/physicalsocketserver.cc

Issue 2880923002: Support epoll in PhysicalSocketServer. (Closed)
Patch Set: Fix for undefined EPOLLRDHUP. Created 3 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « webrtc/base/physicalsocketserver.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 /* 1 /*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 * 3 *
4 * Use of this source code is governed by a BSD-style license 4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source 5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found 6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may 7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree. 8 * be found in the AUTHORS file in the root of the source tree.
9 */ 9 */
10 #include "webrtc/base/physicalsocketserver.h" 10 #include "webrtc/base/physicalsocketserver.h"
(...skipping 19 matching lines...) Expand all
30 30
31 #if defined(WEBRTC_WIN) 31 #if defined(WEBRTC_WIN)
32 #define WIN32_LEAN_AND_MEAN 32 #define WIN32_LEAN_AND_MEAN
33 #include <windows.h> 33 #include <windows.h>
34 #include <winsock2.h> 34 #include <winsock2.h>
35 #include <ws2tcpip.h> 35 #include <ws2tcpip.h>
36 #undef SetPort 36 #undef SetPort
37 #endif 37 #endif
38 38
39 #include <algorithm> 39 #include <algorithm>
40 #include <limits>
40 #include <map> 41 #include <map>
41 42
42 #include "webrtc/base/arraysize.h" 43 #include "webrtc/base/arraysize.h"
43 #include "webrtc/base/basictypes.h" 44 #include "webrtc/base/basictypes.h"
44 #include "webrtc/base/byteorder.h" 45 #include "webrtc/base/byteorder.h"
45 #include "webrtc/base/checks.h" 46 #include "webrtc/base/checks.h"
46 #include "webrtc/base/logging.h" 47 #include "webrtc/base/logging.h"
47 #include "webrtc/base/networkmonitor.h" 48 #include "webrtc/base/networkmonitor.h"
48 #include "webrtc/base/nullsocketserver.h" 49 #include "webrtc/base/nullsocketserver.h"
49 #include "webrtc/base/timeutils.h" 50 #include "webrtc/base/timeutils.h"
(...skipping 24 matching lines...) Expand all
74 75
75 int64_t GetSocketRecvTimestamp(int socket) { 76 int64_t GetSocketRecvTimestamp(int socket) {
76 return -1; 77 return -1;
77 } 78 }
78 #endif 79 #endif
79 80
80 #if defined(WEBRTC_WIN) 81 #if defined(WEBRTC_WIN)
81 typedef char* SockOptArg; 82 typedef char* SockOptArg;
82 #endif 83 #endif
83 84
85 #if defined(WEBRTC_POSIX) && defined(WEBRTC_LINUX)
86 // EPOLLRDHUP is only defined starting with Linux 2.6.17.
87 #if !defined(EPOLLRDHUP)
88 #define EPOLLRDHUP 0x2000
89 #endif
90 #endif
91
84 namespace rtc { 92 namespace rtc {
85 93
86 std::unique_ptr<SocketServer> SocketServer::CreateDefault() { 94 std::unique_ptr<SocketServer> SocketServer::CreateDefault() {
87 #if defined(__native_client__) 95 #if defined(__native_client__)
88 return std::unique_ptr<SocketServer>(new rtc::NullSocketServer); 96 return std::unique_ptr<SocketServer>(new rtc::NullSocketServer);
89 #else 97 #else
90 return std::unique_ptr<SocketServer>(new rtc::PhysicalSocketServer); 98 return std::unique_ptr<SocketServer>(new rtc::PhysicalSocketServer);
91 #endif 99 #endif
92 } 100 }
93 101
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after
127 resolver_(nullptr) { 135 resolver_(nullptr) {
128 #if defined(WEBRTC_WIN) 136 #if defined(WEBRTC_WIN)
129 // EnsureWinsockInit() ensures that winsock is initialized. The default 137 // EnsureWinsockInit() ensures that winsock is initialized. The default
130 // version of this function doesn't do anything because winsock is 138 // version of this function doesn't do anything because winsock is
131 // initialized by constructor of a static object. If neccessary libjingle 139 // initialized by constructor of a static object. If neccessary libjingle
132 // users can link it with a different version of this function by replacing 140 // users can link it with a different version of this function by replacing
133 // win32socketinit.cc. See win32socketinit.cc for more details. 141 // win32socketinit.cc. See win32socketinit.cc for more details.
134 EnsureWinsockInit(); 142 EnsureWinsockInit();
135 #endif 143 #endif
136 if (s_ != INVALID_SOCKET) { 144 if (s_ != INVALID_SOCKET) {
137 enabled_events_ = DE_READ | DE_WRITE; 145 SetEnabledEvents(DE_READ | DE_WRITE);
Taylor Brandstetter 2017/05/17 07:20:16 nit: The "SetEnabledEvents" etc. change seems like
joachim 2017/05/17 21:17:02 Moved to https://codereview.webrtc.org/2893723002/
138 146
139 int type = SOCK_STREAM; 147 int type = SOCK_STREAM;
140 socklen_t len = sizeof(type); 148 socklen_t len = sizeof(type);
141 const int res = 149 const int res =
142 getsockopt(s_, SOL_SOCKET, SO_TYPE, (SockOptArg)&type, &len); 150 getsockopt(s_, SOL_SOCKET, SO_TYPE, (SockOptArg)&type, &len);
143 RTC_DCHECK_EQ(0, res); 151 RTC_DCHECK_EQ(0, res);
144 udp_ = (SOCK_DGRAM == type); 152 udp_ = (SOCK_DGRAM == type);
145 } 153 }
146 } 154 }
147 155
148 PhysicalSocket::~PhysicalSocket() { 156 PhysicalSocket::~PhysicalSocket() {
149 Close(); 157 Close();
150 } 158 }
151 159
152 bool PhysicalSocket::Create(int family, int type) { 160 bool PhysicalSocket::Create(int family, int type) {
153 Close(); 161 Close();
154 s_ = ::socket(family, type, 0); 162 s_ = ::socket(family, type, 0);
155 udp_ = (SOCK_DGRAM == type); 163 udp_ = (SOCK_DGRAM == type);
156 UpdateLastError(); 164 UpdateLastError();
157 if (udp_) 165 if (udp_) {
158 enabled_events_ = DE_READ | DE_WRITE; 166 SetEnabledEvents(DE_READ | DE_WRITE);
167 }
159 return s_ != INVALID_SOCKET; 168 return s_ != INVALID_SOCKET;
160 } 169 }
161 170
162 SocketAddress PhysicalSocket::GetLocalAddress() const { 171 SocketAddress PhysicalSocket::GetLocalAddress() const {
163 sockaddr_storage addr_storage = {0}; 172 sockaddr_storage addr_storage = {0};
164 socklen_t addrlen = sizeof(addr_storage); 173 socklen_t addrlen = sizeof(addr_storage);
165 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); 174 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
166 int result = ::getsockname(s_, addr, &addrlen); 175 int result = ::getsockname(s_, addr, &addrlen);
167 SocketAddress address; 176 SocketAddress address;
168 if (result >= 0) { 177 if (result >= 0) {
(...skipping 91 matching lines...) Expand 10 before | Expand all | Expand 10 after
260 int PhysicalSocket::DoConnect(const SocketAddress& connect_addr) { 269 int PhysicalSocket::DoConnect(const SocketAddress& connect_addr) {
261 if ((s_ == INVALID_SOCKET) && 270 if ((s_ == INVALID_SOCKET) &&
262 !Create(connect_addr.family(), SOCK_STREAM)) { 271 !Create(connect_addr.family(), SOCK_STREAM)) {
263 return SOCKET_ERROR; 272 return SOCKET_ERROR;
264 } 273 }
265 sockaddr_storage addr_storage; 274 sockaddr_storage addr_storage;
266 size_t len = connect_addr.ToSockAddrStorage(&addr_storage); 275 size_t len = connect_addr.ToSockAddrStorage(&addr_storage);
267 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); 276 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
268 int err = ::connect(s_, addr, static_cast<int>(len)); 277 int err = ::connect(s_, addr, static_cast<int>(len));
269 UpdateLastError(); 278 UpdateLastError();
279 uint8_t events = DE_READ | DE_WRITE;
270 if (err == 0) { 280 if (err == 0) {
271 state_ = CS_CONNECTED; 281 state_ = CS_CONNECTED;
272 } else if (IsBlockingError(GetError())) { 282 } else if (IsBlockingError(GetError())) {
273 state_ = CS_CONNECTING; 283 state_ = CS_CONNECTING;
274 enabled_events_ |= DE_CONNECT; 284 events |= DE_CONNECT;
275 } else { 285 } else {
276 return SOCKET_ERROR; 286 return SOCKET_ERROR;
277 } 287 }
278 288
279 enabled_events_ |= DE_READ | DE_WRITE; 289 EnableEvents(events);
280 return 0; 290 return 0;
281 } 291 }
282 292
283 int PhysicalSocket::GetError() const { 293 int PhysicalSocket::GetError() const {
284 CritScope cs(&crit_); 294 CritScope cs(&crit_);
285 return error_; 295 return error_;
286 } 296 }
287 297
288 void PhysicalSocket::SetError(int error) { 298 void PhysicalSocket::SetError(int error) {
289 CritScope cs(&crit_); 299 CritScope cs(&crit_);
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after
335 #else 345 #else
336 0 346 0
337 #endif 347 #endif
338 ); 348 );
339 UpdateLastError(); 349 UpdateLastError();
340 MaybeRemapSendError(); 350 MaybeRemapSendError();
341 // We have seen minidumps where this may be false. 351 // We have seen minidumps where this may be false.
342 RTC_DCHECK(sent <= static_cast<int>(cb)); 352 RTC_DCHECK(sent <= static_cast<int>(cb));
343 if ((sent > 0 && sent < static_cast<int>(cb)) || 353 if ((sent > 0 && sent < static_cast<int>(cb)) ||
344 (sent < 0 && IsBlockingError(GetError()))) { 354 (sent < 0 && IsBlockingError(GetError()))) {
345 enabled_events_ |= DE_WRITE; 355 EnableEvents(DE_WRITE);
346 } 356 }
347 return sent; 357 return sent;
348 } 358 }
349 359
350 int PhysicalSocket::SendTo(const void* buffer, 360 int PhysicalSocket::SendTo(const void* buffer,
351 size_t length, 361 size_t length,
352 const SocketAddress& addr) { 362 const SocketAddress& addr) {
353 sockaddr_storage saddr; 363 sockaddr_storage saddr;
354 size_t len = addr.ToSockAddrStorage(&saddr); 364 size_t len = addr.ToSockAddrStorage(&saddr);
355 int sent = DoSendTo( 365 int sent = DoSendTo(
356 s_, static_cast<const char *>(buffer), static_cast<int>(length), 366 s_, static_cast<const char *>(buffer), static_cast<int>(length),
357 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID) 367 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID)
358 // Suppress SIGPIPE. See above for explanation. 368 // Suppress SIGPIPE. See above for explanation.
359 MSG_NOSIGNAL, 369 MSG_NOSIGNAL,
360 #else 370 #else
361 0, 371 0,
362 #endif 372 #endif
363 reinterpret_cast<sockaddr*>(&saddr), static_cast<int>(len)); 373 reinterpret_cast<sockaddr*>(&saddr), static_cast<int>(len));
364 UpdateLastError(); 374 UpdateLastError();
365 MaybeRemapSendError(); 375 MaybeRemapSendError();
366 // We have seen minidumps where this may be false. 376 // We have seen minidumps where this may be false.
367 RTC_DCHECK(sent <= static_cast<int>(length)); 377 RTC_DCHECK(sent <= static_cast<int>(length));
368 if ((sent > 0 && sent < static_cast<int>(length)) || 378 if ((sent > 0 && sent < static_cast<int>(length)) ||
369 (sent < 0 && IsBlockingError(GetError()))) { 379 (sent < 0 && IsBlockingError(GetError()))) {
370 enabled_events_ |= DE_WRITE; 380 EnableEvents(DE_WRITE);
371 } 381 }
372 return sent; 382 return sent;
373 } 383 }
374 384
375 int PhysicalSocket::Recv(void* buffer, size_t length, int64_t* timestamp) { 385 int PhysicalSocket::Recv(void* buffer, size_t length, int64_t* timestamp) {
376 int received = ::recv(s_, static_cast<char*>(buffer), 386 int received = ::recv(s_, static_cast<char*>(buffer),
377 static_cast<int>(length), 0); 387 static_cast<int>(length), 0);
378 if ((received == 0) && (length != 0)) { 388 if ((received == 0) && (length != 0)) {
379 // Note: on graceful shutdown, recv can return 0. In this case, we 389 // Note: on graceful shutdown, recv can return 0. In this case, we
380 // pretend it is blocking, and then signal close, so that simplifying 390 // pretend it is blocking, and then signal close, so that simplifying
381 // assumptions can be made about Recv. 391 // assumptions can be made about Recv.
382 LOG(LS_WARNING) << "EOF from socket; deferring close event"; 392 LOG(LS_WARNING) << "EOF from socket; deferring close event";
383 // Must turn this back on so that the select() loop will notice the close 393 // Must turn this back on so that the select() loop will notice the close
384 // event. 394 // event.
385 enabled_events_ |= DE_READ; 395 EnableEvents(DE_READ);
386 SetError(EWOULDBLOCK); 396 SetError(EWOULDBLOCK);
387 return SOCKET_ERROR; 397 return SOCKET_ERROR;
388 } 398 }
389 if (timestamp) { 399 if (timestamp) {
390 *timestamp = GetSocketRecvTimestamp(s_); 400 *timestamp = GetSocketRecvTimestamp(s_);
391 } 401 }
392 UpdateLastError(); 402 UpdateLastError();
393 int error = GetError(); 403 int error = GetError();
394 bool success = (received >= 0) || IsBlockingError(error); 404 bool success = (received >= 0) || IsBlockingError(error);
395 if (udp_ || success) { 405 if (udp_ || success) {
396 enabled_events_ |= DE_READ; 406 EnableEvents(DE_READ);
397 } 407 }
398 if (!success) { 408 if (!success) {
399 LOG_F(LS_VERBOSE) << "Error = " << error; 409 LOG_F(LS_VERBOSE) << "Error = " << error;
400 } 410 }
401 return received; 411 return received;
402 } 412 }
403 413
404 int PhysicalSocket::RecvFrom(void* buffer, 414 int PhysicalSocket::RecvFrom(void* buffer,
405 size_t length, 415 size_t length,
406 SocketAddress* out_addr, 416 SocketAddress* out_addr,
407 int64_t* timestamp) { 417 int64_t* timestamp) {
408 sockaddr_storage addr_storage; 418 sockaddr_storage addr_storage;
409 socklen_t addr_len = sizeof(addr_storage); 419 socklen_t addr_len = sizeof(addr_storage);
410 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); 420 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
411 int received = ::recvfrom(s_, static_cast<char*>(buffer), 421 int received = ::recvfrom(s_, static_cast<char*>(buffer),
412 static_cast<int>(length), 0, addr, &addr_len); 422 static_cast<int>(length), 0, addr, &addr_len);
413 if (timestamp) { 423 if (timestamp) {
414 *timestamp = GetSocketRecvTimestamp(s_); 424 *timestamp = GetSocketRecvTimestamp(s_);
415 } 425 }
416 UpdateLastError(); 426 UpdateLastError();
417 if ((received >= 0) && (out_addr != nullptr)) 427 if ((received >= 0) && (out_addr != nullptr))
418 SocketAddressFromSockAddrStorage(addr_storage, out_addr); 428 SocketAddressFromSockAddrStorage(addr_storage, out_addr);
419 int error = GetError(); 429 int error = GetError();
420 bool success = (received >= 0) || IsBlockingError(error); 430 bool success = (received >= 0) || IsBlockingError(error);
421 if (udp_ || success) { 431 if (udp_ || success) {
422 enabled_events_ |= DE_READ; 432 EnableEvents(DE_READ);
423 } 433 }
424 if (!success) { 434 if (!success) {
425 LOG_F(LS_VERBOSE) << "Error = " << error; 435 LOG_F(LS_VERBOSE) << "Error = " << error;
426 } 436 }
427 return received; 437 return received;
428 } 438 }
429 439
430 int PhysicalSocket::Listen(int backlog) { 440 int PhysicalSocket::Listen(int backlog) {
431 int err = ::listen(s_, backlog); 441 int err = ::listen(s_, backlog);
432 UpdateLastError(); 442 UpdateLastError();
433 if (err == 0) { 443 if (err == 0) {
434 state_ = CS_CONNECTING; 444 state_ = CS_CONNECTING;
435 enabled_events_ |= DE_ACCEPT; 445 EnableEvents(DE_ACCEPT);
436 #if !defined(NDEBUG) 446 #if !defined(NDEBUG)
437 dbg_addr_ = "Listening @ "; 447 dbg_addr_ = "Listening @ ";
438 dbg_addr_.append(GetLocalAddress().ToString()); 448 dbg_addr_.append(GetLocalAddress().ToString());
439 #endif 449 #endif
440 } 450 }
441 return err; 451 return err;
442 } 452 }
443 453
444 AsyncSocket* PhysicalSocket::Accept(SocketAddress* out_addr) { 454 AsyncSocket* PhysicalSocket::Accept(SocketAddress* out_addr) {
445 // Always re-subscribe DE_ACCEPT to make sure new incoming connections will 455 // Always re-subscribe DE_ACCEPT to make sure new incoming connections will
446 // trigger an event even if DoAccept returns an error here. 456 // trigger an event even if DoAccept returns an error here.
447 enabled_events_ |= DE_ACCEPT; 457 EnableEvents(DE_ACCEPT);
448 sockaddr_storage addr_storage; 458 sockaddr_storage addr_storage;
449 socklen_t addr_len = sizeof(addr_storage); 459 socklen_t addr_len = sizeof(addr_storage);
450 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); 460 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
451 SOCKET s = DoAccept(s_, addr, &addr_len); 461 SOCKET s = DoAccept(s_, addr, &addr_len);
452 UpdateLastError(); 462 UpdateLastError();
453 if (s == INVALID_SOCKET) 463 if (s == INVALID_SOCKET)
454 return nullptr; 464 return nullptr;
455 if (out_addr != nullptr) 465 if (out_addr != nullptr)
456 SocketAddressFromSockAddrStorage(addr_storage, out_addr); 466 SocketAddressFromSockAddrStorage(addr_storage, out_addr);
457 return ss_->WrapSocket(s); 467 return ss_->WrapSocket(s);
458 } 468 }
459 469
460 int PhysicalSocket::Close() { 470 int PhysicalSocket::Close() {
461 if (s_ == INVALID_SOCKET) 471 if (s_ == INVALID_SOCKET)
462 return 0; 472 return 0;
463 int err = ::closesocket(s_); 473 int err = ::closesocket(s_);
464 UpdateLastError(); 474 UpdateLastError();
465 s_ = INVALID_SOCKET; 475 s_ = INVALID_SOCKET;
466 state_ = CS_CLOSED; 476 state_ = CS_CLOSED;
467 enabled_events_ = 0; 477 SetEnabledEvents(0);
468 if (resolver_) { 478 if (resolver_) {
469 resolver_->Destroy(false); 479 resolver_->Destroy(false);
470 resolver_ = nullptr; 480 resolver_ = nullptr;
471 } 481 }
472 return err; 482 return err;
473 } 483 }
474 484
475 int PhysicalSocket::EstimateMTU(uint16_t* mtu) { 485 int PhysicalSocket::EstimateMTU(uint16_t* mtu) {
476 SocketAddress addr = GetRemoteAddress(); 486 SocketAddress addr = GetRemoteAddress();
477 if (addr.IsAnyIP()) { 487 if (addr.IsAnyIP()) {
(...skipping 105 matching lines...) Expand 10 before | Expand all | Expand 10 after
583 // Reference/ManPages/man2/sendto.2.html 593 // Reference/ManPages/man2/sendto.2.html
584 // ENOBUFS - The output queue for a network interface is full. 594 // ENOBUFS - The output queue for a network interface is full.
585 // This generally indicates that the interface has stopped sending, 595 // This generally indicates that the interface has stopped sending,
586 // but may be caused by transient congestion. 596 // but may be caused by transient congestion.
587 if (GetError() == ENOBUFS) { 597 if (GetError() == ENOBUFS) {
588 SetError(EWOULDBLOCK); 598 SetError(EWOULDBLOCK);
589 } 599 }
590 #endif 600 #endif
591 } 601 }
592 602
603 void PhysicalSocket::SetEnabledEvents(uint8_t events) {
604 enabled_events_ = events;
605 }
606
607 void PhysicalSocket::EnableEvents(uint8_t events) {
608 enabled_events_ |= events;
609 }
610
611 void PhysicalSocket::DisableEvents(uint8_t events) {
612 enabled_events_ &= ~events;
613 }
614
593 int PhysicalSocket::TranslateOption(Option opt, int* slevel, int* sopt) { 615 int PhysicalSocket::TranslateOption(Option opt, int* slevel, int* sopt) {
594 switch (opt) { 616 switch (opt) {
595 case OPT_DONTFRAGMENT: 617 case OPT_DONTFRAGMENT:
596 #if defined(WEBRTC_WIN) 618 #if defined(WEBRTC_WIN)
597 *slevel = IPPROTO_IP; 619 *slevel = IPPROTO_IP;
598 *sopt = IP_DONTFRAGMENT; 620 *sopt = IP_DONTFRAGMENT;
599 break; 621 break;
600 #elif defined(WEBRTC_MAC) || defined(BSD) || defined(__native_client__) 622 #elif defined(WEBRTC_MAC) || defined(BSD) || defined(__native_client__)
601 LOG(LS_WARNING) << "Socket::OPT_DONTFRAGMENT not supported."; 623 LOG(LS_WARNING) << "Socket::OPT_DONTFRAGMENT not supported.";
602 return -1; 624 return -1;
(...skipping 178 matching lines...) Expand 10 before | Expand all | Expand 10 after
781 803
782 #if defined(WEBRTC_WIN) 804 #if defined(WEBRTC_WIN)
783 805
784 void SocketDispatcher::OnEvent(uint32_t ff, int err) { 806 void SocketDispatcher::OnEvent(uint32_t ff, int err) {
785 int cache_id = id_; 807 int cache_id = id_;
786 // Make sure we deliver connect/accept first. Otherwise, consumers may see 808 // Make sure we deliver connect/accept first. Otherwise, consumers may see
787 // something like a READ followed by a CONNECT, which would be odd. 809 // something like a READ followed by a CONNECT, which would be odd.
788 if (((ff & DE_CONNECT) != 0) && (id_ == cache_id)) { 810 if (((ff & DE_CONNECT) != 0) && (id_ == cache_id)) {
789 if (ff != DE_CONNECT) 811 if (ff != DE_CONNECT)
790 LOG(LS_VERBOSE) << "Signalled with DE_CONNECT: " << ff; 812 LOG(LS_VERBOSE) << "Signalled with DE_CONNECT: " << ff;
791 enabled_events_ &= ~DE_CONNECT; 813 DisableEvents(DE_CONNECT);
792 #if !defined(NDEBUG) 814 #if !defined(NDEBUG)
793 dbg_addr_ = "Connected @ "; 815 dbg_addr_ = "Connected @ ";
794 dbg_addr_.append(GetRemoteAddress().ToString()); 816 dbg_addr_.append(GetRemoteAddress().ToString());
795 #endif 817 #endif
796 SignalConnectEvent(this); 818 SignalConnectEvent(this);
797 } 819 }
798 if (((ff & DE_ACCEPT) != 0) && (id_ == cache_id)) { 820 if (((ff & DE_ACCEPT) != 0) && (id_ == cache_id)) {
799 enabled_events_ &= ~DE_ACCEPT; 821 DisableEvents(DE_ACCEPT);
800 SignalReadEvent(this); 822 SignalReadEvent(this);
801 } 823 }
802 if ((ff & DE_READ) != 0) { 824 if ((ff & DE_READ) != 0) {
803 enabled_events_ &= ~DE_READ; 825 DisableEvents(DE_READ);
804 SignalReadEvent(this); 826 SignalReadEvent(this);
805 } 827 }
806 if (((ff & DE_WRITE) != 0) && (id_ == cache_id)) { 828 if (((ff & DE_WRITE) != 0) && (id_ == cache_id)) {
807 enabled_events_ &= ~DE_WRITE; 829 DisableEvents(DE_WRITE);
808 SignalWriteEvent(this); 830 SignalWriteEvent(this);
809 } 831 }
810 if (((ff & DE_CLOSE) != 0) && (id_ == cache_id)) { 832 if (((ff & DE_CLOSE) != 0) && (id_ == cache_id)) {
811 signal_close_ = true; 833 signal_close_ = true;
812 signal_err_ = err; 834 signal_err_ = err;
813 } 835 }
814 } 836 }
815 837
816 #elif defined(WEBRTC_POSIX) 838 #elif defined(WEBRTC_POSIX)
817 839
818 void SocketDispatcher::OnEvent(uint32_t ff, int err) { 840 void SocketDispatcher::OnEvent(uint32_t ff, int err) {
841 #if defined(WEBRTC_LINUX)
Taylor Brandstetter 2017/05/17 07:20:16 nit: It may be cleaner to do "#ifdef WEBRTC_LINUX
joachim 2017/05/17 21:17:02 Done (named it "WEBRTC_USE_EPOLL").
842 // Remember currently enabled events so we can combine multiple changes
843 // into one update call later.
844 PushEnabledEvents();
845 #endif
819 // Make sure we deliver connect/accept first. Otherwise, consumers may see 846 // Make sure we deliver connect/accept first. Otherwise, consumers may see
820 // something like a READ followed by a CONNECT, which would be odd. 847 // something like a READ followed by a CONNECT, which would be odd.
821 if ((ff & DE_CONNECT) != 0) { 848 if ((ff & DE_CONNECT) != 0) {
822 enabled_events_ &= ~DE_CONNECT; 849 DisableEvents(DE_CONNECT);
823 SignalConnectEvent(this); 850 SignalConnectEvent(this);
824 } 851 }
825 if ((ff & DE_ACCEPT) != 0) { 852 if ((ff & DE_ACCEPT) != 0) {
826 enabled_events_ &= ~DE_ACCEPT; 853 DisableEvents(DE_ACCEPT);
827 SignalReadEvent(this); 854 SignalReadEvent(this);
828 } 855 }
829 if ((ff & DE_READ) != 0) { 856 if ((ff & DE_READ) != 0) {
830 enabled_events_ &= ~DE_READ; 857 DisableEvents(DE_READ);
831 SignalReadEvent(this); 858 SignalReadEvent(this);
832 } 859 }
833 if ((ff & DE_WRITE) != 0) { 860 if ((ff & DE_WRITE) != 0) {
834 enabled_events_ &= ~DE_WRITE; 861 DisableEvents(DE_WRITE);
835 SignalWriteEvent(this); 862 SignalWriteEvent(this);
836 } 863 }
837 if ((ff & DE_CLOSE) != 0) { 864 if ((ff & DE_CLOSE) != 0) {
838 // The socket is now dead to us, so stop checking it. 865 // The socket is now dead to us, so stop checking it.
839 enabled_events_ = 0; 866 SetEnabledEvents(0);
840 SignalCloseEvent(this, err); 867 SignalCloseEvent(this, err);
841 } 868 }
869 #if defined(WEBRTC_LINUX)
870 PopEnabledEvents();
871 #endif
842 } 872 }
843 873
874 #if defined(WEBRTC_LINUX)
875
876 void SocketDispatcher::PushEnabledEvents() {
877 enabled_events_stack_.push_back(enabled_events_);
Taylor Brandstetter 2017/05/17 07:20:16 Will the stack ever have a size > 1? If so, can yo
joachim 2017/05/17 21:17:02 Currently it will only be one element. I'll change
878 }
879
880 void SocketDispatcher::PopEnabledEvents() {
881 RTC_DCHECK(!enabled_events_stack_.empty());
882 uint8_t old_events = enabled_events_stack_.back();
883 enabled_events_stack_.pop_back();
884 MaybeUpdateDispatcher(old_events);
885 }
886
887 void SocketDispatcher::MaybeUpdateDispatcher(uint8_t old_events) {
888 if (enabled_events_ != old_events && enabled_events_stack_.empty()) {
889 ss_->Update(this);
890 }
891 }
892
893 void SocketDispatcher::SetEnabledEvents(uint8_t events) {
894 uint8_t old_events = enabled_events_;
895 PhysicalSocket::SetEnabledEvents(events);
896 MaybeUpdateDispatcher(old_events);
897 }
898
899 void SocketDispatcher::EnableEvents(uint8_t events) {
900 uint8_t old_events = enabled_events_;
901 PhysicalSocket::EnableEvents(events);
902 MaybeUpdateDispatcher(old_events);
903 }
904
905 void SocketDispatcher::DisableEvents(uint8_t events) {
906 uint8_t old_events = enabled_events_;
907 PhysicalSocket::DisableEvents(events);
908 MaybeUpdateDispatcher(old_events);
909 }
910
911 #endif // WEBRTC_LINUX
912
844 #endif // WEBRTC_POSIX 913 #endif // WEBRTC_POSIX
845 914
846 int SocketDispatcher::Close() { 915 int SocketDispatcher::Close() {
847 if (s_ == INVALID_SOCKET) 916 if (s_ == INVALID_SOCKET)
848 return 0; 917 return 0;
849 918
850 #if defined(WEBRTC_WIN) 919 #if defined(WEBRTC_WIN)
851 id_ = 0; 920 id_ = 0;
852 signal_close_ = false; 921 signal_close_ = false;
853 #endif 922 #endif
(...skipping 310 matching lines...) Expand 10 before | Expand all | Expand 10 after
1164 if (pf_) 1233 if (pf_)
1165 *pf_ = false; 1234 *pf_ = false;
1166 } 1235 }
1167 1236
1168 private: 1237 private:
1169 bool *pf_; 1238 bool *pf_;
1170 }; 1239 };
1171 1240
1172 PhysicalSocketServer::PhysicalSocketServer() 1241 PhysicalSocketServer::PhysicalSocketServer()
1173 : fWait_(false) { 1242 : fWait_(false) {
1243 #if defined(WEBRTC_POSIX) && defined(WEBRTC_LINUX)
1244 // Since Linux 2.6.8, the size argument is ignored, but must be greater than
1245 // zero. Before that the size served as hint to the kernel for the amount of
1246 // space to initially allocate in internal data structures.
1247 epoll_fd_ = epoll_create(FD_SETSIZE);
1248 if (epoll_fd_ == -1) {
1249 LOG_E(LS_WARNING, EN, errno) << "epoll_create";
1250 epoll_fd_ = INVALID_SOCKET;
Taylor Brandstetter 2017/05/17 07:20:16 Could you leave a comment mentioning that the code
joachim 2017/05/17 21:17:02 Done.
1251 }
1252 #endif
1174 signal_wakeup_ = new Signaler(this, &fWait_); 1253 signal_wakeup_ = new Signaler(this, &fWait_);
1175 #if defined(WEBRTC_WIN) 1254 #if defined(WEBRTC_WIN)
1176 socket_ev_ = WSACreateEvent(); 1255 socket_ev_ = WSACreateEvent();
1177 #endif 1256 #endif
1178 } 1257 }
1179 1258
1180 PhysicalSocketServer::~PhysicalSocketServer() { 1259 PhysicalSocketServer::~PhysicalSocketServer() {
1181 #if defined(WEBRTC_WIN) 1260 #if defined(WEBRTC_WIN)
1182 WSACloseEvent(socket_ev_); 1261 WSACloseEvent(socket_ev_);
1183 #endif 1262 #endif
1184 #if defined(WEBRTC_POSIX) 1263 #if defined(WEBRTC_POSIX)
1185 signal_dispatcher_.reset(); 1264 signal_dispatcher_.reset();
1186 #endif 1265 #endif
1187 delete signal_wakeup_; 1266 delete signal_wakeup_;
1267 #if defined(WEBRTC_POSIX) && defined(WEBRTC_LINUX)
1268 if (epoll_fd_ != INVALID_SOCKET) {
1269 close(epoll_fd_);
1270 }
1271 #endif
1188 RTC_DCHECK(dispatchers_.empty()); 1272 RTC_DCHECK(dispatchers_.empty());
1189 } 1273 }
1190 1274
1191 void PhysicalSocketServer::WakeUp() { 1275 void PhysicalSocketServer::WakeUp() {
1192 signal_wakeup_->Signal(); 1276 signal_wakeup_->Signal();
1193 } 1277 }
1194 1278
1195 Socket* PhysicalSocketServer::CreateSocket(int type) { 1279 Socket* PhysicalSocketServer::CreateSocket(int type) {
1196 return CreateSocket(AF_INET, type); 1280 return CreateSocket(AF_INET, type);
1197 } 1281 }
(...skipping 27 matching lines...) Expand all
1225 if (dispatcher->Initialize()) { 1309 if (dispatcher->Initialize()) {
1226 return dispatcher; 1310 return dispatcher;
1227 } else { 1311 } else {
1228 delete dispatcher; 1312 delete dispatcher;
1229 return nullptr; 1313 return nullptr;
1230 } 1314 }
1231 } 1315 }
1232 1316
1233 void PhysicalSocketServer::Add(Dispatcher *pdispatcher) { 1317 void PhysicalSocketServer::Add(Dispatcher *pdispatcher) {
1234 CritScope cs(&crit_); 1318 CritScope cs(&crit_);
1319 #if defined(WEBRTC_WIN)
1235 // Prevent duplicates. This can cause dead dispatchers to stick around. 1320 // Prevent duplicates. This can cause dead dispatchers to stick around.
1236 DispatcherList::iterator pos = std::find(dispatchers_.begin(), 1321 DispatcherList::iterator pos = std::find(dispatchers_.begin(),
1237 dispatchers_.end(), 1322 dispatchers_.end(),
1238 pdispatcher); 1323 pdispatcher);
1239 if (pos != dispatchers_.end()) 1324 if (pos != dispatchers_.end())
1240 return; 1325 return;
1241 dispatchers_.push_back(pdispatcher); 1326 dispatchers_.push_back(pdispatcher);
1327 #else
Taylor Brandstetter 2017/05/17 07:20:16 Is there a reason the set can't be used on Windows
joachim 2017/05/17 21:17:02 The Windows code has some special handling to inva
1328 dispatchers_.insert(pdispatcher);
1329 #if defined(WEBRTC_POSIX) && defined(WEBRTC_LINUX)
1330 if (epoll_fd_ != INVALID_SOCKET) {
1331 AddEpoll(pdispatcher);
1332 }
1333 #endif // WEBRTC_POSIX && WEBRTC_LINUX
1334 #endif // WEBRTC_WIN
1242 } 1335 }
1243 1336
1244 void PhysicalSocketServer::Remove(Dispatcher *pdispatcher) { 1337 void PhysicalSocketServer::Remove(Dispatcher *pdispatcher) {
1245 CritScope cs(&crit_); 1338 CritScope cs(&crit_);
1339 #if defined(WEBRTC_WIN)
1246 DispatcherList::iterator pos = std::find(dispatchers_.begin(), 1340 DispatcherList::iterator pos = std::find(dispatchers_.begin(),
1247 dispatchers_.end(), 1341 dispatchers_.end(),
1248 pdispatcher); 1342 pdispatcher);
1249 // We silently ignore duplicate calls to Add, so we should silently ignore 1343 // We silently ignore duplicate calls to Add, so we should silently ignore
1250 // the (expected) symmetric calls to Remove. Note that this may still hide 1344 // the (expected) symmetric calls to Remove. Note that this may still hide
1251 // a real issue, so we at least log a warning about it. 1345 // a real issue, so we at least log a warning about it.
1252 if (pos == dispatchers_.end()) { 1346 if (pos == dispatchers_.end()) {
1253 LOG(LS_WARNING) << "PhysicalSocketServer asked to remove a unknown " 1347 LOG(LS_WARNING) << "PhysicalSocketServer asked to remove a unknown "
1254 << "dispatcher, potentially from a duplicate call to Add."; 1348 << "dispatcher, potentially from a duplicate call to Add.";
1255 return; 1349 return;
1256 } 1350 }
1257 size_t index = pos - dispatchers_.begin(); 1351 size_t index = pos - dispatchers_.begin();
1258 dispatchers_.erase(pos); 1352 dispatchers_.erase(pos);
1259 for (IteratorList::iterator it = iterators_.begin(); it != iterators_.end(); 1353 for (IteratorList::iterator it = iterators_.begin(); it != iterators_.end();
1260 ++it) { 1354 ++it) {
1261 if (index < **it) { 1355 if (index < **it) {
1262 --**it; 1356 --**it;
1263 } 1357 }
1264 } 1358 }
1359 #else
1360 if (!dispatchers_.erase(pdispatcher)) {
1361 LOG(LS_WARNING) << "PhysicalSocketServer asked to remove a unknown "
1362 << "dispatcher, potentially from a duplicate call to Add.";
1363 return;
1364 }
1365 #if defined(WEBRTC_POSIX) && defined(WEBRTC_LINUX)
1366 if (epoll_fd_ != INVALID_SOCKET) {
1367 RemoveEpoll(pdispatcher);
1368 }
1369 #endif // WEBRTC_POSIX && WEBRTC_LINUX
1370 #endif // WEBRTC_WIN
1371 }
1372
1373 void PhysicalSocketServer::Update(Dispatcher *pdispatcher) {
Taylor Brandstetter 2017/05/17 07:20:16 It seems like the code would be more readable if t
joachim 2017/05/17 21:17:02 The call site for this is the SocketDispatcher. I
Taylor Brandstetter 2017/05/17 23:19:02 I see now; in that case this is fine.
1374 #if defined(WEBRTC_POSIX) && defined(WEBRTC_LINUX)
1375 if (epoll_fd_ == INVALID_SOCKET) {
1376 return;
1377 }
1378
1379 CritScope cs(&crit_);
1380 if (dispatchers_.find(pdispatcher) == dispatchers_.end()) {
1381 return;
1382 }
1383
1384 UpdateEpoll(pdispatcher);
1385 #endif
1265 } 1386 }
1266 1387
1267 #if defined(WEBRTC_POSIX) 1388 #if defined(WEBRTC_POSIX)
1389
1268 bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) { 1390 bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
1391 #if defined(WEBRTC_LINUX)
1392 // We don't keep a dedicated epoll descriptor containing only non-IO (i.e.
1393 // signaling) dispatchers, so the default "select" loop will be used in that
1394 // case.
1395 if (epoll_fd_ != INVALID_SOCKET && process_io) {
1396 return WaitEpoll(cmsWait);
1397 }
1398 #endif
1399 return WaitSelect(cmsWait, process_io);
1400 }
1401
1402 bool PhysicalSocketServer::WaitSelect(int cmsWait, bool process_io) {
1269 // Calculate timing information 1403 // Calculate timing information
1270 1404
1271 struct timeval* ptvWait = nullptr; 1405 struct timeval* ptvWait = nullptr;
1272 struct timeval tvWait; 1406 struct timeval tvWait;
1273 struct timeval tvStop; 1407 struct timeval tvStop;
1274 if (cmsWait != kForever) { 1408 if (cmsWait != kForever) {
1275 // Calculate wait timeval 1409 // Calculate wait timeval
1276 tvWait.tv_sec = cmsWait / 1000; 1410 tvWait.tv_sec = cmsWait / 1000;
1277 tvWait.tv_usec = (cmsWait % 1000) * 1000; 1411 tvWait.tv_usec = (cmsWait % 1000) * 1000;
1278 ptvWait = &tvWait; 1412 ptvWait = &tvWait;
(...skipping 22 matching lines...) Expand all
1301 __msan_unpoison(&fdsRead, sizeof(fdsRead)); 1435 __msan_unpoison(&fdsRead, sizeof(fdsRead));
1302 __msan_unpoison(&fdsWrite, sizeof(fdsWrite)); 1436 __msan_unpoison(&fdsWrite, sizeof(fdsWrite));
1303 #endif 1437 #endif
1304 1438
1305 fWait_ = true; 1439 fWait_ = true;
1306 1440
1307 while (fWait_) { 1441 while (fWait_) {
1308 int fdmax = -1; 1442 int fdmax = -1;
1309 { 1443 {
1310 CritScope cr(&crit_); 1444 CritScope cr(&crit_);
1311 for (size_t i = 0; i < dispatchers_.size(); ++i) { 1445 for (Dispatcher* pdispatcher : dispatchers_) {
1312 // Query dispatchers for read and write wait state 1446 // Query dispatchers for read and write wait state
1313 Dispatcher *pdispatcher = dispatchers_[i];
1314 RTC_DCHECK(pdispatcher); 1447 RTC_DCHECK(pdispatcher);
1315 if (!process_io && (pdispatcher != signal_wakeup_)) 1448 if (!process_io && (pdispatcher != signal_wakeup_))
1316 continue; 1449 continue;
1317 int fd = pdispatcher->GetDescriptor(); 1450 int fd = pdispatcher->GetDescriptor();
1451 // "select"ing a file descriptor that is equal to or larger than
1452 // FD_SETSIZE will result in undefined behavior.
1453 RTC_DCHECK_LT(fd, FD_SETSIZE);
1318 if (fd > fdmax) 1454 if (fd > fdmax)
1319 fdmax = fd; 1455 fdmax = fd;
1320 1456
1321 uint32_t ff = pdispatcher->GetRequestedEvents(); 1457 uint32_t ff = pdispatcher->GetRequestedEvents();
1322 if (ff & (DE_READ | DE_ACCEPT)) 1458 if (ff & (DE_READ | DE_ACCEPT))
1323 FD_SET(fd, &fdsRead); 1459 FD_SET(fd, &fdsRead);
1324 if (ff & (DE_WRITE | DE_CONNECT)) 1460 if (ff & (DE_WRITE | DE_CONNECT))
1325 FD_SET(fd, &fdsWrite); 1461 FD_SET(fd, &fdsWrite);
1326 } 1462 }
1327 } 1463 }
(...skipping 13 matching lines...) Expand all
1341 // Else ignore the error and keep going. If this EINTR was for one of the 1477 // Else ignore the error and keep going. If this EINTR was for one of the
1342 // signals managed by this PhysicalSocketServer, the 1478 // signals managed by this PhysicalSocketServer, the
1343 // PosixSignalDeliveryDispatcher will be in the signaled state in the next 1479 // PosixSignalDeliveryDispatcher will be in the signaled state in the next
1344 // iteration. 1480 // iteration.
1345 } else if (n == 0) { 1481 } else if (n == 0) {
1346 // If timeout, return success 1482 // If timeout, return success
1347 return true; 1483 return true;
1348 } else { 1484 } else {
1349 // We have signaled descriptors 1485 // We have signaled descriptors
1350 CritScope cr(&crit_); 1486 CritScope cr(&crit_);
1351 for (size_t i = 0; i < dispatchers_.size(); ++i) { 1487 for (Dispatcher *pdispatcher : dispatchers_) {
1352 Dispatcher *pdispatcher = dispatchers_[i];
1353 int fd = pdispatcher->GetDescriptor(); 1488 int fd = pdispatcher->GetDescriptor();
1354 uint32_t ff = 0; 1489 uint32_t ff = 0;
1355 int errcode = 0; 1490 int errcode = 0;
1356 1491
1357 // Reap any error code, which can be signaled through reads or writes. 1492 // Reap any error code, which can be signaled through reads or writes.
1358 // TODO(pthatcher): Should we set errcode if getsockopt fails? 1493 // TODO(pthatcher): Should we set errcode if getsockopt fails?
1359 if (FD_ISSET(fd, &fdsRead) || FD_ISSET(fd, &fdsWrite)) { 1494 if (FD_ISSET(fd, &fdsRead) || FD_ISSET(fd, &fdsWrite)) {
1360 socklen_t len = sizeof(errcode); 1495 socklen_t len = sizeof(errcode);
1361 ::getsockopt(fd, SOL_SOCKET, SO_ERROR, &errcode, &len); 1496 ::getsockopt(fd, SOL_SOCKET, SO_ERROR, &errcode, &len);
1362 } 1497 }
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after
1416 ptvWait->tv_usec += 1000000; 1551 ptvWait->tv_usec += 1000000;
1417 ptvWait->tv_sec -= 1; 1552 ptvWait->tv_sec -= 1;
1418 } 1553 }
1419 } 1554 }
1420 } 1555 }
1421 } 1556 }
1422 1557
1423 return true; 1558 return true;
1424 } 1559 }
1425 1560
1561 #if defined(WEBRTC_LINUX)
1562
1563 // Initial number of events to process with one call to "epoll_wait".
1564 static const size_t kInitialEpollEvents = 128;
1565
1566 // Maximum number of events to process with one call to "epoll_wait".
1567 static const size_t kMaxEpollEvents = 8192;
1568
1569 static int GetEpollEvents(Dispatcher* pdispatcher) {
1570 int events = 0;
1571 uint32_t ff = pdispatcher->GetRequestedEvents();
1572 if (ff & (DE_READ | DE_ACCEPT)) {
1573 events |= EPOLLIN;
1574 }
1575 if (ff & (DE_WRITE | DE_CONNECT)) {
1576 events |= EPOLLOUT;
1577 }
1578 return events;
1579 }
1580
1581 void PhysicalSocketServer::AddEpoll(Dispatcher* pdispatcher) {
1582 RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
1583 int fd = pdispatcher->GetDescriptor();
1584 RTC_DCHECK(fd != INVALID_SOCKET);
1585 if (fd == INVALID_SOCKET) {
1586 return;
1587 }
1588
1589 struct epoll_event event = {0};
1590 event.events = GetEpollEvents(pdispatcher);
1591 event.data.ptr = pdispatcher;
1592 int err = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event);
1593 RTC_DCHECK_EQ(err, 0);
1594 if (err == -1) {
1595 LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_ADD";
1596 }
1597 }
1598
1599 void PhysicalSocketServer::RemoveEpoll(Dispatcher* pdispatcher) {
1600 RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
1601 int fd = pdispatcher->GetDescriptor();
1602 RTC_DCHECK(fd != INVALID_SOCKET);
1603 if (fd == INVALID_SOCKET) {
1604 return;
1605 }
1606
1607 struct epoll_event event = {0};
1608 int err = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, &event);
1609 RTC_DCHECK(err == 0 || errno == ENOENT);
1610 if (err == -1) {
1611 if (errno == ENOENT) {
1612 // Socket has already been closed.
1613 LOG_E(LS_VERBOSE, EN, errno) << "epoll_ctl EPOLL_CTL_DEL";
1614 } else {
1615 LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_DEL";
1616 }
1617 }
1618 }
1619
1620 void PhysicalSocketServer::UpdateEpoll(Dispatcher* pdispatcher) {
1621 RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
1622 int fd = pdispatcher->GetDescriptor();
1623 RTC_DCHECK(fd != INVALID_SOCKET);
1624 if (fd == INVALID_SOCKET) {
1625 return;
1626 }
1627
1628 struct epoll_event event = {0};
1629 event.events = GetEpollEvents(pdispatcher);
1630 event.data.ptr = pdispatcher;
1631 int err = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd, &event);
1632 RTC_DCHECK_EQ(err, 0);
1633 if (err == -1) {
1634 LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_MOD";
1635 }
1636 }
1637
1638 bool PhysicalSocketServer::WaitEpoll(int cmsWait) {
1639 RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
1640 int64_t tvWait = -1;
1641 int64_t tvStop = -1;
1642 if (cmsWait != kForever) {
1643 tvWait = cmsWait;
1644 tvStop = TimeAfter(cmsWait);
1645 }
1646
1647 if (epoll_events_.empty()) {
1648 // The initial space to receive events is created only if epoll is used.
1649 epoll_events_.resize(kInitialEpollEvents);
1650 }
1651
1652 fWait_ = true;
1653
1654 while (fWait_) {
1655 // Wait then call handlers as appropriate
1656 // < 0 means error
1657 // 0 means timeout
1658 // > 0 means count of descriptors ready
1659 int n = epoll_wait(epoll_fd_, &epoll_events_[0],
1660 static_cast<int>(epoll_events_.size()), static_cast<int>(tvWait));
1661 if (n < 0) {
1662 if (errno != EINTR) {
1663 LOG_E(LS_ERROR, EN, errno) << "epoll";
1664 return false;
1665 }
1666 // Else ignore the error and keep going. If this EINTR was for one of the
1667 // signals managed by this PhysicalSocketServer, the
1668 // PosixSignalDeliveryDispatcher will be in the signaled state in the next
1669 // iteration.
1670 } else if (n == 0) {
1671 // If timeout, return success
1672 return true;
1673 } else {
1674 // We have signaled descriptors
1675 CritScope cr(&crit_);
1676 for (int i = 0; i < n; ++i) {
1677 const epoll_event& event = epoll_events_[i];
1678 Dispatcher* pdispatcher = static_cast<Dispatcher*>(event.data.ptr);
1679 if (dispatchers_.find(pdispatcher) == dispatchers_.end()) {
1680 // The dispatcher for this socket no longer exists.
1681 continue;
1682 }
1683
1684 int fd = pdispatcher->GetDescriptor();
1685 uint32_t ff = 0;
1686 int errcode = 0;
1687
1688 // Reap any error code.
1689 // TODO(pthatcher): Should we set errcode if getsockopt fails?
1690 if (event.events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP)) {
1691 socklen_t len = sizeof(errcode);
1692 ::getsockopt(fd, SOL_SOCKET, SO_ERROR, &errcode, &len);
1693 }
1694
1695 // Check readable descriptors. If we're waiting on an accept, signal
1696 // that. Otherwise we're waiting for data, check to see if we're
1697 // readable or really closed.
1698 // TODO(pthatcher): Only peek at TCP descriptors.
1699 if ((event.events & (EPOLLIN | EPOLLPRI))) {
1700 if (pdispatcher->GetRequestedEvents() & DE_ACCEPT) {
1701 ff |= DE_ACCEPT;
1702 } else if (errcode || pdispatcher->IsDescriptorClosed()) {
1703 ff |= DE_CLOSE;
1704 } else {
1705 ff |= DE_READ;
1706 }
1707 }
1708
1709 // Check writable descriptors. If we're waiting on a connect, detect
1710 // success versus failure by the reaped error code.
1711 if ((event.events & EPOLLOUT)) {
1712 if (pdispatcher->GetRequestedEvents() & DE_CONNECT) {
1713 if (!errcode) {
1714 ff |= DE_CONNECT;
1715 } else {
1716 ff |= DE_CLOSE;
1717 }
1718 } else {
1719 ff |= DE_WRITE;
1720 }
1721 }
1722
1723 // Tell the descriptor about the event.
1724 if (ff != 0) {
1725 pdispatcher->OnPreEvent(ff);
1726 pdispatcher->OnEvent(ff, errcode);
1727 }
1728 }
1729 }
1730
1731 if (static_cast<size_t>(n) == epoll_events_.size() &&
1732 epoll_events_.size() < kMaxEpollEvents) {
1733 // We used the complete space to receive events, increase size for future
1734 // iterations.
1735 epoll_events_.resize(std::max(epoll_events_.size() * 2, kMaxEpollEvents));
1736 }
1737
1738 if (cmsWait != kForever) {
1739 tvWait = TimeDiff(tvStop, TimeMillis());
1740 if (tvWait < 0) {
1741 // Return success on timeout.
1742 return true;
1743 }
1744 }
1745 }
1746
1747 return true;
1748 }
1749
1750 #endif // WEBRTC_LINUX
1751
1426 static void GlobalSignalHandler(int signum) { 1752 static void GlobalSignalHandler(int signum) {
1427 PosixSignalHandler::Instance()->OnPosixSignalReceived(signum); 1753 PosixSignalHandler::Instance()->OnPosixSignalReceived(signum);
1428 } 1754 }
1429 1755
1430 bool PhysicalSocketServer::SetPosixSignalHandler(int signum, 1756 bool PhysicalSocketServer::SetPosixSignalHandler(int signum,
1431 void (*handler)(int)) { 1757 void (*handler)(int)) {
1432 // If handler is SIG_IGN or SIG_DFL then clear our user-level handler, 1758 // If handler is SIG_IGN or SIG_DFL then clear our user-level handler,
1433 // otherwise set one. 1759 // otherwise set one.
1434 if (handler == SIG_IGN || handler == SIG_DFL) { 1760 if (handler == SIG_IGN || handler == SIG_DFL) {
1435 if (!InstallSignal(signum, handler)) { 1761 if (!InstallSignal(signum, handler)) {
(...skipping 201 matching lines...) Expand 10 before | Expand all | Expand 10 after
1637 break; 1963 break;
1638 } 1964 }
1639 } 1965 }
1640 1966
1641 // Done 1967 // Done
1642 return true; 1968 return true;
1643 } 1969 }
1644 #endif // WEBRTC_WIN 1970 #endif // WEBRTC_WIN
1645 1971
1646 } // namespace rtc 1972 } // namespace rtc
OLDNEW
« no previous file with comments | « webrtc/base/physicalsocketserver.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698