OLD | NEW |
---|---|
1 /* | 1 /* |
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. | 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. |
3 * | 3 * |
4 * Use of this source code is governed by a BSD-style license | 4 * Use of this source code is governed by a BSD-style license |
5 * that can be found in the LICENSE file in the root of the source | 5 * that can be found in the LICENSE file in the root of the source |
6 * tree. An additional intellectual property rights grant can be found | 6 * tree. An additional intellectual property rights grant can be found |
7 * in the file PATENTS. All contributing project authors may | 7 * in the file PATENTS. All contributing project authors may |
8 * be found in the AUTHORS file in the root of the source tree. | 8 * be found in the AUTHORS file in the root of the source tree. |
9 */ | 9 */ |
10 #include "webrtc/base/physicalsocketserver.h" | 10 #include "webrtc/base/physicalsocketserver.h" |
(...skipping 19 matching lines...) Expand all Loading... | |
30 | 30 |
31 #if defined(WEBRTC_WIN) | 31 #if defined(WEBRTC_WIN) |
32 #define WIN32_LEAN_AND_MEAN | 32 #define WIN32_LEAN_AND_MEAN |
33 #include <windows.h> | 33 #include <windows.h> |
34 #include <winsock2.h> | 34 #include <winsock2.h> |
35 #include <ws2tcpip.h> | 35 #include <ws2tcpip.h> |
36 #undef SetPort | 36 #undef SetPort |
37 #endif | 37 #endif |
38 | 38 |
39 #include <algorithm> | 39 #include <algorithm> |
40 #include <limits> | |
40 #include <map> | 41 #include <map> |
41 | 42 |
42 #include "webrtc/base/arraysize.h" | 43 #include "webrtc/base/arraysize.h" |
43 #include "webrtc/base/basictypes.h" | 44 #include "webrtc/base/basictypes.h" |
44 #include "webrtc/base/byteorder.h" | 45 #include "webrtc/base/byteorder.h" |
45 #include "webrtc/base/checks.h" | 46 #include "webrtc/base/checks.h" |
46 #include "webrtc/base/logging.h" | 47 #include "webrtc/base/logging.h" |
47 #include "webrtc/base/networkmonitor.h" | 48 #include "webrtc/base/networkmonitor.h" |
48 #include "webrtc/base/nullsocketserver.h" | 49 #include "webrtc/base/nullsocketserver.h" |
49 #include "webrtc/base/timeutils.h" | 50 #include "webrtc/base/timeutils.h" |
(...skipping 24 matching lines...) Expand all Loading... | |
74 | 75 |
75 int64_t GetSocketRecvTimestamp(int socket) { | 76 int64_t GetSocketRecvTimestamp(int socket) { |
76 return -1; | 77 return -1; |
77 } | 78 } |
78 #endif | 79 #endif |
79 | 80 |
80 #if defined(WEBRTC_WIN) | 81 #if defined(WEBRTC_WIN) |
81 typedef char* SockOptArg; | 82 typedef char* SockOptArg; |
82 #endif | 83 #endif |
83 | 84 |
85 #if defined(WEBRTC_POSIX) && defined(WEBRTC_LINUX) | |
86 // EPOLLRDHUP is only defined starting with Linux 2.6.17. | |
87 #if !defined(EPOLLRDHUP) | |
88 #define EPOLLRDHUP 0x2000 | |
89 #endif | |
90 #endif | |
91 | |
84 namespace rtc { | 92 namespace rtc { |
85 | 93 |
86 std::unique_ptr<SocketServer> SocketServer::CreateDefault() { | 94 std::unique_ptr<SocketServer> SocketServer::CreateDefault() { |
87 #if defined(__native_client__) | 95 #if defined(__native_client__) |
88 return std::unique_ptr<SocketServer>(new rtc::NullSocketServer); | 96 return std::unique_ptr<SocketServer>(new rtc::NullSocketServer); |
89 #else | 97 #else |
90 return std::unique_ptr<SocketServer>(new rtc::PhysicalSocketServer); | 98 return std::unique_ptr<SocketServer>(new rtc::PhysicalSocketServer); |
91 #endif | 99 #endif |
92 } | 100 } |
93 | 101 |
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
127 resolver_(nullptr) { | 135 resolver_(nullptr) { |
128 #if defined(WEBRTC_WIN) | 136 #if defined(WEBRTC_WIN) |
129 // EnsureWinsockInit() ensures that winsock is initialized. The default | 137 // EnsureWinsockInit() ensures that winsock is initialized. The default |
130 // version of this function doesn't do anything because winsock is | 138 // version of this function doesn't do anything because winsock is |
131 // initialized by constructor of a static object. If neccessary libjingle | 139 // initialized by constructor of a static object. If neccessary libjingle |
132 // users can link it with a different version of this function by replacing | 140 // users can link it with a different version of this function by replacing |
133 // win32socketinit.cc. See win32socketinit.cc for more details. | 141 // win32socketinit.cc. See win32socketinit.cc for more details. |
134 EnsureWinsockInit(); | 142 EnsureWinsockInit(); |
135 #endif | 143 #endif |
136 if (s_ != INVALID_SOCKET) { | 144 if (s_ != INVALID_SOCKET) { |
137 enabled_events_ = DE_READ | DE_WRITE; | 145 SetEnabledEvents(DE_READ | DE_WRITE); |
Taylor Brandstetter
2017/05/17 07:20:16
nit: The "SetEnabledEvents" etc. change seems like
joachim
2017/05/17 21:17:02
Moved to https://codereview.webrtc.org/2893723002/
| |
138 | 146 |
139 int type = SOCK_STREAM; | 147 int type = SOCK_STREAM; |
140 socklen_t len = sizeof(type); | 148 socklen_t len = sizeof(type); |
141 const int res = | 149 const int res = |
142 getsockopt(s_, SOL_SOCKET, SO_TYPE, (SockOptArg)&type, &len); | 150 getsockopt(s_, SOL_SOCKET, SO_TYPE, (SockOptArg)&type, &len); |
143 RTC_DCHECK_EQ(0, res); | 151 RTC_DCHECK_EQ(0, res); |
144 udp_ = (SOCK_DGRAM == type); | 152 udp_ = (SOCK_DGRAM == type); |
145 } | 153 } |
146 } | 154 } |
147 | 155 |
148 PhysicalSocket::~PhysicalSocket() { | 156 PhysicalSocket::~PhysicalSocket() { |
149 Close(); | 157 Close(); |
150 } | 158 } |
151 | 159 |
152 bool PhysicalSocket::Create(int family, int type) { | 160 bool PhysicalSocket::Create(int family, int type) { |
153 Close(); | 161 Close(); |
154 s_ = ::socket(family, type, 0); | 162 s_ = ::socket(family, type, 0); |
155 udp_ = (SOCK_DGRAM == type); | 163 udp_ = (SOCK_DGRAM == type); |
156 UpdateLastError(); | 164 UpdateLastError(); |
157 if (udp_) | 165 if (udp_) { |
158 enabled_events_ = DE_READ | DE_WRITE; | 166 SetEnabledEvents(DE_READ | DE_WRITE); |
167 } | |
159 return s_ != INVALID_SOCKET; | 168 return s_ != INVALID_SOCKET; |
160 } | 169 } |
161 | 170 |
162 SocketAddress PhysicalSocket::GetLocalAddress() const { | 171 SocketAddress PhysicalSocket::GetLocalAddress() const { |
163 sockaddr_storage addr_storage = {0}; | 172 sockaddr_storage addr_storage = {0}; |
164 socklen_t addrlen = sizeof(addr_storage); | 173 socklen_t addrlen = sizeof(addr_storage); |
165 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); | 174 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); |
166 int result = ::getsockname(s_, addr, &addrlen); | 175 int result = ::getsockname(s_, addr, &addrlen); |
167 SocketAddress address; | 176 SocketAddress address; |
168 if (result >= 0) { | 177 if (result >= 0) { |
(...skipping 91 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
260 int PhysicalSocket::DoConnect(const SocketAddress& connect_addr) { | 269 int PhysicalSocket::DoConnect(const SocketAddress& connect_addr) { |
261 if ((s_ == INVALID_SOCKET) && | 270 if ((s_ == INVALID_SOCKET) && |
262 !Create(connect_addr.family(), SOCK_STREAM)) { | 271 !Create(connect_addr.family(), SOCK_STREAM)) { |
263 return SOCKET_ERROR; | 272 return SOCKET_ERROR; |
264 } | 273 } |
265 sockaddr_storage addr_storage; | 274 sockaddr_storage addr_storage; |
266 size_t len = connect_addr.ToSockAddrStorage(&addr_storage); | 275 size_t len = connect_addr.ToSockAddrStorage(&addr_storage); |
267 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); | 276 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); |
268 int err = ::connect(s_, addr, static_cast<int>(len)); | 277 int err = ::connect(s_, addr, static_cast<int>(len)); |
269 UpdateLastError(); | 278 UpdateLastError(); |
279 uint8_t events = DE_READ | DE_WRITE; | |
270 if (err == 0) { | 280 if (err == 0) { |
271 state_ = CS_CONNECTED; | 281 state_ = CS_CONNECTED; |
272 } else if (IsBlockingError(GetError())) { | 282 } else if (IsBlockingError(GetError())) { |
273 state_ = CS_CONNECTING; | 283 state_ = CS_CONNECTING; |
274 enabled_events_ |= DE_CONNECT; | 284 events |= DE_CONNECT; |
275 } else { | 285 } else { |
276 return SOCKET_ERROR; | 286 return SOCKET_ERROR; |
277 } | 287 } |
278 | 288 |
279 enabled_events_ |= DE_READ | DE_WRITE; | 289 EnableEvents(events); |
280 return 0; | 290 return 0; |
281 } | 291 } |
282 | 292 |
283 int PhysicalSocket::GetError() const { | 293 int PhysicalSocket::GetError() const { |
284 CritScope cs(&crit_); | 294 CritScope cs(&crit_); |
285 return error_; | 295 return error_; |
286 } | 296 } |
287 | 297 |
288 void PhysicalSocket::SetError(int error) { | 298 void PhysicalSocket::SetError(int error) { |
289 CritScope cs(&crit_); | 299 CritScope cs(&crit_); |
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
335 #else | 345 #else |
336 0 | 346 0 |
337 #endif | 347 #endif |
338 ); | 348 ); |
339 UpdateLastError(); | 349 UpdateLastError(); |
340 MaybeRemapSendError(); | 350 MaybeRemapSendError(); |
341 // We have seen minidumps where this may be false. | 351 // We have seen minidumps where this may be false. |
342 RTC_DCHECK(sent <= static_cast<int>(cb)); | 352 RTC_DCHECK(sent <= static_cast<int>(cb)); |
343 if ((sent > 0 && sent < static_cast<int>(cb)) || | 353 if ((sent > 0 && sent < static_cast<int>(cb)) || |
344 (sent < 0 && IsBlockingError(GetError()))) { | 354 (sent < 0 && IsBlockingError(GetError()))) { |
345 enabled_events_ |= DE_WRITE; | 355 EnableEvents(DE_WRITE); |
346 } | 356 } |
347 return sent; | 357 return sent; |
348 } | 358 } |
349 | 359 |
350 int PhysicalSocket::SendTo(const void* buffer, | 360 int PhysicalSocket::SendTo(const void* buffer, |
351 size_t length, | 361 size_t length, |
352 const SocketAddress& addr) { | 362 const SocketAddress& addr) { |
353 sockaddr_storage saddr; | 363 sockaddr_storage saddr; |
354 size_t len = addr.ToSockAddrStorage(&saddr); | 364 size_t len = addr.ToSockAddrStorage(&saddr); |
355 int sent = DoSendTo( | 365 int sent = DoSendTo( |
356 s_, static_cast<const char *>(buffer), static_cast<int>(length), | 366 s_, static_cast<const char *>(buffer), static_cast<int>(length), |
357 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID) | 367 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID) |
358 // Suppress SIGPIPE. See above for explanation. | 368 // Suppress SIGPIPE. See above for explanation. |
359 MSG_NOSIGNAL, | 369 MSG_NOSIGNAL, |
360 #else | 370 #else |
361 0, | 371 0, |
362 #endif | 372 #endif |
363 reinterpret_cast<sockaddr*>(&saddr), static_cast<int>(len)); | 373 reinterpret_cast<sockaddr*>(&saddr), static_cast<int>(len)); |
364 UpdateLastError(); | 374 UpdateLastError(); |
365 MaybeRemapSendError(); | 375 MaybeRemapSendError(); |
366 // We have seen minidumps where this may be false. | 376 // We have seen minidumps where this may be false. |
367 RTC_DCHECK(sent <= static_cast<int>(length)); | 377 RTC_DCHECK(sent <= static_cast<int>(length)); |
368 if ((sent > 0 && sent < static_cast<int>(length)) || | 378 if ((sent > 0 && sent < static_cast<int>(length)) || |
369 (sent < 0 && IsBlockingError(GetError()))) { | 379 (sent < 0 && IsBlockingError(GetError()))) { |
370 enabled_events_ |= DE_WRITE; | 380 EnableEvents(DE_WRITE); |
371 } | 381 } |
372 return sent; | 382 return sent; |
373 } | 383 } |
374 | 384 |
375 int PhysicalSocket::Recv(void* buffer, size_t length, int64_t* timestamp) { | 385 int PhysicalSocket::Recv(void* buffer, size_t length, int64_t* timestamp) { |
376 int received = ::recv(s_, static_cast<char*>(buffer), | 386 int received = ::recv(s_, static_cast<char*>(buffer), |
377 static_cast<int>(length), 0); | 387 static_cast<int>(length), 0); |
378 if ((received == 0) && (length != 0)) { | 388 if ((received == 0) && (length != 0)) { |
379 // Note: on graceful shutdown, recv can return 0. In this case, we | 389 // Note: on graceful shutdown, recv can return 0. In this case, we |
380 // pretend it is blocking, and then signal close, so that simplifying | 390 // pretend it is blocking, and then signal close, so that simplifying |
381 // assumptions can be made about Recv. | 391 // assumptions can be made about Recv. |
382 LOG(LS_WARNING) << "EOF from socket; deferring close event"; | 392 LOG(LS_WARNING) << "EOF from socket; deferring close event"; |
383 // Must turn this back on so that the select() loop will notice the close | 393 // Must turn this back on so that the select() loop will notice the close |
384 // event. | 394 // event. |
385 enabled_events_ |= DE_READ; | 395 EnableEvents(DE_READ); |
386 SetError(EWOULDBLOCK); | 396 SetError(EWOULDBLOCK); |
387 return SOCKET_ERROR; | 397 return SOCKET_ERROR; |
388 } | 398 } |
389 if (timestamp) { | 399 if (timestamp) { |
390 *timestamp = GetSocketRecvTimestamp(s_); | 400 *timestamp = GetSocketRecvTimestamp(s_); |
391 } | 401 } |
392 UpdateLastError(); | 402 UpdateLastError(); |
393 int error = GetError(); | 403 int error = GetError(); |
394 bool success = (received >= 0) || IsBlockingError(error); | 404 bool success = (received >= 0) || IsBlockingError(error); |
395 if (udp_ || success) { | 405 if (udp_ || success) { |
396 enabled_events_ |= DE_READ; | 406 EnableEvents(DE_READ); |
397 } | 407 } |
398 if (!success) { | 408 if (!success) { |
399 LOG_F(LS_VERBOSE) << "Error = " << error; | 409 LOG_F(LS_VERBOSE) << "Error = " << error; |
400 } | 410 } |
401 return received; | 411 return received; |
402 } | 412 } |
403 | 413 |
404 int PhysicalSocket::RecvFrom(void* buffer, | 414 int PhysicalSocket::RecvFrom(void* buffer, |
405 size_t length, | 415 size_t length, |
406 SocketAddress* out_addr, | 416 SocketAddress* out_addr, |
407 int64_t* timestamp) { | 417 int64_t* timestamp) { |
408 sockaddr_storage addr_storage; | 418 sockaddr_storage addr_storage; |
409 socklen_t addr_len = sizeof(addr_storage); | 419 socklen_t addr_len = sizeof(addr_storage); |
410 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); | 420 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); |
411 int received = ::recvfrom(s_, static_cast<char*>(buffer), | 421 int received = ::recvfrom(s_, static_cast<char*>(buffer), |
412 static_cast<int>(length), 0, addr, &addr_len); | 422 static_cast<int>(length), 0, addr, &addr_len); |
413 if (timestamp) { | 423 if (timestamp) { |
414 *timestamp = GetSocketRecvTimestamp(s_); | 424 *timestamp = GetSocketRecvTimestamp(s_); |
415 } | 425 } |
416 UpdateLastError(); | 426 UpdateLastError(); |
417 if ((received >= 0) && (out_addr != nullptr)) | 427 if ((received >= 0) && (out_addr != nullptr)) |
418 SocketAddressFromSockAddrStorage(addr_storage, out_addr); | 428 SocketAddressFromSockAddrStorage(addr_storage, out_addr); |
419 int error = GetError(); | 429 int error = GetError(); |
420 bool success = (received >= 0) || IsBlockingError(error); | 430 bool success = (received >= 0) || IsBlockingError(error); |
421 if (udp_ || success) { | 431 if (udp_ || success) { |
422 enabled_events_ |= DE_READ; | 432 EnableEvents(DE_READ); |
423 } | 433 } |
424 if (!success) { | 434 if (!success) { |
425 LOG_F(LS_VERBOSE) << "Error = " << error; | 435 LOG_F(LS_VERBOSE) << "Error = " << error; |
426 } | 436 } |
427 return received; | 437 return received; |
428 } | 438 } |
429 | 439 |
430 int PhysicalSocket::Listen(int backlog) { | 440 int PhysicalSocket::Listen(int backlog) { |
431 int err = ::listen(s_, backlog); | 441 int err = ::listen(s_, backlog); |
432 UpdateLastError(); | 442 UpdateLastError(); |
433 if (err == 0) { | 443 if (err == 0) { |
434 state_ = CS_CONNECTING; | 444 state_ = CS_CONNECTING; |
435 enabled_events_ |= DE_ACCEPT; | 445 EnableEvents(DE_ACCEPT); |
436 #if !defined(NDEBUG) | 446 #if !defined(NDEBUG) |
437 dbg_addr_ = "Listening @ "; | 447 dbg_addr_ = "Listening @ "; |
438 dbg_addr_.append(GetLocalAddress().ToString()); | 448 dbg_addr_.append(GetLocalAddress().ToString()); |
439 #endif | 449 #endif |
440 } | 450 } |
441 return err; | 451 return err; |
442 } | 452 } |
443 | 453 |
444 AsyncSocket* PhysicalSocket::Accept(SocketAddress* out_addr) { | 454 AsyncSocket* PhysicalSocket::Accept(SocketAddress* out_addr) { |
445 // Always re-subscribe DE_ACCEPT to make sure new incoming connections will | 455 // Always re-subscribe DE_ACCEPT to make sure new incoming connections will |
446 // trigger an event even if DoAccept returns an error here. | 456 // trigger an event even if DoAccept returns an error here. |
447 enabled_events_ |= DE_ACCEPT; | 457 EnableEvents(DE_ACCEPT); |
448 sockaddr_storage addr_storage; | 458 sockaddr_storage addr_storage; |
449 socklen_t addr_len = sizeof(addr_storage); | 459 socklen_t addr_len = sizeof(addr_storage); |
450 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); | 460 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); |
451 SOCKET s = DoAccept(s_, addr, &addr_len); | 461 SOCKET s = DoAccept(s_, addr, &addr_len); |
452 UpdateLastError(); | 462 UpdateLastError(); |
453 if (s == INVALID_SOCKET) | 463 if (s == INVALID_SOCKET) |
454 return nullptr; | 464 return nullptr; |
455 if (out_addr != nullptr) | 465 if (out_addr != nullptr) |
456 SocketAddressFromSockAddrStorage(addr_storage, out_addr); | 466 SocketAddressFromSockAddrStorage(addr_storage, out_addr); |
457 return ss_->WrapSocket(s); | 467 return ss_->WrapSocket(s); |
458 } | 468 } |
459 | 469 |
460 int PhysicalSocket::Close() { | 470 int PhysicalSocket::Close() { |
461 if (s_ == INVALID_SOCKET) | 471 if (s_ == INVALID_SOCKET) |
462 return 0; | 472 return 0; |
463 int err = ::closesocket(s_); | 473 int err = ::closesocket(s_); |
464 UpdateLastError(); | 474 UpdateLastError(); |
465 s_ = INVALID_SOCKET; | 475 s_ = INVALID_SOCKET; |
466 state_ = CS_CLOSED; | 476 state_ = CS_CLOSED; |
467 enabled_events_ = 0; | 477 SetEnabledEvents(0); |
468 if (resolver_) { | 478 if (resolver_) { |
469 resolver_->Destroy(false); | 479 resolver_->Destroy(false); |
470 resolver_ = nullptr; | 480 resolver_ = nullptr; |
471 } | 481 } |
472 return err; | 482 return err; |
473 } | 483 } |
474 | 484 |
475 int PhysicalSocket::EstimateMTU(uint16_t* mtu) { | 485 int PhysicalSocket::EstimateMTU(uint16_t* mtu) { |
476 SocketAddress addr = GetRemoteAddress(); | 486 SocketAddress addr = GetRemoteAddress(); |
477 if (addr.IsAnyIP()) { | 487 if (addr.IsAnyIP()) { |
(...skipping 105 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
583 // Reference/ManPages/man2/sendto.2.html | 593 // Reference/ManPages/man2/sendto.2.html |
584 // ENOBUFS - The output queue for a network interface is full. | 594 // ENOBUFS - The output queue for a network interface is full. |
585 // This generally indicates that the interface has stopped sending, | 595 // This generally indicates that the interface has stopped sending, |
586 // but may be caused by transient congestion. | 596 // but may be caused by transient congestion. |
587 if (GetError() == ENOBUFS) { | 597 if (GetError() == ENOBUFS) { |
588 SetError(EWOULDBLOCK); | 598 SetError(EWOULDBLOCK); |
589 } | 599 } |
590 #endif | 600 #endif |
591 } | 601 } |
592 | 602 |
603 void PhysicalSocket::SetEnabledEvents(uint8_t events) { | |
604 enabled_events_ = events; | |
605 } | |
606 | |
607 void PhysicalSocket::EnableEvents(uint8_t events) { | |
608 enabled_events_ |= events; | |
609 } | |
610 | |
611 void PhysicalSocket::DisableEvents(uint8_t events) { | |
612 enabled_events_ &= ~events; | |
613 } | |
614 | |
593 int PhysicalSocket::TranslateOption(Option opt, int* slevel, int* sopt) { | 615 int PhysicalSocket::TranslateOption(Option opt, int* slevel, int* sopt) { |
594 switch (opt) { | 616 switch (opt) { |
595 case OPT_DONTFRAGMENT: | 617 case OPT_DONTFRAGMENT: |
596 #if defined(WEBRTC_WIN) | 618 #if defined(WEBRTC_WIN) |
597 *slevel = IPPROTO_IP; | 619 *slevel = IPPROTO_IP; |
598 *sopt = IP_DONTFRAGMENT; | 620 *sopt = IP_DONTFRAGMENT; |
599 break; | 621 break; |
600 #elif defined(WEBRTC_MAC) || defined(BSD) || defined(__native_client__) | 622 #elif defined(WEBRTC_MAC) || defined(BSD) || defined(__native_client__) |
601 LOG(LS_WARNING) << "Socket::OPT_DONTFRAGMENT not supported."; | 623 LOG(LS_WARNING) << "Socket::OPT_DONTFRAGMENT not supported."; |
602 return -1; | 624 return -1; |
(...skipping 178 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
781 | 803 |
782 #if defined(WEBRTC_WIN) | 804 #if defined(WEBRTC_WIN) |
783 | 805 |
784 void SocketDispatcher::OnEvent(uint32_t ff, int err) { | 806 void SocketDispatcher::OnEvent(uint32_t ff, int err) { |
785 int cache_id = id_; | 807 int cache_id = id_; |
786 // Make sure we deliver connect/accept first. Otherwise, consumers may see | 808 // Make sure we deliver connect/accept first. Otherwise, consumers may see |
787 // something like a READ followed by a CONNECT, which would be odd. | 809 // something like a READ followed by a CONNECT, which would be odd. |
788 if (((ff & DE_CONNECT) != 0) && (id_ == cache_id)) { | 810 if (((ff & DE_CONNECT) != 0) && (id_ == cache_id)) { |
789 if (ff != DE_CONNECT) | 811 if (ff != DE_CONNECT) |
790 LOG(LS_VERBOSE) << "Signalled with DE_CONNECT: " << ff; | 812 LOG(LS_VERBOSE) << "Signalled with DE_CONNECT: " << ff; |
791 enabled_events_ &= ~DE_CONNECT; | 813 DisableEvents(DE_CONNECT); |
792 #if !defined(NDEBUG) | 814 #if !defined(NDEBUG) |
793 dbg_addr_ = "Connected @ "; | 815 dbg_addr_ = "Connected @ "; |
794 dbg_addr_.append(GetRemoteAddress().ToString()); | 816 dbg_addr_.append(GetRemoteAddress().ToString()); |
795 #endif | 817 #endif |
796 SignalConnectEvent(this); | 818 SignalConnectEvent(this); |
797 } | 819 } |
798 if (((ff & DE_ACCEPT) != 0) && (id_ == cache_id)) { | 820 if (((ff & DE_ACCEPT) != 0) && (id_ == cache_id)) { |
799 enabled_events_ &= ~DE_ACCEPT; | 821 DisableEvents(DE_ACCEPT); |
800 SignalReadEvent(this); | 822 SignalReadEvent(this); |
801 } | 823 } |
802 if ((ff & DE_READ) != 0) { | 824 if ((ff & DE_READ) != 0) { |
803 enabled_events_ &= ~DE_READ; | 825 DisableEvents(DE_READ); |
804 SignalReadEvent(this); | 826 SignalReadEvent(this); |
805 } | 827 } |
806 if (((ff & DE_WRITE) != 0) && (id_ == cache_id)) { | 828 if (((ff & DE_WRITE) != 0) && (id_ == cache_id)) { |
807 enabled_events_ &= ~DE_WRITE; | 829 DisableEvents(DE_WRITE); |
808 SignalWriteEvent(this); | 830 SignalWriteEvent(this); |
809 } | 831 } |
810 if (((ff & DE_CLOSE) != 0) && (id_ == cache_id)) { | 832 if (((ff & DE_CLOSE) != 0) && (id_ == cache_id)) { |
811 signal_close_ = true; | 833 signal_close_ = true; |
812 signal_err_ = err; | 834 signal_err_ = err; |
813 } | 835 } |
814 } | 836 } |
815 | 837 |
816 #elif defined(WEBRTC_POSIX) | 838 #elif defined(WEBRTC_POSIX) |
817 | 839 |
818 void SocketDispatcher::OnEvent(uint32_t ff, int err) { | 840 void SocketDispatcher::OnEvent(uint32_t ff, int err) { |
841 #if defined(WEBRTC_LINUX) | |
Taylor Brandstetter
2017/05/17 07:20:16
nit: It may be cleaner to do "#ifdef WEBRTC_LINUX
joachim
2017/05/17 21:17:02
Done (named it "WEBRTC_USE_EPOLL").
| |
842 // Remember currently enabled events so we can combine multiple changes | |
843 // into one update call later. | |
844 PushEnabledEvents(); | |
845 #endif | |
819 // Make sure we deliver connect/accept first. Otherwise, consumers may see | 846 // Make sure we deliver connect/accept first. Otherwise, consumers may see |
820 // something like a READ followed by a CONNECT, which would be odd. | 847 // something like a READ followed by a CONNECT, which would be odd. |
821 if ((ff & DE_CONNECT) != 0) { | 848 if ((ff & DE_CONNECT) != 0) { |
822 enabled_events_ &= ~DE_CONNECT; | 849 DisableEvents(DE_CONNECT); |
823 SignalConnectEvent(this); | 850 SignalConnectEvent(this); |
824 } | 851 } |
825 if ((ff & DE_ACCEPT) != 0) { | 852 if ((ff & DE_ACCEPT) != 0) { |
826 enabled_events_ &= ~DE_ACCEPT; | 853 DisableEvents(DE_ACCEPT); |
827 SignalReadEvent(this); | 854 SignalReadEvent(this); |
828 } | 855 } |
829 if ((ff & DE_READ) != 0) { | 856 if ((ff & DE_READ) != 0) { |
830 enabled_events_ &= ~DE_READ; | 857 DisableEvents(DE_READ); |
831 SignalReadEvent(this); | 858 SignalReadEvent(this); |
832 } | 859 } |
833 if ((ff & DE_WRITE) != 0) { | 860 if ((ff & DE_WRITE) != 0) { |
834 enabled_events_ &= ~DE_WRITE; | 861 DisableEvents(DE_WRITE); |
835 SignalWriteEvent(this); | 862 SignalWriteEvent(this); |
836 } | 863 } |
837 if ((ff & DE_CLOSE) != 0) { | 864 if ((ff & DE_CLOSE) != 0) { |
838 // The socket is now dead to us, so stop checking it. | 865 // The socket is now dead to us, so stop checking it. |
839 enabled_events_ = 0; | 866 SetEnabledEvents(0); |
840 SignalCloseEvent(this, err); | 867 SignalCloseEvent(this, err); |
841 } | 868 } |
869 #if defined(WEBRTC_LINUX) | |
870 PopEnabledEvents(); | |
871 #endif | |
842 } | 872 } |
843 | 873 |
874 #if defined(WEBRTC_LINUX) | |
875 | |
876 void SocketDispatcher::PushEnabledEvents() { | |
877 enabled_events_stack_.push_back(enabled_events_); | |
Taylor Brandstetter
2017/05/17 07:20:16
Will the stack ever have a size > 1? If so, can yo
joachim
2017/05/17 21:17:02
Currently it will only be one element. I'll change
| |
878 } | |
879 | |
880 void SocketDispatcher::PopEnabledEvents() { | |
881 RTC_DCHECK(!enabled_events_stack_.empty()); | |
882 uint8_t old_events = enabled_events_stack_.back(); | |
883 enabled_events_stack_.pop_back(); | |
884 MaybeUpdateDispatcher(old_events); | |
885 } | |
886 | |
887 void SocketDispatcher::MaybeUpdateDispatcher(uint8_t old_events) { | |
888 if (enabled_events_ != old_events && enabled_events_stack_.empty()) { | |
889 ss_->Update(this); | |
890 } | |
891 } | |
892 | |
893 void SocketDispatcher::SetEnabledEvents(uint8_t events) { | |
894 uint8_t old_events = enabled_events_; | |
895 PhysicalSocket::SetEnabledEvents(events); | |
896 MaybeUpdateDispatcher(old_events); | |
897 } | |
898 | |
899 void SocketDispatcher::EnableEvents(uint8_t events) { | |
900 uint8_t old_events = enabled_events_; | |
901 PhysicalSocket::EnableEvents(events); | |
902 MaybeUpdateDispatcher(old_events); | |
903 } | |
904 | |
905 void SocketDispatcher::DisableEvents(uint8_t events) { | |
906 uint8_t old_events = enabled_events_; | |
907 PhysicalSocket::DisableEvents(events); | |
908 MaybeUpdateDispatcher(old_events); | |
909 } | |
910 | |
911 #endif // WEBRTC_LINUX | |
912 | |
844 #endif // WEBRTC_POSIX | 913 #endif // WEBRTC_POSIX |
845 | 914 |
846 int SocketDispatcher::Close() { | 915 int SocketDispatcher::Close() { |
847 if (s_ == INVALID_SOCKET) | 916 if (s_ == INVALID_SOCKET) |
848 return 0; | 917 return 0; |
849 | 918 |
850 #if defined(WEBRTC_WIN) | 919 #if defined(WEBRTC_WIN) |
851 id_ = 0; | 920 id_ = 0; |
852 signal_close_ = false; | 921 signal_close_ = false; |
853 #endif | 922 #endif |
(...skipping 310 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1164 if (pf_) | 1233 if (pf_) |
1165 *pf_ = false; | 1234 *pf_ = false; |
1166 } | 1235 } |
1167 | 1236 |
1168 private: | 1237 private: |
1169 bool *pf_; | 1238 bool *pf_; |
1170 }; | 1239 }; |
1171 | 1240 |
1172 PhysicalSocketServer::PhysicalSocketServer() | 1241 PhysicalSocketServer::PhysicalSocketServer() |
1173 : fWait_(false) { | 1242 : fWait_(false) { |
1243 #if defined(WEBRTC_POSIX) && defined(WEBRTC_LINUX) | |
1244 // Since Linux 2.6.8, the size argument is ignored, but must be greater than | |
1245 // zero. Before that the size served as hint to the kernel for the amount of | |
1246 // space to initially allocate in internal data structures. | |
1247 epoll_fd_ = epoll_create(FD_SETSIZE); | |
1248 if (epoll_fd_ == -1) { | |
1249 LOG_E(LS_WARNING, EN, errno) << "epoll_create"; | |
1250 epoll_fd_ = INVALID_SOCKET; | |
Taylor Brandstetter
2017/05/17 07:20:16
Could you leave a comment mentioning that the code
joachim
2017/05/17 21:17:02
Done.
| |
1251 } | |
1252 #endif | |
1174 signal_wakeup_ = new Signaler(this, &fWait_); | 1253 signal_wakeup_ = new Signaler(this, &fWait_); |
1175 #if defined(WEBRTC_WIN) | 1254 #if defined(WEBRTC_WIN) |
1176 socket_ev_ = WSACreateEvent(); | 1255 socket_ev_ = WSACreateEvent(); |
1177 #endif | 1256 #endif |
1178 } | 1257 } |
1179 | 1258 |
1180 PhysicalSocketServer::~PhysicalSocketServer() { | 1259 PhysicalSocketServer::~PhysicalSocketServer() { |
1181 #if defined(WEBRTC_WIN) | 1260 #if defined(WEBRTC_WIN) |
1182 WSACloseEvent(socket_ev_); | 1261 WSACloseEvent(socket_ev_); |
1183 #endif | 1262 #endif |
1184 #if defined(WEBRTC_POSIX) | 1263 #if defined(WEBRTC_POSIX) |
1185 signal_dispatcher_.reset(); | 1264 signal_dispatcher_.reset(); |
1186 #endif | 1265 #endif |
1187 delete signal_wakeup_; | 1266 delete signal_wakeup_; |
1267 #if defined(WEBRTC_POSIX) && defined(WEBRTC_LINUX) | |
1268 if (epoll_fd_ != INVALID_SOCKET) { | |
1269 close(epoll_fd_); | |
1270 } | |
1271 #endif | |
1188 RTC_DCHECK(dispatchers_.empty()); | 1272 RTC_DCHECK(dispatchers_.empty()); |
1189 } | 1273 } |
1190 | 1274 |
1191 void PhysicalSocketServer::WakeUp() { | 1275 void PhysicalSocketServer::WakeUp() { |
1192 signal_wakeup_->Signal(); | 1276 signal_wakeup_->Signal(); |
1193 } | 1277 } |
1194 | 1278 |
1195 Socket* PhysicalSocketServer::CreateSocket(int type) { | 1279 Socket* PhysicalSocketServer::CreateSocket(int type) { |
1196 return CreateSocket(AF_INET, type); | 1280 return CreateSocket(AF_INET, type); |
1197 } | 1281 } |
(...skipping 27 matching lines...) Expand all Loading... | |
1225 if (dispatcher->Initialize()) { | 1309 if (dispatcher->Initialize()) { |
1226 return dispatcher; | 1310 return dispatcher; |
1227 } else { | 1311 } else { |
1228 delete dispatcher; | 1312 delete dispatcher; |
1229 return nullptr; | 1313 return nullptr; |
1230 } | 1314 } |
1231 } | 1315 } |
1232 | 1316 |
1233 void PhysicalSocketServer::Add(Dispatcher *pdispatcher) { | 1317 void PhysicalSocketServer::Add(Dispatcher *pdispatcher) { |
1234 CritScope cs(&crit_); | 1318 CritScope cs(&crit_); |
1319 #if defined(WEBRTC_WIN) | |
1235 // Prevent duplicates. This can cause dead dispatchers to stick around. | 1320 // Prevent duplicates. This can cause dead dispatchers to stick around. |
1236 DispatcherList::iterator pos = std::find(dispatchers_.begin(), | 1321 DispatcherList::iterator pos = std::find(dispatchers_.begin(), |
1237 dispatchers_.end(), | 1322 dispatchers_.end(), |
1238 pdispatcher); | 1323 pdispatcher); |
1239 if (pos != dispatchers_.end()) | 1324 if (pos != dispatchers_.end()) |
1240 return; | 1325 return; |
1241 dispatchers_.push_back(pdispatcher); | 1326 dispatchers_.push_back(pdispatcher); |
1327 #else | |
Taylor Brandstetter
2017/05/17 07:20:16
Is there a reason the set can't be used on Windows
joachim
2017/05/17 21:17:02
The Windows code has some special handling to inva
| |
1328 dispatchers_.insert(pdispatcher); | |
1329 #if defined(WEBRTC_POSIX) && defined(WEBRTC_LINUX) | |
1330 if (epoll_fd_ != INVALID_SOCKET) { | |
1331 AddEpoll(pdispatcher); | |
1332 } | |
1333 #endif // WEBRTC_POSIX && WEBRTC_LINUX | |
1334 #endif // WEBRTC_WIN | |
1242 } | 1335 } |
1243 | 1336 |
1244 void PhysicalSocketServer::Remove(Dispatcher *pdispatcher) { | 1337 void PhysicalSocketServer::Remove(Dispatcher *pdispatcher) { |
1245 CritScope cs(&crit_); | 1338 CritScope cs(&crit_); |
1339 #if defined(WEBRTC_WIN) | |
1246 DispatcherList::iterator pos = std::find(dispatchers_.begin(), | 1340 DispatcherList::iterator pos = std::find(dispatchers_.begin(), |
1247 dispatchers_.end(), | 1341 dispatchers_.end(), |
1248 pdispatcher); | 1342 pdispatcher); |
1249 // We silently ignore duplicate calls to Add, so we should silently ignore | 1343 // We silently ignore duplicate calls to Add, so we should silently ignore |
1250 // the (expected) symmetric calls to Remove. Note that this may still hide | 1344 // the (expected) symmetric calls to Remove. Note that this may still hide |
1251 // a real issue, so we at least log a warning about it. | 1345 // a real issue, so we at least log a warning about it. |
1252 if (pos == dispatchers_.end()) { | 1346 if (pos == dispatchers_.end()) { |
1253 LOG(LS_WARNING) << "PhysicalSocketServer asked to remove a unknown " | 1347 LOG(LS_WARNING) << "PhysicalSocketServer asked to remove a unknown " |
1254 << "dispatcher, potentially from a duplicate call to Add."; | 1348 << "dispatcher, potentially from a duplicate call to Add."; |
1255 return; | 1349 return; |
1256 } | 1350 } |
1257 size_t index = pos - dispatchers_.begin(); | 1351 size_t index = pos - dispatchers_.begin(); |
1258 dispatchers_.erase(pos); | 1352 dispatchers_.erase(pos); |
1259 for (IteratorList::iterator it = iterators_.begin(); it != iterators_.end(); | 1353 for (IteratorList::iterator it = iterators_.begin(); it != iterators_.end(); |
1260 ++it) { | 1354 ++it) { |
1261 if (index < **it) { | 1355 if (index < **it) { |
1262 --**it; | 1356 --**it; |
1263 } | 1357 } |
1264 } | 1358 } |
1359 #else | |
1360 if (!dispatchers_.erase(pdispatcher)) { | |
1361 LOG(LS_WARNING) << "PhysicalSocketServer asked to remove a unknown " | |
1362 << "dispatcher, potentially from a duplicate call to Add."; | |
1363 return; | |
1364 } | |
1365 #if defined(WEBRTC_POSIX) && defined(WEBRTC_LINUX) | |
1366 if (epoll_fd_ != INVALID_SOCKET) { | |
1367 RemoveEpoll(pdispatcher); | |
1368 } | |
1369 #endif // WEBRTC_POSIX && WEBRTC_LINUX | |
1370 #endif // WEBRTC_WIN | |
1371 } | |
1372 | |
1373 void PhysicalSocketServer::Update(Dispatcher *pdispatcher) { | |
Taylor Brandstetter
2017/05/17 07:20:16
It seems like the code would be more readable if t
joachim
2017/05/17 21:17:02
The call site for this is the SocketDispatcher. I
Taylor Brandstetter
2017/05/17 23:19:02
I see now; in that case this is fine.
| |
1374 #if defined(WEBRTC_POSIX) && defined(WEBRTC_LINUX) | |
1375 if (epoll_fd_ == INVALID_SOCKET) { | |
1376 return; | |
1377 } | |
1378 | |
1379 CritScope cs(&crit_); | |
1380 if (dispatchers_.find(pdispatcher) == dispatchers_.end()) { | |
1381 return; | |
1382 } | |
1383 | |
1384 UpdateEpoll(pdispatcher); | |
1385 #endif | |
1265 } | 1386 } |
1266 | 1387 |
1267 #if defined(WEBRTC_POSIX) | 1388 #if defined(WEBRTC_POSIX) |
1389 | |
1268 bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) { | 1390 bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) { |
1391 #if defined(WEBRTC_LINUX) | |
1392 // We don't keep a dedicated epoll descriptor containing only non-IO (i.e. | |
1393 // signaling) dispatchers, so the default "select" loop will be used in that | |
1394 // case. | |
1395 if (epoll_fd_ != INVALID_SOCKET && process_io) { | |
1396 return WaitEpoll(cmsWait); | |
1397 } | |
1398 #endif | |
1399 return WaitSelect(cmsWait, process_io); | |
1400 } | |
1401 | |
1402 bool PhysicalSocketServer::WaitSelect(int cmsWait, bool process_io) { | |
1269 // Calculate timing information | 1403 // Calculate timing information |
1270 | 1404 |
1271 struct timeval* ptvWait = nullptr; | 1405 struct timeval* ptvWait = nullptr; |
1272 struct timeval tvWait; | 1406 struct timeval tvWait; |
1273 struct timeval tvStop; | 1407 struct timeval tvStop; |
1274 if (cmsWait != kForever) { | 1408 if (cmsWait != kForever) { |
1275 // Calculate wait timeval | 1409 // Calculate wait timeval |
1276 tvWait.tv_sec = cmsWait / 1000; | 1410 tvWait.tv_sec = cmsWait / 1000; |
1277 tvWait.tv_usec = (cmsWait % 1000) * 1000; | 1411 tvWait.tv_usec = (cmsWait % 1000) * 1000; |
1278 ptvWait = &tvWait; | 1412 ptvWait = &tvWait; |
(...skipping 22 matching lines...) Expand all Loading... | |
1301 __msan_unpoison(&fdsRead, sizeof(fdsRead)); | 1435 __msan_unpoison(&fdsRead, sizeof(fdsRead)); |
1302 __msan_unpoison(&fdsWrite, sizeof(fdsWrite)); | 1436 __msan_unpoison(&fdsWrite, sizeof(fdsWrite)); |
1303 #endif | 1437 #endif |
1304 | 1438 |
1305 fWait_ = true; | 1439 fWait_ = true; |
1306 | 1440 |
1307 while (fWait_) { | 1441 while (fWait_) { |
1308 int fdmax = -1; | 1442 int fdmax = -1; |
1309 { | 1443 { |
1310 CritScope cr(&crit_); | 1444 CritScope cr(&crit_); |
1311 for (size_t i = 0; i < dispatchers_.size(); ++i) { | 1445 for (Dispatcher* pdispatcher : dispatchers_) { |
1312 // Query dispatchers for read and write wait state | 1446 // Query dispatchers for read and write wait state |
1313 Dispatcher *pdispatcher = dispatchers_[i]; | |
1314 RTC_DCHECK(pdispatcher); | 1447 RTC_DCHECK(pdispatcher); |
1315 if (!process_io && (pdispatcher != signal_wakeup_)) | 1448 if (!process_io && (pdispatcher != signal_wakeup_)) |
1316 continue; | 1449 continue; |
1317 int fd = pdispatcher->GetDescriptor(); | 1450 int fd = pdispatcher->GetDescriptor(); |
1451 // "select"ing a file descriptor that is equal to or larger than | |
1452 // FD_SETSIZE will result in undefined behavior. | |
1453 RTC_DCHECK_LT(fd, FD_SETSIZE); | |
1318 if (fd > fdmax) | 1454 if (fd > fdmax) |
1319 fdmax = fd; | 1455 fdmax = fd; |
1320 | 1456 |
1321 uint32_t ff = pdispatcher->GetRequestedEvents(); | 1457 uint32_t ff = pdispatcher->GetRequestedEvents(); |
1322 if (ff & (DE_READ | DE_ACCEPT)) | 1458 if (ff & (DE_READ | DE_ACCEPT)) |
1323 FD_SET(fd, &fdsRead); | 1459 FD_SET(fd, &fdsRead); |
1324 if (ff & (DE_WRITE | DE_CONNECT)) | 1460 if (ff & (DE_WRITE | DE_CONNECT)) |
1325 FD_SET(fd, &fdsWrite); | 1461 FD_SET(fd, &fdsWrite); |
1326 } | 1462 } |
1327 } | 1463 } |
(...skipping 13 matching lines...) Expand all Loading... | |
1341 // Else ignore the error and keep going. If this EINTR was for one of the | 1477 // Else ignore the error and keep going. If this EINTR was for one of the |
1342 // signals managed by this PhysicalSocketServer, the | 1478 // signals managed by this PhysicalSocketServer, the |
1343 // PosixSignalDeliveryDispatcher will be in the signaled state in the next | 1479 // PosixSignalDeliveryDispatcher will be in the signaled state in the next |
1344 // iteration. | 1480 // iteration. |
1345 } else if (n == 0) { | 1481 } else if (n == 0) { |
1346 // If timeout, return success | 1482 // If timeout, return success |
1347 return true; | 1483 return true; |
1348 } else { | 1484 } else { |
1349 // We have signaled descriptors | 1485 // We have signaled descriptors |
1350 CritScope cr(&crit_); | 1486 CritScope cr(&crit_); |
1351 for (size_t i = 0; i < dispatchers_.size(); ++i) { | 1487 for (Dispatcher *pdispatcher : dispatchers_) { |
1352 Dispatcher *pdispatcher = dispatchers_[i]; | |
1353 int fd = pdispatcher->GetDescriptor(); | 1488 int fd = pdispatcher->GetDescriptor(); |
1354 uint32_t ff = 0; | 1489 uint32_t ff = 0; |
1355 int errcode = 0; | 1490 int errcode = 0; |
1356 | 1491 |
1357 // Reap any error code, which can be signaled through reads or writes. | 1492 // Reap any error code, which can be signaled through reads or writes. |
1358 // TODO(pthatcher): Should we set errcode if getsockopt fails? | 1493 // TODO(pthatcher): Should we set errcode if getsockopt fails? |
1359 if (FD_ISSET(fd, &fdsRead) || FD_ISSET(fd, &fdsWrite)) { | 1494 if (FD_ISSET(fd, &fdsRead) || FD_ISSET(fd, &fdsWrite)) { |
1360 socklen_t len = sizeof(errcode); | 1495 socklen_t len = sizeof(errcode); |
1361 ::getsockopt(fd, SOL_SOCKET, SO_ERROR, &errcode, &len); | 1496 ::getsockopt(fd, SOL_SOCKET, SO_ERROR, &errcode, &len); |
1362 } | 1497 } |
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1416 ptvWait->tv_usec += 1000000; | 1551 ptvWait->tv_usec += 1000000; |
1417 ptvWait->tv_sec -= 1; | 1552 ptvWait->tv_sec -= 1; |
1418 } | 1553 } |
1419 } | 1554 } |
1420 } | 1555 } |
1421 } | 1556 } |
1422 | 1557 |
1423 return true; | 1558 return true; |
1424 } | 1559 } |
1425 | 1560 |
1561 #if defined(WEBRTC_LINUX) | |
1562 | |
1563 // Initial number of events to process with one call to "epoll_wait". | |
1564 static const size_t kInitialEpollEvents = 128; | |
1565 | |
1566 // Maximum number of events to process with one call to "epoll_wait". | |
1567 static const size_t kMaxEpollEvents = 8192; | |
1568 | |
1569 static int GetEpollEvents(Dispatcher* pdispatcher) { | |
1570 int events = 0; | |
1571 uint32_t ff = pdispatcher->GetRequestedEvents(); | |
1572 if (ff & (DE_READ | DE_ACCEPT)) { | |
1573 events |= EPOLLIN; | |
1574 } | |
1575 if (ff & (DE_WRITE | DE_CONNECT)) { | |
1576 events |= EPOLLOUT; | |
1577 } | |
1578 return events; | |
1579 } | |
1580 | |
1581 void PhysicalSocketServer::AddEpoll(Dispatcher* pdispatcher) { | |
1582 RTC_DCHECK(epoll_fd_ != INVALID_SOCKET); | |
1583 int fd = pdispatcher->GetDescriptor(); | |
1584 RTC_DCHECK(fd != INVALID_SOCKET); | |
1585 if (fd == INVALID_SOCKET) { | |
1586 return; | |
1587 } | |
1588 | |
1589 struct epoll_event event = {0}; | |
1590 event.events = GetEpollEvents(pdispatcher); | |
1591 event.data.ptr = pdispatcher; | |
1592 int err = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event); | |
1593 RTC_DCHECK_EQ(err, 0); | |
1594 if (err == -1) { | |
1595 LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_ADD"; | |
1596 } | |
1597 } | |
1598 | |
1599 void PhysicalSocketServer::RemoveEpoll(Dispatcher* pdispatcher) { | |
1600 RTC_DCHECK(epoll_fd_ != INVALID_SOCKET); | |
1601 int fd = pdispatcher->GetDescriptor(); | |
1602 RTC_DCHECK(fd != INVALID_SOCKET); | |
1603 if (fd == INVALID_SOCKET) { | |
1604 return; | |
1605 } | |
1606 | |
1607 struct epoll_event event = {0}; | |
1608 int err = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, &event); | |
1609 RTC_DCHECK(err == 0 || errno == ENOENT); | |
1610 if (err == -1) { | |
1611 if (errno == ENOENT) { | |
1612 // Socket has already been closed. | |
1613 LOG_E(LS_VERBOSE, EN, errno) << "epoll_ctl EPOLL_CTL_DEL"; | |
1614 } else { | |
1615 LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_DEL"; | |
1616 } | |
1617 } | |
1618 } | |
1619 | |
1620 void PhysicalSocketServer::UpdateEpoll(Dispatcher* pdispatcher) { | |
1621 RTC_DCHECK(epoll_fd_ != INVALID_SOCKET); | |
1622 int fd = pdispatcher->GetDescriptor(); | |
1623 RTC_DCHECK(fd != INVALID_SOCKET); | |
1624 if (fd == INVALID_SOCKET) { | |
1625 return; | |
1626 } | |
1627 | |
1628 struct epoll_event event = {0}; | |
1629 event.events = GetEpollEvents(pdispatcher); | |
1630 event.data.ptr = pdispatcher; | |
1631 int err = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd, &event); | |
1632 RTC_DCHECK_EQ(err, 0); | |
1633 if (err == -1) { | |
1634 LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_MOD"; | |
1635 } | |
1636 } | |
1637 | |
1638 bool PhysicalSocketServer::WaitEpoll(int cmsWait) { | |
1639 RTC_DCHECK(epoll_fd_ != INVALID_SOCKET); | |
1640 int64_t tvWait = -1; | |
1641 int64_t tvStop = -1; | |
1642 if (cmsWait != kForever) { | |
1643 tvWait = cmsWait; | |
1644 tvStop = TimeAfter(cmsWait); | |
1645 } | |
1646 | |
1647 if (epoll_events_.empty()) { | |
1648 // The initial space to receive events is created only if epoll is used. | |
1649 epoll_events_.resize(kInitialEpollEvents); | |
1650 } | |
1651 | |
1652 fWait_ = true; | |
1653 | |
1654 while (fWait_) { | |
1655 // Wait then call handlers as appropriate | |
1656 // < 0 means error | |
1657 // 0 means timeout | |
1658 // > 0 means count of descriptors ready | |
1659 int n = epoll_wait(epoll_fd_, &epoll_events_[0], | |
1660 static_cast<int>(epoll_events_.size()), static_cast<int>(tvWait)); | |
1661 if (n < 0) { | |
1662 if (errno != EINTR) { | |
1663 LOG_E(LS_ERROR, EN, errno) << "epoll"; | |
1664 return false; | |
1665 } | |
1666 // Else ignore the error and keep going. If this EINTR was for one of the | |
1667 // signals managed by this PhysicalSocketServer, the | |
1668 // PosixSignalDeliveryDispatcher will be in the signaled state in the next | |
1669 // iteration. | |
1670 } else if (n == 0) { | |
1671 // If timeout, return success | |
1672 return true; | |
1673 } else { | |
1674 // We have signaled descriptors | |
1675 CritScope cr(&crit_); | |
1676 for (int i = 0; i < n; ++i) { | |
1677 const epoll_event& event = epoll_events_[i]; | |
1678 Dispatcher* pdispatcher = static_cast<Dispatcher*>(event.data.ptr); | |
1679 if (dispatchers_.find(pdispatcher) == dispatchers_.end()) { | |
1680 // The dispatcher for this socket no longer exists. | |
1681 continue; | |
1682 } | |
1683 | |
1684 int fd = pdispatcher->GetDescriptor(); | |
1685 uint32_t ff = 0; | |
1686 int errcode = 0; | |
1687 | |
1688 // Reap any error code. | |
1689 // TODO(pthatcher): Should we set errcode if getsockopt fails? | |
1690 if (event.events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP)) { | |
1691 socklen_t len = sizeof(errcode); | |
1692 ::getsockopt(fd, SOL_SOCKET, SO_ERROR, &errcode, &len); | |
1693 } | |
1694 | |
1695 // Check readable descriptors. If we're waiting on an accept, signal | |
1696 // that. Otherwise we're waiting for data, check to see if we're | |
1697 // readable or really closed. | |
1698 // TODO(pthatcher): Only peek at TCP descriptors. | |
1699 if ((event.events & (EPOLLIN | EPOLLPRI))) { | |
1700 if (pdispatcher->GetRequestedEvents() & DE_ACCEPT) { | |
1701 ff |= DE_ACCEPT; | |
1702 } else if (errcode || pdispatcher->IsDescriptorClosed()) { | |
1703 ff |= DE_CLOSE; | |
1704 } else { | |
1705 ff |= DE_READ; | |
1706 } | |
1707 } | |
1708 | |
1709 // Check writable descriptors. If we're waiting on a connect, detect | |
1710 // success versus failure by the reaped error code. | |
1711 if ((event.events & EPOLLOUT)) { | |
1712 if (pdispatcher->GetRequestedEvents() & DE_CONNECT) { | |
1713 if (!errcode) { | |
1714 ff |= DE_CONNECT; | |
1715 } else { | |
1716 ff |= DE_CLOSE; | |
1717 } | |
1718 } else { | |
1719 ff |= DE_WRITE; | |
1720 } | |
1721 } | |
1722 | |
1723 // Tell the descriptor about the event. | |
1724 if (ff != 0) { | |
1725 pdispatcher->OnPreEvent(ff); | |
1726 pdispatcher->OnEvent(ff, errcode); | |
1727 } | |
1728 } | |
1729 } | |
1730 | |
1731 if (static_cast<size_t>(n) == epoll_events_.size() && | |
1732 epoll_events_.size() < kMaxEpollEvents) { | |
1733 // We used the complete space to receive events, increase size for future | |
1734 // iterations. | |
1735 epoll_events_.resize(std::max(epoll_events_.size() * 2, kMaxEpollEvents)); | |
1736 } | |
1737 | |
1738 if (cmsWait != kForever) { | |
1739 tvWait = TimeDiff(tvStop, TimeMillis()); | |
1740 if (tvWait < 0) { | |
1741 // Return success on timeout. | |
1742 return true; | |
1743 } | |
1744 } | |
1745 } | |
1746 | |
1747 return true; | |
1748 } | |
1749 | |
1750 #endif // WEBRTC_LINUX | |
1751 | |
1426 static void GlobalSignalHandler(int signum) { | 1752 static void GlobalSignalHandler(int signum) { |
1427 PosixSignalHandler::Instance()->OnPosixSignalReceived(signum); | 1753 PosixSignalHandler::Instance()->OnPosixSignalReceived(signum); |
1428 } | 1754 } |
1429 | 1755 |
1430 bool PhysicalSocketServer::SetPosixSignalHandler(int signum, | 1756 bool PhysicalSocketServer::SetPosixSignalHandler(int signum, |
1431 void (*handler)(int)) { | 1757 void (*handler)(int)) { |
1432 // If handler is SIG_IGN or SIG_DFL then clear our user-level handler, | 1758 // If handler is SIG_IGN or SIG_DFL then clear our user-level handler, |
1433 // otherwise set one. | 1759 // otherwise set one. |
1434 if (handler == SIG_IGN || handler == SIG_DFL) { | 1760 if (handler == SIG_IGN || handler == SIG_DFL) { |
1435 if (!InstallSignal(signum, handler)) { | 1761 if (!InstallSignal(signum, handler)) { |
(...skipping 201 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1637 break; | 1963 break; |
1638 } | 1964 } |
1639 } | 1965 } |
1640 | 1966 |
1641 // Done | 1967 // Done |
1642 return true; | 1968 return true; |
1643 } | 1969 } |
1644 #endif // WEBRTC_WIN | 1970 #endif // WEBRTC_WIN |
1645 | 1971 |
1646 } // namespace rtc | 1972 } // namespace rtc |
OLD | NEW |