OLD | NEW |
| (Empty) |
1 /* | |
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. | |
3 * | |
4 * Use of this source code is governed by a BSD-style license | |
5 * that can be found in the LICENSE file in the root of the source | |
6 * tree. An additional intellectual property rights grant can be found | |
7 * in the file PATENTS. All contributing project authors may | |
8 * be found in the AUTHORS file in the root of the source tree. | |
9 */ | |
10 #include "webrtc/base/physicalsocketserver.h" | |
11 | |
12 #if defined(_MSC_VER) && _MSC_VER < 1300 | |
13 #pragma warning(disable:4786) | |
14 #endif | |
15 | |
16 #ifdef MEMORY_SANITIZER | |
17 #include <sanitizer/msan_interface.h> | |
18 #endif | |
19 | |
20 #if defined(WEBRTC_POSIX) | |
21 #include <string.h> | |
22 #include <errno.h> | |
23 #include <fcntl.h> | |
24 #if defined(WEBRTC_USE_EPOLL) | |
25 // "poll" will be used to wait for the signal dispatcher. | |
26 #include <poll.h> | |
27 #endif | |
28 #include <sys/ioctl.h> | |
29 #include <sys/time.h> | |
30 #include <sys/select.h> | |
31 #include <unistd.h> | |
32 #include <signal.h> | |
33 #endif | |
34 | |
35 #if defined(WEBRTC_WIN) | |
36 #define WIN32_LEAN_AND_MEAN | |
37 #include <windows.h> | |
38 #include <winsock2.h> | |
39 #include <ws2tcpip.h> | |
40 #undef SetPort | |
41 #endif | |
42 | |
43 #include <algorithm> | |
44 #include <map> | |
45 | |
46 #include "webrtc/base/arraysize.h" | |
47 #include "webrtc/base/basictypes.h" | |
48 #include "webrtc/base/byteorder.h" | |
49 #include "webrtc/base/checks.h" | |
50 #include "webrtc/base/logging.h" | |
51 #include "webrtc/base/networkmonitor.h" | |
52 #include "webrtc/base/nullsocketserver.h" | |
53 #include "webrtc/base/timeutils.h" | |
54 #include "webrtc/base/win32socketinit.h" | |
55 | |
56 #if defined(WEBRTC_POSIX) | |
57 #include <netinet/tcp.h> // for TCP_NODELAY | |
58 #define IP_MTU 14 // Until this is integrated from linux/in.h to netinet/in.h | |
59 typedef void* SockOptArg; | |
60 | |
61 #endif // WEBRTC_POSIX | |
62 | |
63 #if defined(WEBRTC_POSIX) && !defined(WEBRTC_MAC) && !defined(__native_client__) | |
64 | |
65 int64_t GetSocketRecvTimestamp(int socket) { | |
66 struct timeval tv_ioctl; | |
67 int ret = ioctl(socket, SIOCGSTAMP, &tv_ioctl); | |
68 if (ret != 0) | |
69 return -1; | |
70 int64_t timestamp = | |
71 rtc::kNumMicrosecsPerSec * static_cast<int64_t>(tv_ioctl.tv_sec) + | |
72 static_cast<int64_t>(tv_ioctl.tv_usec); | |
73 return timestamp; | |
74 } | |
75 | |
76 #else | |
77 | |
78 int64_t GetSocketRecvTimestamp(int socket) { | |
79 return -1; | |
80 } | |
81 #endif | |
82 | |
83 #if defined(WEBRTC_WIN) | |
84 typedef char* SockOptArg; | |
85 #endif | |
86 | |
87 #if defined(WEBRTC_USE_EPOLL) | |
88 // POLLRDHUP / EPOLLRDHUP are only defined starting with Linux 2.6.17. | |
89 #if !defined(POLLRDHUP) | |
90 #define POLLRDHUP 0x2000 | |
91 #endif | |
92 #if !defined(EPOLLRDHUP) | |
93 #define EPOLLRDHUP 0x2000 | |
94 #endif | |
95 #endif | |
96 | |
97 namespace rtc { | |
98 | |
99 std::unique_ptr<SocketServer> SocketServer::CreateDefault() { | |
100 #if defined(__native_client__) | |
101 return std::unique_ptr<SocketServer>(new rtc::NullSocketServer); | |
102 #else | |
103 return std::unique_ptr<SocketServer>(new rtc::PhysicalSocketServer); | |
104 #endif | |
105 } | |
106 | |
107 #if defined(WEBRTC_WIN) | |
108 // Standard MTUs, from RFC 1191 | |
109 const uint16_t PACKET_MAXIMUMS[] = { | |
110 65535, // Theoretical maximum, Hyperchannel | |
111 32000, // Nothing | |
112 17914, // 16Mb IBM Token Ring | |
113 8166, // IEEE 802.4 | |
114 // 4464, // IEEE 802.5 (4Mb max) | |
115 4352, // FDDI | |
116 // 2048, // Wideband Network | |
117 2002, // IEEE 802.5 (4Mb recommended) | |
118 // 1536, // Expermental Ethernet Networks | |
119 // 1500, // Ethernet, Point-to-Point (default) | |
120 1492, // IEEE 802.3 | |
121 1006, // SLIP, ARPANET | |
122 // 576, // X.25 Networks | |
123 // 544, // DEC IP Portal | |
124 // 512, // NETBIOS | |
125 508, // IEEE 802/Source-Rt Bridge, ARCNET | |
126 296, // Point-to-Point (low delay) | |
127 68, // Official minimum | |
128 0, // End of list marker | |
129 }; | |
130 | |
131 static const int IP_HEADER_SIZE = 20u; | |
132 static const int IPV6_HEADER_SIZE = 40u; | |
133 static const int ICMP_HEADER_SIZE = 8u; | |
134 static const int ICMP_PING_TIMEOUT_MILLIS = 10000u; | |
135 #endif | |
136 | |
137 PhysicalSocket::PhysicalSocket(PhysicalSocketServer* ss, SOCKET s) | |
138 : ss_(ss), s_(s), error_(0), | |
139 state_((s == INVALID_SOCKET) ? CS_CLOSED : CS_CONNECTED), | |
140 resolver_(nullptr) { | |
141 #if defined(WEBRTC_WIN) | |
142 // EnsureWinsockInit() ensures that winsock is initialized. The default | |
143 // version of this function doesn't do anything because winsock is | |
144 // initialized by constructor of a static object. If neccessary libjingle | |
145 // users can link it with a different version of this function by replacing | |
146 // win32socketinit.cc. See win32socketinit.cc for more details. | |
147 EnsureWinsockInit(); | |
148 #endif | |
149 if (s_ != INVALID_SOCKET) { | |
150 SetEnabledEvents(DE_READ | DE_WRITE); | |
151 | |
152 int type = SOCK_STREAM; | |
153 socklen_t len = sizeof(type); | |
154 const int res = | |
155 getsockopt(s_, SOL_SOCKET, SO_TYPE, (SockOptArg)&type, &len); | |
156 RTC_DCHECK_EQ(0, res); | |
157 udp_ = (SOCK_DGRAM == type); | |
158 } | |
159 } | |
160 | |
161 PhysicalSocket::~PhysicalSocket() { | |
162 Close(); | |
163 } | |
164 | |
165 bool PhysicalSocket::Create(int family, int type) { | |
166 Close(); | |
167 s_ = ::socket(family, type, 0); | |
168 udp_ = (SOCK_DGRAM == type); | |
169 UpdateLastError(); | |
170 if (udp_) { | |
171 SetEnabledEvents(DE_READ | DE_WRITE); | |
172 } | |
173 return s_ != INVALID_SOCKET; | |
174 } | |
175 | |
176 SocketAddress PhysicalSocket::GetLocalAddress() const { | |
177 sockaddr_storage addr_storage = {0}; | |
178 socklen_t addrlen = sizeof(addr_storage); | |
179 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); | |
180 int result = ::getsockname(s_, addr, &addrlen); | |
181 SocketAddress address; | |
182 if (result >= 0) { | |
183 SocketAddressFromSockAddrStorage(addr_storage, &address); | |
184 } else { | |
185 LOG(LS_WARNING) << "GetLocalAddress: unable to get local addr, socket=" | |
186 << s_; | |
187 } | |
188 return address; | |
189 } | |
190 | |
191 SocketAddress PhysicalSocket::GetRemoteAddress() const { | |
192 sockaddr_storage addr_storage = {0}; | |
193 socklen_t addrlen = sizeof(addr_storage); | |
194 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); | |
195 int result = ::getpeername(s_, addr, &addrlen); | |
196 SocketAddress address; | |
197 if (result >= 0) { | |
198 SocketAddressFromSockAddrStorage(addr_storage, &address); | |
199 } else { | |
200 LOG(LS_WARNING) << "GetRemoteAddress: unable to get remote addr, socket=" | |
201 << s_; | |
202 } | |
203 return address; | |
204 } | |
205 | |
206 int PhysicalSocket::Bind(const SocketAddress& bind_addr) { | |
207 SocketAddress copied_bind_addr = bind_addr; | |
208 // If a network binder is available, use it to bind a socket to an interface | |
209 // instead of bind(), since this is more reliable on an OS with a weak host | |
210 // model. | |
211 if (ss_->network_binder() && !bind_addr.IsAnyIP()) { | |
212 NetworkBindingResult result = | |
213 ss_->network_binder()->BindSocketToNetwork(s_, bind_addr.ipaddr()); | |
214 if (result == NetworkBindingResult::SUCCESS) { | |
215 // Since the network binder handled binding the socket to the desired | |
216 // network interface, we don't need to (and shouldn't) include an IP in | |
217 // the bind() call; bind() just needs to assign a port. | |
218 copied_bind_addr.SetIP(GetAnyIP(copied_bind_addr.ipaddr().family())); | |
219 } else if (result == NetworkBindingResult::NOT_IMPLEMENTED) { | |
220 LOG(LS_INFO) << "Can't bind socket to network because " | |
221 "network binding is not implemented for this OS."; | |
222 } else { | |
223 if (bind_addr.IsLoopbackIP()) { | |
224 // If we couldn't bind to a loopback IP (which should only happen in | |
225 // test scenarios), continue on. This may be expected behavior. | |
226 LOG(LS_VERBOSE) << "Binding socket to loopback address " | |
227 << bind_addr.ipaddr().ToString() | |
228 << " failed; result: " << static_cast<int>(result); | |
229 } else { | |
230 LOG(LS_WARNING) << "Binding socket to network address " | |
231 << bind_addr.ipaddr().ToString() | |
232 << " failed; result: " << static_cast<int>(result); | |
233 // If a network binding was attempted and failed, we should stop here | |
234 // and not try to use the socket. Otherwise, we may end up sending | |
235 // packets with an invalid source address. | |
236 // See: https://bugs.chromium.org/p/webrtc/issues/detail?id=7026 | |
237 return -1; | |
238 } | |
239 } | |
240 } | |
241 sockaddr_storage addr_storage; | |
242 size_t len = copied_bind_addr.ToSockAddrStorage(&addr_storage); | |
243 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); | |
244 int err = ::bind(s_, addr, static_cast<int>(len)); | |
245 UpdateLastError(); | |
246 #if !defined(NDEBUG) | |
247 if (0 == err) { | |
248 dbg_addr_ = "Bound @ "; | |
249 dbg_addr_.append(GetLocalAddress().ToString()); | |
250 } | |
251 #endif | |
252 return err; | |
253 } | |
254 | |
255 int PhysicalSocket::Connect(const SocketAddress& addr) { | |
256 // TODO(pthatcher): Implicit creation is required to reconnect... | |
257 // ...but should we make it more explicit? | |
258 if (state_ != CS_CLOSED) { | |
259 SetError(EALREADY); | |
260 return SOCKET_ERROR; | |
261 } | |
262 if (addr.IsUnresolvedIP()) { | |
263 LOG(LS_VERBOSE) << "Resolving addr in PhysicalSocket::Connect"; | |
264 resolver_ = new AsyncResolver(); | |
265 resolver_->SignalDone.connect(this, &PhysicalSocket::OnResolveResult); | |
266 resolver_->Start(addr); | |
267 state_ = CS_CONNECTING; | |
268 return 0; | |
269 } | |
270 | |
271 return DoConnect(addr); | |
272 } | |
273 | |
274 int PhysicalSocket::DoConnect(const SocketAddress& connect_addr) { | |
275 if ((s_ == INVALID_SOCKET) && | |
276 !Create(connect_addr.family(), SOCK_STREAM)) { | |
277 return SOCKET_ERROR; | |
278 } | |
279 sockaddr_storage addr_storage; | |
280 size_t len = connect_addr.ToSockAddrStorage(&addr_storage); | |
281 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); | |
282 int err = ::connect(s_, addr, static_cast<int>(len)); | |
283 UpdateLastError(); | |
284 uint8_t events = DE_READ | DE_WRITE; | |
285 if (err == 0) { | |
286 state_ = CS_CONNECTED; | |
287 } else if (IsBlockingError(GetError())) { | |
288 state_ = CS_CONNECTING; | |
289 events |= DE_CONNECT; | |
290 } else { | |
291 return SOCKET_ERROR; | |
292 } | |
293 | |
294 EnableEvents(events); | |
295 return 0; | |
296 } | |
297 | |
298 int PhysicalSocket::GetError() const { | |
299 CritScope cs(&crit_); | |
300 return error_; | |
301 } | |
302 | |
303 void PhysicalSocket::SetError(int error) { | |
304 CritScope cs(&crit_); | |
305 error_ = error; | |
306 } | |
307 | |
308 AsyncSocket::ConnState PhysicalSocket::GetState() const { | |
309 return state_; | |
310 } | |
311 | |
312 int PhysicalSocket::GetOption(Option opt, int* value) { | |
313 int slevel; | |
314 int sopt; | |
315 if (TranslateOption(opt, &slevel, &sopt) == -1) | |
316 return -1; | |
317 socklen_t optlen = sizeof(*value); | |
318 int ret = ::getsockopt(s_, slevel, sopt, (SockOptArg)value, &optlen); | |
319 if (ret != -1 && opt == OPT_DONTFRAGMENT) { | |
320 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID) | |
321 *value = (*value != IP_PMTUDISC_DONT) ? 1 : 0; | |
322 #endif | |
323 } | |
324 return ret; | |
325 } | |
326 | |
327 int PhysicalSocket::SetOption(Option opt, int value) { | |
328 int slevel; | |
329 int sopt; | |
330 if (TranslateOption(opt, &slevel, &sopt) == -1) | |
331 return -1; | |
332 if (opt == OPT_DONTFRAGMENT) { | |
333 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID) | |
334 value = (value) ? IP_PMTUDISC_DO : IP_PMTUDISC_DONT; | |
335 #endif | |
336 } | |
337 return ::setsockopt(s_, slevel, sopt, (SockOptArg)&value, sizeof(value)); | |
338 } | |
339 | |
340 int PhysicalSocket::Send(const void* pv, size_t cb) { | |
341 int sent = DoSend(s_, reinterpret_cast<const char *>(pv), | |
342 static_cast<int>(cb), | |
343 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID) | |
344 // Suppress SIGPIPE. Without this, attempting to send on a socket whose | |
345 // other end is closed will result in a SIGPIPE signal being raised to | |
346 // our process, which by default will terminate the process, which we | |
347 // don't want. By specifying this flag, we'll just get the error EPIPE | |
348 // instead and can handle the error gracefully. | |
349 MSG_NOSIGNAL | |
350 #else | |
351 0 | |
352 #endif | |
353 ); | |
354 UpdateLastError(); | |
355 MaybeRemapSendError(); | |
356 // We have seen minidumps where this may be false. | |
357 RTC_DCHECK(sent <= static_cast<int>(cb)); | |
358 if ((sent > 0 && sent < static_cast<int>(cb)) || | |
359 (sent < 0 && IsBlockingError(GetError()))) { | |
360 EnableEvents(DE_WRITE); | |
361 } | |
362 return sent; | |
363 } | |
364 | |
365 int PhysicalSocket::SendTo(const void* buffer, | |
366 size_t length, | |
367 const SocketAddress& addr) { | |
368 sockaddr_storage saddr; | |
369 size_t len = addr.ToSockAddrStorage(&saddr); | |
370 int sent = DoSendTo( | |
371 s_, static_cast<const char *>(buffer), static_cast<int>(length), | |
372 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID) | |
373 // Suppress SIGPIPE. See above for explanation. | |
374 MSG_NOSIGNAL, | |
375 #else | |
376 0, | |
377 #endif | |
378 reinterpret_cast<sockaddr*>(&saddr), static_cast<int>(len)); | |
379 UpdateLastError(); | |
380 MaybeRemapSendError(); | |
381 // We have seen minidumps where this may be false. | |
382 RTC_DCHECK(sent <= static_cast<int>(length)); | |
383 if ((sent > 0 && sent < static_cast<int>(length)) || | |
384 (sent < 0 && IsBlockingError(GetError()))) { | |
385 EnableEvents(DE_WRITE); | |
386 } | |
387 return sent; | |
388 } | |
389 | |
390 int PhysicalSocket::Recv(void* buffer, size_t length, int64_t* timestamp) { | |
391 int received = ::recv(s_, static_cast<char*>(buffer), | |
392 static_cast<int>(length), 0); | |
393 if ((received == 0) && (length != 0)) { | |
394 // Note: on graceful shutdown, recv can return 0. In this case, we | |
395 // pretend it is blocking, and then signal close, so that simplifying | |
396 // assumptions can be made about Recv. | |
397 LOG(LS_WARNING) << "EOF from socket; deferring close event"; | |
398 // Must turn this back on so that the select() loop will notice the close | |
399 // event. | |
400 EnableEvents(DE_READ); | |
401 SetError(EWOULDBLOCK); | |
402 return SOCKET_ERROR; | |
403 } | |
404 if (timestamp) { | |
405 *timestamp = GetSocketRecvTimestamp(s_); | |
406 } | |
407 UpdateLastError(); | |
408 int error = GetError(); | |
409 bool success = (received >= 0) || IsBlockingError(error); | |
410 if (udp_ || success) { | |
411 EnableEvents(DE_READ); | |
412 } | |
413 if (!success) { | |
414 LOG_F(LS_VERBOSE) << "Error = " << error; | |
415 } | |
416 return received; | |
417 } | |
418 | |
419 int PhysicalSocket::RecvFrom(void* buffer, | |
420 size_t length, | |
421 SocketAddress* out_addr, | |
422 int64_t* timestamp) { | |
423 sockaddr_storage addr_storage; | |
424 socklen_t addr_len = sizeof(addr_storage); | |
425 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); | |
426 int received = ::recvfrom(s_, static_cast<char*>(buffer), | |
427 static_cast<int>(length), 0, addr, &addr_len); | |
428 if (timestamp) { | |
429 *timestamp = GetSocketRecvTimestamp(s_); | |
430 } | |
431 UpdateLastError(); | |
432 if ((received >= 0) && (out_addr != nullptr)) | |
433 SocketAddressFromSockAddrStorage(addr_storage, out_addr); | |
434 int error = GetError(); | |
435 bool success = (received >= 0) || IsBlockingError(error); | |
436 if (udp_ || success) { | |
437 EnableEvents(DE_READ); | |
438 } | |
439 if (!success) { | |
440 LOG_F(LS_VERBOSE) << "Error = " << error; | |
441 } | |
442 return received; | |
443 } | |
444 | |
445 int PhysicalSocket::Listen(int backlog) { | |
446 int err = ::listen(s_, backlog); | |
447 UpdateLastError(); | |
448 if (err == 0) { | |
449 state_ = CS_CONNECTING; | |
450 EnableEvents(DE_ACCEPT); | |
451 #if !defined(NDEBUG) | |
452 dbg_addr_ = "Listening @ "; | |
453 dbg_addr_.append(GetLocalAddress().ToString()); | |
454 #endif | |
455 } | |
456 return err; | |
457 } | |
458 | |
459 AsyncSocket* PhysicalSocket::Accept(SocketAddress* out_addr) { | |
460 // Always re-subscribe DE_ACCEPT to make sure new incoming connections will | |
461 // trigger an event even if DoAccept returns an error here. | |
462 EnableEvents(DE_ACCEPT); | |
463 sockaddr_storage addr_storage; | |
464 socklen_t addr_len = sizeof(addr_storage); | |
465 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); | |
466 SOCKET s = DoAccept(s_, addr, &addr_len); | |
467 UpdateLastError(); | |
468 if (s == INVALID_SOCKET) | |
469 return nullptr; | |
470 if (out_addr != nullptr) | |
471 SocketAddressFromSockAddrStorage(addr_storage, out_addr); | |
472 return ss_->WrapSocket(s); | |
473 } | |
474 | |
475 int PhysicalSocket::Close() { | |
476 if (s_ == INVALID_SOCKET) | |
477 return 0; | |
478 int err = ::closesocket(s_); | |
479 UpdateLastError(); | |
480 s_ = INVALID_SOCKET; | |
481 state_ = CS_CLOSED; | |
482 SetEnabledEvents(0); | |
483 if (resolver_) { | |
484 resolver_->Destroy(false); | |
485 resolver_ = nullptr; | |
486 } | |
487 return err; | |
488 } | |
489 | |
490 SOCKET PhysicalSocket::DoAccept(SOCKET socket, | |
491 sockaddr* addr, | |
492 socklen_t* addrlen) { | |
493 return ::accept(socket, addr, addrlen); | |
494 } | |
495 | |
496 int PhysicalSocket::DoSend(SOCKET socket, const char* buf, int len, int flags) { | |
497 return ::send(socket, buf, len, flags); | |
498 } | |
499 | |
500 int PhysicalSocket::DoSendTo(SOCKET socket, | |
501 const char* buf, | |
502 int len, | |
503 int flags, | |
504 const struct sockaddr* dest_addr, | |
505 socklen_t addrlen) { | |
506 return ::sendto(socket, buf, len, flags, dest_addr, addrlen); | |
507 } | |
508 | |
509 void PhysicalSocket::OnResolveResult(AsyncResolverInterface* resolver) { | |
510 if (resolver != resolver_) { | |
511 return; | |
512 } | |
513 | |
514 int error = resolver_->GetError(); | |
515 if (error == 0) { | |
516 error = DoConnect(resolver_->address()); | |
517 } else { | |
518 Close(); | |
519 } | |
520 | |
521 if (error) { | |
522 SetError(error); | |
523 SignalCloseEvent(this, error); | |
524 } | |
525 } | |
526 | |
527 void PhysicalSocket::UpdateLastError() { | |
528 SetError(LAST_SYSTEM_ERROR); | |
529 } | |
530 | |
531 void PhysicalSocket::MaybeRemapSendError() { | |
532 #if defined(WEBRTC_MAC) | |
533 // https://developer.apple.com/library/mac/documentation/Darwin/ | |
534 // Reference/ManPages/man2/sendto.2.html | |
535 // ENOBUFS - The output queue for a network interface is full. | |
536 // This generally indicates that the interface has stopped sending, | |
537 // but may be caused by transient congestion. | |
538 if (GetError() == ENOBUFS) { | |
539 SetError(EWOULDBLOCK); | |
540 } | |
541 #endif | |
542 } | |
543 | |
544 void PhysicalSocket::SetEnabledEvents(uint8_t events) { | |
545 enabled_events_ = events; | |
546 } | |
547 | |
548 void PhysicalSocket::EnableEvents(uint8_t events) { | |
549 enabled_events_ |= events; | |
550 } | |
551 | |
552 void PhysicalSocket::DisableEvents(uint8_t events) { | |
553 enabled_events_ &= ~events; | |
554 } | |
555 | |
556 int PhysicalSocket::TranslateOption(Option opt, int* slevel, int* sopt) { | |
557 switch (opt) { | |
558 case OPT_DONTFRAGMENT: | |
559 #if defined(WEBRTC_WIN) | |
560 *slevel = IPPROTO_IP; | |
561 *sopt = IP_DONTFRAGMENT; | |
562 break; | |
563 #elif defined(WEBRTC_MAC) || defined(BSD) || defined(__native_client__) | |
564 LOG(LS_WARNING) << "Socket::OPT_DONTFRAGMENT not supported."; | |
565 return -1; | |
566 #elif defined(WEBRTC_POSIX) | |
567 *slevel = IPPROTO_IP; | |
568 *sopt = IP_MTU_DISCOVER; | |
569 break; | |
570 #endif | |
571 case OPT_RCVBUF: | |
572 *slevel = SOL_SOCKET; | |
573 *sopt = SO_RCVBUF; | |
574 break; | |
575 case OPT_SNDBUF: | |
576 *slevel = SOL_SOCKET; | |
577 *sopt = SO_SNDBUF; | |
578 break; | |
579 case OPT_NODELAY: | |
580 *slevel = IPPROTO_TCP; | |
581 *sopt = TCP_NODELAY; | |
582 break; | |
583 case OPT_DSCP: | |
584 LOG(LS_WARNING) << "Socket::OPT_DSCP not supported."; | |
585 return -1; | |
586 case OPT_RTP_SENDTIME_EXTN_ID: | |
587 return -1; // No logging is necessary as this not a OS socket option. | |
588 default: | |
589 RTC_NOTREACHED(); | |
590 return -1; | |
591 } | |
592 return 0; | |
593 } | |
594 | |
595 SocketDispatcher::SocketDispatcher(PhysicalSocketServer *ss) | |
596 #if defined(WEBRTC_WIN) | |
597 : PhysicalSocket(ss), id_(0), signal_close_(false) | |
598 #else | |
599 : PhysicalSocket(ss) | |
600 #endif | |
601 { | |
602 } | |
603 | |
604 SocketDispatcher::SocketDispatcher(SOCKET s, PhysicalSocketServer *ss) | |
605 #if defined(WEBRTC_WIN) | |
606 : PhysicalSocket(ss, s), id_(0), signal_close_(false) | |
607 #else | |
608 : PhysicalSocket(ss, s) | |
609 #endif | |
610 { | |
611 } | |
612 | |
613 SocketDispatcher::~SocketDispatcher() { | |
614 Close(); | |
615 } | |
616 | |
617 bool SocketDispatcher::Initialize() { | |
618 RTC_DCHECK(s_ != INVALID_SOCKET); | |
619 // Must be a non-blocking | |
620 #if defined(WEBRTC_WIN) | |
621 u_long argp = 1; | |
622 ioctlsocket(s_, FIONBIO, &argp); | |
623 #elif defined(WEBRTC_POSIX) | |
624 fcntl(s_, F_SETFL, fcntl(s_, F_GETFL, 0) | O_NONBLOCK); | |
625 #endif | |
626 #if defined(WEBRTC_IOS) | |
627 // iOS may kill sockets when the app is moved to the background | |
628 // (specifically, if the app doesn't use the "voip" UIBackgroundMode). When | |
629 // we attempt to write to such a socket, SIGPIPE will be raised, which by | |
630 // default will terminate the process, which we don't want. By specifying | |
631 // this socket option, SIGPIPE will be disabled for the socket. | |
632 int value = 1; | |
633 ::setsockopt(s_, SOL_SOCKET, SO_NOSIGPIPE, &value, sizeof(value)); | |
634 #endif | |
635 ss_->Add(this); | |
636 return true; | |
637 } | |
638 | |
639 bool SocketDispatcher::Create(int type) { | |
640 return Create(AF_INET, type); | |
641 } | |
642 | |
643 bool SocketDispatcher::Create(int family, int type) { | |
644 // Change the socket to be non-blocking. | |
645 if (!PhysicalSocket::Create(family, type)) | |
646 return false; | |
647 | |
648 if (!Initialize()) | |
649 return false; | |
650 | |
651 #if defined(WEBRTC_WIN) | |
652 do { id_ = ++next_id_; } while (id_ == 0); | |
653 #endif | |
654 return true; | |
655 } | |
656 | |
657 #if defined(WEBRTC_WIN) | |
658 | |
659 WSAEVENT SocketDispatcher::GetWSAEvent() { | |
660 return WSA_INVALID_EVENT; | |
661 } | |
662 | |
663 SOCKET SocketDispatcher::GetSocket() { | |
664 return s_; | |
665 } | |
666 | |
667 bool SocketDispatcher::CheckSignalClose() { | |
668 if (!signal_close_) | |
669 return false; | |
670 | |
671 char ch; | |
672 if (recv(s_, &ch, 1, MSG_PEEK) > 0) | |
673 return false; | |
674 | |
675 state_ = CS_CLOSED; | |
676 signal_close_ = false; | |
677 SignalCloseEvent(this, signal_err_); | |
678 return true; | |
679 } | |
680 | |
681 int SocketDispatcher::next_id_ = 0; | |
682 | |
683 #elif defined(WEBRTC_POSIX) | |
684 | |
685 int SocketDispatcher::GetDescriptor() { | |
686 return s_; | |
687 } | |
688 | |
689 bool SocketDispatcher::IsDescriptorClosed() { | |
690 if (udp_) { | |
691 // The MSG_PEEK trick doesn't work for UDP, since (at least in some | |
692 // circumstances) it requires reading an entire UDP packet, which would be | |
693 // bad for performance here. So, just check whether |s_| has been closed, | |
694 // which should be sufficient. | |
695 return s_ == INVALID_SOCKET; | |
696 } | |
697 // We don't have a reliable way of distinguishing end-of-stream | |
698 // from readability. So test on each readable call. Is this | |
699 // inefficient? Probably. | |
700 char ch; | |
701 ssize_t res = ::recv(s_, &ch, 1, MSG_PEEK); | |
702 if (res > 0) { | |
703 // Data available, so not closed. | |
704 return false; | |
705 } else if (res == 0) { | |
706 // EOF, so closed. | |
707 return true; | |
708 } else { // error | |
709 switch (errno) { | |
710 // Returned if we've already closed s_. | |
711 case EBADF: | |
712 // Returned during ungraceful peer shutdown. | |
713 case ECONNRESET: | |
714 return true; | |
715 // The normal blocking error; don't log anything. | |
716 case EWOULDBLOCK: | |
717 // Interrupted system call. | |
718 case EINTR: | |
719 return false; | |
720 default: | |
721 // Assume that all other errors are just blocking errors, meaning the | |
722 // connection is still good but we just can't read from it right now. | |
723 // This should only happen when connecting (and at most once), because | |
724 // in all other cases this function is only called if the file | |
725 // descriptor is already known to be in the readable state. However, | |
726 // it's not necessary a problem if we spuriously interpret a | |
727 // "connection lost"-type error as a blocking error, because typically | |
728 // the next recv() will get EOF, so we'll still eventually notice that | |
729 // the socket is closed. | |
730 LOG_ERR(LS_WARNING) << "Assuming benign blocking error"; | |
731 return false; | |
732 } | |
733 } | |
734 } | |
735 | |
736 #endif // WEBRTC_POSIX | |
737 | |
738 uint32_t SocketDispatcher::GetRequestedEvents() { | |
739 return enabled_events(); | |
740 } | |
741 | |
742 void SocketDispatcher::OnPreEvent(uint32_t ff) { | |
743 if ((ff & DE_CONNECT) != 0) | |
744 state_ = CS_CONNECTED; | |
745 | |
746 #if defined(WEBRTC_WIN) | |
747 // We set CS_CLOSED from CheckSignalClose. | |
748 #elif defined(WEBRTC_POSIX) | |
749 if ((ff & DE_CLOSE) != 0) | |
750 state_ = CS_CLOSED; | |
751 #endif | |
752 } | |
753 | |
754 #if defined(WEBRTC_WIN) | |
755 | |
756 void SocketDispatcher::OnEvent(uint32_t ff, int err) { | |
757 int cache_id = id_; | |
758 // Make sure we deliver connect/accept first. Otherwise, consumers may see | |
759 // something like a READ followed by a CONNECT, which would be odd. | |
760 if (((ff & DE_CONNECT) != 0) && (id_ == cache_id)) { | |
761 if (ff != DE_CONNECT) | |
762 LOG(LS_VERBOSE) << "Signalled with DE_CONNECT: " << ff; | |
763 DisableEvents(DE_CONNECT); | |
764 #if !defined(NDEBUG) | |
765 dbg_addr_ = "Connected @ "; | |
766 dbg_addr_.append(GetRemoteAddress().ToString()); | |
767 #endif | |
768 SignalConnectEvent(this); | |
769 } | |
770 if (((ff & DE_ACCEPT) != 0) && (id_ == cache_id)) { | |
771 DisableEvents(DE_ACCEPT); | |
772 SignalReadEvent(this); | |
773 } | |
774 if ((ff & DE_READ) != 0) { | |
775 DisableEvents(DE_READ); | |
776 SignalReadEvent(this); | |
777 } | |
778 if (((ff & DE_WRITE) != 0) && (id_ == cache_id)) { | |
779 DisableEvents(DE_WRITE); | |
780 SignalWriteEvent(this); | |
781 } | |
782 if (((ff & DE_CLOSE) != 0) && (id_ == cache_id)) { | |
783 signal_close_ = true; | |
784 signal_err_ = err; | |
785 } | |
786 } | |
787 | |
788 #elif defined(WEBRTC_POSIX) | |
789 | |
790 void SocketDispatcher::OnEvent(uint32_t ff, int err) { | |
791 #if defined(WEBRTC_USE_EPOLL) | |
792 // Remember currently enabled events so we can combine multiple changes | |
793 // into one update call later. | |
794 // The signal handlers might re-enable events disabled here, so we can't | |
795 // keep a list of events to disable at the end of the method. This list | |
796 // would not be updated with the events enabled by the signal handlers. | |
797 StartBatchedEventUpdates(); | |
798 #endif | |
799 // Make sure we deliver connect/accept first. Otherwise, consumers may see | |
800 // something like a READ followed by a CONNECT, which would be odd. | |
801 if ((ff & DE_CONNECT) != 0) { | |
802 DisableEvents(DE_CONNECT); | |
803 SignalConnectEvent(this); | |
804 } | |
805 if ((ff & DE_ACCEPT) != 0) { | |
806 DisableEvents(DE_ACCEPT); | |
807 SignalReadEvent(this); | |
808 } | |
809 if ((ff & DE_READ) != 0) { | |
810 DisableEvents(DE_READ); | |
811 SignalReadEvent(this); | |
812 } | |
813 if ((ff & DE_WRITE) != 0) { | |
814 DisableEvents(DE_WRITE); | |
815 SignalWriteEvent(this); | |
816 } | |
817 if ((ff & DE_CLOSE) != 0) { | |
818 // The socket is now dead to us, so stop checking it. | |
819 SetEnabledEvents(0); | |
820 SignalCloseEvent(this, err); | |
821 } | |
822 #if defined(WEBRTC_USE_EPOLL) | |
823 FinishBatchedEventUpdates(); | |
824 #endif | |
825 } | |
826 | |
827 #endif // WEBRTC_POSIX | |
828 | |
829 #if defined(WEBRTC_USE_EPOLL) | |
830 | |
831 static int GetEpollEvents(uint32_t ff) { | |
832 int events = 0; | |
833 if (ff & (DE_READ | DE_ACCEPT)) { | |
834 events |= EPOLLIN; | |
835 } | |
836 if (ff & (DE_WRITE | DE_CONNECT)) { | |
837 events |= EPOLLOUT; | |
838 } | |
839 return events; | |
840 } | |
841 | |
842 void SocketDispatcher::StartBatchedEventUpdates() { | |
843 RTC_DCHECK_EQ(saved_enabled_events_, -1); | |
844 saved_enabled_events_ = enabled_events(); | |
845 } | |
846 | |
847 void SocketDispatcher::FinishBatchedEventUpdates() { | |
848 RTC_DCHECK_NE(saved_enabled_events_, -1); | |
849 uint8_t old_events = static_cast<uint8_t>(saved_enabled_events_); | |
850 saved_enabled_events_ = -1; | |
851 MaybeUpdateDispatcher(old_events); | |
852 } | |
853 | |
854 void SocketDispatcher::MaybeUpdateDispatcher(uint8_t old_events) { | |
855 if (GetEpollEvents(enabled_events()) != GetEpollEvents(old_events) && | |
856 saved_enabled_events_ == -1) { | |
857 ss_->Update(this); | |
858 } | |
859 } | |
860 | |
861 void SocketDispatcher::SetEnabledEvents(uint8_t events) { | |
862 uint8_t old_events = enabled_events(); | |
863 PhysicalSocket::SetEnabledEvents(events); | |
864 MaybeUpdateDispatcher(old_events); | |
865 } | |
866 | |
867 void SocketDispatcher::EnableEvents(uint8_t events) { | |
868 uint8_t old_events = enabled_events(); | |
869 PhysicalSocket::EnableEvents(events); | |
870 MaybeUpdateDispatcher(old_events); | |
871 } | |
872 | |
873 void SocketDispatcher::DisableEvents(uint8_t events) { | |
874 uint8_t old_events = enabled_events(); | |
875 PhysicalSocket::DisableEvents(events); | |
876 MaybeUpdateDispatcher(old_events); | |
877 } | |
878 | |
879 #endif // WEBRTC_USE_EPOLL | |
880 | |
881 int SocketDispatcher::Close() { | |
882 if (s_ == INVALID_SOCKET) | |
883 return 0; | |
884 | |
885 #if defined(WEBRTC_WIN) | |
886 id_ = 0; | |
887 signal_close_ = false; | |
888 #endif | |
889 ss_->Remove(this); | |
890 return PhysicalSocket::Close(); | |
891 } | |
892 | |
893 #if defined(WEBRTC_POSIX) | |
894 class EventDispatcher : public Dispatcher { | |
895 public: | |
896 EventDispatcher(PhysicalSocketServer* ss) : ss_(ss), fSignaled_(false) { | |
897 if (pipe(afd_) < 0) | |
898 LOG(LERROR) << "pipe failed"; | |
899 ss_->Add(this); | |
900 } | |
901 | |
902 ~EventDispatcher() override { | |
903 ss_->Remove(this); | |
904 close(afd_[0]); | |
905 close(afd_[1]); | |
906 } | |
907 | |
908 virtual void Signal() { | |
909 CritScope cs(&crit_); | |
910 if (!fSignaled_) { | |
911 const uint8_t b[1] = {0}; | |
912 const ssize_t res = write(afd_[1], b, sizeof(b)); | |
913 RTC_DCHECK_EQ(1, res); | |
914 fSignaled_ = true; | |
915 } | |
916 } | |
917 | |
918 uint32_t GetRequestedEvents() override { return DE_READ; } | |
919 | |
920 void OnPreEvent(uint32_t ff) override { | |
921 // It is not possible to perfectly emulate an auto-resetting event with | |
922 // pipes. This simulates it by resetting before the event is handled. | |
923 | |
924 CritScope cs(&crit_); | |
925 if (fSignaled_) { | |
926 uint8_t b[4]; // Allow for reading more than 1 byte, but expect 1. | |
927 const ssize_t res = read(afd_[0], b, sizeof(b)); | |
928 RTC_DCHECK_EQ(1, res); | |
929 fSignaled_ = false; | |
930 } | |
931 } | |
932 | |
933 void OnEvent(uint32_t ff, int err) override { RTC_NOTREACHED(); } | |
934 | |
935 int GetDescriptor() override { return afd_[0]; } | |
936 | |
937 bool IsDescriptorClosed() override { return false; } | |
938 | |
939 private: | |
940 PhysicalSocketServer *ss_; | |
941 int afd_[2]; | |
942 bool fSignaled_; | |
943 CriticalSection crit_; | |
944 }; | |
945 | |
946 // These two classes use the self-pipe trick to deliver POSIX signals to our | |
947 // select loop. This is the only safe, reliable, cross-platform way to do | |
948 // non-trivial things with a POSIX signal in an event-driven program (until | |
949 // proper pselect() implementations become ubiquitous). | |
950 | |
951 class PosixSignalHandler { | |
952 public: | |
953 // POSIX only specifies 32 signals, but in principle the system might have | |
954 // more and the programmer might choose to use them, so we size our array | |
955 // for 128. | |
956 static const int kNumPosixSignals = 128; | |
957 | |
958 // There is just a single global instance. (Signal handlers do not get any | |
959 // sort of user-defined void * parameter, so they can't access anything that | |
960 // isn't global.) | |
961 static PosixSignalHandler* Instance() { | |
962 RTC_DEFINE_STATIC_LOCAL(PosixSignalHandler, instance, ()); | |
963 return &instance; | |
964 } | |
965 | |
966 // Returns true if the given signal number is set. | |
967 bool IsSignalSet(int signum) const { | |
968 RTC_DCHECK(signum < static_cast<int>(arraysize(received_signal_))); | |
969 if (signum < static_cast<int>(arraysize(received_signal_))) { | |
970 return received_signal_[signum]; | |
971 } else { | |
972 return false; | |
973 } | |
974 } | |
975 | |
976 // Clears the given signal number. | |
977 void ClearSignal(int signum) { | |
978 RTC_DCHECK(signum < static_cast<int>(arraysize(received_signal_))); | |
979 if (signum < static_cast<int>(arraysize(received_signal_))) { | |
980 received_signal_[signum] = false; | |
981 } | |
982 } | |
983 | |
984 // Returns the file descriptor to monitor for signal events. | |
985 int GetDescriptor() const { | |
986 return afd_[0]; | |
987 } | |
988 | |
989 // This is called directly from our real signal handler, so it must be | |
990 // signal-handler-safe. That means it cannot assume anything about the | |
991 // user-level state of the process, since the handler could be executed at any | |
992 // time on any thread. | |
993 void OnPosixSignalReceived(int signum) { | |
994 if (signum >= static_cast<int>(arraysize(received_signal_))) { | |
995 // We don't have space in our array for this. | |
996 return; | |
997 } | |
998 // Set a flag saying we've seen this signal. | |
999 received_signal_[signum] = true; | |
1000 // Notify application code that we got a signal. | |
1001 const uint8_t b[1] = {0}; | |
1002 if (-1 == write(afd_[1], b, sizeof(b))) { | |
1003 // Nothing we can do here. If there's an error somehow then there's | |
1004 // nothing we can safely do from a signal handler. | |
1005 // No, we can't even safely log it. | |
1006 // But, we still have to check the return value here. Otherwise, | |
1007 // GCC 4.4.1 complains ignoring return value. Even (void) doesn't help. | |
1008 return; | |
1009 } | |
1010 } | |
1011 | |
1012 private: | |
1013 PosixSignalHandler() { | |
1014 if (pipe(afd_) < 0) { | |
1015 LOG_ERR(LS_ERROR) << "pipe failed"; | |
1016 return; | |
1017 } | |
1018 if (fcntl(afd_[0], F_SETFL, O_NONBLOCK) < 0) { | |
1019 LOG_ERR(LS_WARNING) << "fcntl #1 failed"; | |
1020 } | |
1021 if (fcntl(afd_[1], F_SETFL, O_NONBLOCK) < 0) { | |
1022 LOG_ERR(LS_WARNING) << "fcntl #2 failed"; | |
1023 } | |
1024 memset(const_cast<void *>(static_cast<volatile void *>(received_signal_)), | |
1025 0, | |
1026 sizeof(received_signal_)); | |
1027 } | |
1028 | |
1029 ~PosixSignalHandler() { | |
1030 int fd1 = afd_[0]; | |
1031 int fd2 = afd_[1]; | |
1032 // We clobber the stored file descriptor numbers here or else in principle | |
1033 // a signal that happens to be delivered during application termination | |
1034 // could erroneously write a zero byte to an unrelated file handle in | |
1035 // OnPosixSignalReceived() if some other file happens to be opened later | |
1036 // during shutdown and happens to be given the same file descriptor number | |
1037 // as our pipe had. Unfortunately even with this precaution there is still a | |
1038 // race where that could occur if said signal happens to be handled | |
1039 // concurrently with this code and happens to have already read the value of | |
1040 // afd_[1] from memory before we clobber it, but that's unlikely. | |
1041 afd_[0] = -1; | |
1042 afd_[1] = -1; | |
1043 close(fd1); | |
1044 close(fd2); | |
1045 } | |
1046 | |
1047 int afd_[2]; | |
1048 // These are boolean flags that will be set in our signal handler and read | |
1049 // and cleared from Wait(). There is a race involved in this, but it is | |
1050 // benign. The signal handler sets the flag before signaling the pipe, so | |
1051 // we'll never end up blocking in select() while a flag is still true. | |
1052 // However, if two of the same signal arrive close to each other then it's | |
1053 // possible that the second time the handler may set the flag while it's still | |
1054 // true, meaning that signal will be missed. But the first occurrence of it | |
1055 // will still be handled, so this isn't a problem. | |
1056 // Volatile is not necessary here for correctness, but this data _is_ volatile | |
1057 // so I've marked it as such. | |
1058 volatile uint8_t received_signal_[kNumPosixSignals]; | |
1059 }; | |
1060 | |
1061 class PosixSignalDispatcher : public Dispatcher { | |
1062 public: | |
1063 PosixSignalDispatcher(PhysicalSocketServer *owner) : owner_(owner) { | |
1064 owner_->Add(this); | |
1065 } | |
1066 | |
1067 ~PosixSignalDispatcher() override { | |
1068 owner_->Remove(this); | |
1069 } | |
1070 | |
1071 uint32_t GetRequestedEvents() override { return DE_READ; } | |
1072 | |
1073 void OnPreEvent(uint32_t ff) override { | |
1074 // Events might get grouped if signals come very fast, so we read out up to | |
1075 // 16 bytes to make sure we keep the pipe empty. | |
1076 uint8_t b[16]; | |
1077 ssize_t ret = read(GetDescriptor(), b, sizeof(b)); | |
1078 if (ret < 0) { | |
1079 LOG_ERR(LS_WARNING) << "Error in read()"; | |
1080 } else if (ret == 0) { | |
1081 LOG(LS_WARNING) << "Should have read at least one byte"; | |
1082 } | |
1083 } | |
1084 | |
1085 void OnEvent(uint32_t ff, int err) override { | |
1086 for (int signum = 0; signum < PosixSignalHandler::kNumPosixSignals; | |
1087 ++signum) { | |
1088 if (PosixSignalHandler::Instance()->IsSignalSet(signum)) { | |
1089 PosixSignalHandler::Instance()->ClearSignal(signum); | |
1090 HandlerMap::iterator i = handlers_.find(signum); | |
1091 if (i == handlers_.end()) { | |
1092 // This can happen if a signal is delivered to our process at around | |
1093 // the same time as we unset our handler for it. It is not an error | |
1094 // condition, but it's unusual enough to be worth logging. | |
1095 LOG(LS_INFO) << "Received signal with no handler: " << signum; | |
1096 } else { | |
1097 // Otherwise, execute our handler. | |
1098 (*i->second)(signum); | |
1099 } | |
1100 } | |
1101 } | |
1102 } | |
1103 | |
1104 int GetDescriptor() override { | |
1105 return PosixSignalHandler::Instance()->GetDescriptor(); | |
1106 } | |
1107 | |
1108 bool IsDescriptorClosed() override { return false; } | |
1109 | |
1110 void SetHandler(int signum, void (*handler)(int)) { | |
1111 handlers_[signum] = handler; | |
1112 } | |
1113 | |
1114 void ClearHandler(int signum) { | |
1115 handlers_.erase(signum); | |
1116 } | |
1117 | |
1118 bool HasHandlers() { | |
1119 return !handlers_.empty(); | |
1120 } | |
1121 | |
1122 private: | |
1123 typedef std::map<int, void (*)(int)> HandlerMap; | |
1124 | |
1125 HandlerMap handlers_; | |
1126 // Our owner. | |
1127 PhysicalSocketServer *owner_; | |
1128 }; | |
1129 | |
1130 #endif // WEBRTC_POSIX | |
1131 | |
1132 #if defined(WEBRTC_WIN) | |
1133 static uint32_t FlagsToEvents(uint32_t events) { | |
1134 uint32_t ffFD = FD_CLOSE; | |
1135 if (events & DE_READ) | |
1136 ffFD |= FD_READ; | |
1137 if (events & DE_WRITE) | |
1138 ffFD |= FD_WRITE; | |
1139 if (events & DE_CONNECT) | |
1140 ffFD |= FD_CONNECT; | |
1141 if (events & DE_ACCEPT) | |
1142 ffFD |= FD_ACCEPT; | |
1143 return ffFD; | |
1144 } | |
1145 | |
1146 class EventDispatcher : public Dispatcher { | |
1147 public: | |
1148 EventDispatcher(PhysicalSocketServer *ss) : ss_(ss) { | |
1149 hev_ = WSACreateEvent(); | |
1150 if (hev_) { | |
1151 ss_->Add(this); | |
1152 } | |
1153 } | |
1154 | |
1155 ~EventDispatcher() { | |
1156 if (hev_ != nullptr) { | |
1157 ss_->Remove(this); | |
1158 WSACloseEvent(hev_); | |
1159 hev_ = nullptr; | |
1160 } | |
1161 } | |
1162 | |
1163 virtual void Signal() { | |
1164 if (hev_ != nullptr) | |
1165 WSASetEvent(hev_); | |
1166 } | |
1167 | |
1168 virtual uint32_t GetRequestedEvents() { return 0; } | |
1169 | |
1170 virtual void OnPreEvent(uint32_t ff) { WSAResetEvent(hev_); } | |
1171 | |
1172 virtual void OnEvent(uint32_t ff, int err) {} | |
1173 | |
1174 virtual WSAEVENT GetWSAEvent() { | |
1175 return hev_; | |
1176 } | |
1177 | |
1178 virtual SOCKET GetSocket() { | |
1179 return INVALID_SOCKET; | |
1180 } | |
1181 | |
1182 virtual bool CheckSignalClose() { return false; } | |
1183 | |
1184 private: | |
1185 PhysicalSocketServer* ss_; | |
1186 WSAEVENT hev_; | |
1187 }; | |
1188 #endif // WEBRTC_WIN | |
1189 | |
1190 // Sets the value of a boolean value to false when signaled. | |
1191 class Signaler : public EventDispatcher { | |
1192 public: | |
1193 Signaler(PhysicalSocketServer* ss, bool* pf) | |
1194 : EventDispatcher(ss), pf_(pf) { | |
1195 } | |
1196 ~Signaler() override { } | |
1197 | |
1198 void OnEvent(uint32_t ff, int err) override { | |
1199 if (pf_) | |
1200 *pf_ = false; | |
1201 } | |
1202 | |
1203 private: | |
1204 bool *pf_; | |
1205 }; | |
1206 | |
1207 PhysicalSocketServer::PhysicalSocketServer() | |
1208 : fWait_(false) { | |
1209 #if defined(WEBRTC_USE_EPOLL) | |
1210 // Since Linux 2.6.8, the size argument is ignored, but must be greater than | |
1211 // zero. Before that the size served as hint to the kernel for the amount of | |
1212 // space to initially allocate in internal data structures. | |
1213 epoll_fd_ = epoll_create(FD_SETSIZE); | |
1214 if (epoll_fd_ == -1) { | |
1215 // Not an error, will fall back to "select" below. | |
1216 LOG_E(LS_WARNING, EN, errno) << "epoll_create"; | |
1217 epoll_fd_ = INVALID_SOCKET; | |
1218 } | |
1219 #endif | |
1220 signal_wakeup_ = new Signaler(this, &fWait_); | |
1221 #if defined(WEBRTC_WIN) | |
1222 socket_ev_ = WSACreateEvent(); | |
1223 #endif | |
1224 } | |
1225 | |
1226 PhysicalSocketServer::~PhysicalSocketServer() { | |
1227 #if defined(WEBRTC_WIN) | |
1228 WSACloseEvent(socket_ev_); | |
1229 #endif | |
1230 #if defined(WEBRTC_POSIX) | |
1231 signal_dispatcher_.reset(); | |
1232 #endif | |
1233 delete signal_wakeup_; | |
1234 #if defined(WEBRTC_USE_EPOLL) | |
1235 if (epoll_fd_ != INVALID_SOCKET) { | |
1236 close(epoll_fd_); | |
1237 } | |
1238 #endif | |
1239 RTC_DCHECK(dispatchers_.empty()); | |
1240 } | |
1241 | |
1242 void PhysicalSocketServer::WakeUp() { | |
1243 signal_wakeup_->Signal(); | |
1244 } | |
1245 | |
1246 Socket* PhysicalSocketServer::CreateSocket(int type) { | |
1247 return CreateSocket(AF_INET, type); | |
1248 } | |
1249 | |
1250 Socket* PhysicalSocketServer::CreateSocket(int family, int type) { | |
1251 PhysicalSocket* socket = new PhysicalSocket(this); | |
1252 if (socket->Create(family, type)) { | |
1253 return socket; | |
1254 } else { | |
1255 delete socket; | |
1256 return nullptr; | |
1257 } | |
1258 } | |
1259 | |
1260 AsyncSocket* PhysicalSocketServer::CreateAsyncSocket(int type) { | |
1261 return CreateAsyncSocket(AF_INET, type); | |
1262 } | |
1263 | |
1264 AsyncSocket* PhysicalSocketServer::CreateAsyncSocket(int family, int type) { | |
1265 SocketDispatcher* dispatcher = new SocketDispatcher(this); | |
1266 if (dispatcher->Create(family, type)) { | |
1267 return dispatcher; | |
1268 } else { | |
1269 delete dispatcher; | |
1270 return nullptr; | |
1271 } | |
1272 } | |
1273 | |
1274 AsyncSocket* PhysicalSocketServer::WrapSocket(SOCKET s) { | |
1275 SocketDispatcher* dispatcher = new SocketDispatcher(s, this); | |
1276 if (dispatcher->Initialize()) { | |
1277 return dispatcher; | |
1278 } else { | |
1279 delete dispatcher; | |
1280 return nullptr; | |
1281 } | |
1282 } | |
1283 | |
1284 void PhysicalSocketServer::Add(Dispatcher *pdispatcher) { | |
1285 CritScope cs(&crit_); | |
1286 if (processing_dispatchers_) { | |
1287 // A dispatcher is being added while a "Wait" call is processing the | |
1288 // list of socket events. | |
1289 // Defer adding to "dispatchers_" set until processing is done to avoid | |
1290 // invalidating the iterator in "Wait". | |
1291 pending_remove_dispatchers_.erase(pdispatcher); | |
1292 pending_add_dispatchers_.insert(pdispatcher); | |
1293 } else { | |
1294 dispatchers_.insert(pdispatcher); | |
1295 } | |
1296 #if defined(WEBRTC_USE_EPOLL) | |
1297 if (epoll_fd_ != INVALID_SOCKET) { | |
1298 AddEpoll(pdispatcher); | |
1299 } | |
1300 #endif // WEBRTC_USE_EPOLL | |
1301 } | |
1302 | |
1303 void PhysicalSocketServer::Remove(Dispatcher *pdispatcher) { | |
1304 CritScope cs(&crit_); | |
1305 if (processing_dispatchers_) { | |
1306 // A dispatcher is being removed while a "Wait" call is processing the | |
1307 // list of socket events. | |
1308 // Defer removal from "dispatchers_" set until processing is done to avoid | |
1309 // invalidating the iterator in "Wait". | |
1310 if (!pending_add_dispatchers_.erase(pdispatcher) && | |
1311 dispatchers_.find(pdispatcher) == dispatchers_.end()) { | |
1312 LOG(LS_WARNING) << "PhysicalSocketServer asked to remove a unknown " | |
1313 << "dispatcher, potentially from a duplicate call to " | |
1314 << "Add."; | |
1315 return; | |
1316 } | |
1317 | |
1318 pending_remove_dispatchers_.insert(pdispatcher); | |
1319 } else if (!dispatchers_.erase(pdispatcher)) { | |
1320 LOG(LS_WARNING) << "PhysicalSocketServer asked to remove a unknown " | |
1321 << "dispatcher, potentially from a duplicate call to Add."; | |
1322 return; | |
1323 } | |
1324 #if defined(WEBRTC_USE_EPOLL) | |
1325 if (epoll_fd_ != INVALID_SOCKET) { | |
1326 RemoveEpoll(pdispatcher); | |
1327 } | |
1328 #endif // WEBRTC_USE_EPOLL | |
1329 } | |
1330 | |
1331 void PhysicalSocketServer::Update(Dispatcher* pdispatcher) { | |
1332 #if defined(WEBRTC_USE_EPOLL) | |
1333 if (epoll_fd_ == INVALID_SOCKET) { | |
1334 return; | |
1335 } | |
1336 | |
1337 CritScope cs(&crit_); | |
1338 if (dispatchers_.find(pdispatcher) == dispatchers_.end()) { | |
1339 return; | |
1340 } | |
1341 | |
1342 UpdateEpoll(pdispatcher); | |
1343 #endif | |
1344 } | |
1345 | |
1346 void PhysicalSocketServer::AddRemovePendingDispatchers() { | |
1347 if (!pending_add_dispatchers_.empty()) { | |
1348 for (Dispatcher* pdispatcher : pending_add_dispatchers_) { | |
1349 dispatchers_.insert(pdispatcher); | |
1350 } | |
1351 pending_add_dispatchers_.clear(); | |
1352 } | |
1353 | |
1354 if (!pending_remove_dispatchers_.empty()) { | |
1355 for (Dispatcher* pdispatcher : pending_remove_dispatchers_) { | |
1356 dispatchers_.erase(pdispatcher); | |
1357 } | |
1358 pending_remove_dispatchers_.clear(); | |
1359 } | |
1360 } | |
1361 | |
1362 #if defined(WEBRTC_POSIX) | |
1363 | |
1364 bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) { | |
1365 #if defined(WEBRTC_USE_EPOLL) | |
1366 // We don't keep a dedicated "epoll" descriptor containing only the non-IO | |
1367 // (i.e. signaling) dispatcher, so "poll" will be used instead of the default | |
1368 // "select" to support sockets larger than FD_SETSIZE. | |
1369 if (!process_io) { | |
1370 return WaitPoll(cmsWait, signal_wakeup_); | |
1371 } else if (epoll_fd_ != INVALID_SOCKET) { | |
1372 return WaitEpoll(cmsWait); | |
1373 } | |
1374 #endif | |
1375 return WaitSelect(cmsWait, process_io); | |
1376 } | |
1377 | |
1378 static void ProcessEvents(Dispatcher* dispatcher, | |
1379 bool readable, | |
1380 bool writable, | |
1381 bool check_error) { | |
1382 int errcode = 0; | |
1383 // TODO(pthatcher): Should we set errcode if getsockopt fails? | |
1384 if (check_error) { | |
1385 socklen_t len = sizeof(errcode); | |
1386 ::getsockopt(dispatcher->GetDescriptor(), SOL_SOCKET, SO_ERROR, &errcode, | |
1387 &len); | |
1388 } | |
1389 | |
1390 uint32_t ff = 0; | |
1391 | |
1392 // Check readable descriptors. If we're waiting on an accept, signal | |
1393 // that. Otherwise we're waiting for data, check to see if we're | |
1394 // readable or really closed. | |
1395 // TODO(pthatcher): Only peek at TCP descriptors. | |
1396 if (readable) { | |
1397 if (dispatcher->GetRequestedEvents() & DE_ACCEPT) { | |
1398 ff |= DE_ACCEPT; | |
1399 } else if (errcode || dispatcher->IsDescriptorClosed()) { | |
1400 ff |= DE_CLOSE; | |
1401 } else { | |
1402 ff |= DE_READ; | |
1403 } | |
1404 } | |
1405 | |
1406 // Check writable descriptors. If we're waiting on a connect, detect | |
1407 // success versus failure by the reaped error code. | |
1408 if (writable) { | |
1409 if (dispatcher->GetRequestedEvents() & DE_CONNECT) { | |
1410 if (!errcode) { | |
1411 ff |= DE_CONNECT; | |
1412 } else { | |
1413 ff |= DE_CLOSE; | |
1414 } | |
1415 } else { | |
1416 ff |= DE_WRITE; | |
1417 } | |
1418 } | |
1419 | |
1420 // Tell the descriptor about the event. | |
1421 if (ff != 0) { | |
1422 dispatcher->OnPreEvent(ff); | |
1423 dispatcher->OnEvent(ff, errcode); | |
1424 } | |
1425 } | |
1426 | |
1427 bool PhysicalSocketServer::WaitSelect(int cmsWait, bool process_io) { | |
1428 // Calculate timing information | |
1429 | |
1430 struct timeval* ptvWait = nullptr; | |
1431 struct timeval tvWait; | |
1432 struct timeval tvStop; | |
1433 if (cmsWait != kForever) { | |
1434 // Calculate wait timeval | |
1435 tvWait.tv_sec = cmsWait / 1000; | |
1436 tvWait.tv_usec = (cmsWait % 1000) * 1000; | |
1437 ptvWait = &tvWait; | |
1438 | |
1439 // Calculate when to return in a timeval | |
1440 gettimeofday(&tvStop, nullptr); | |
1441 tvStop.tv_sec += tvWait.tv_sec; | |
1442 tvStop.tv_usec += tvWait.tv_usec; | |
1443 if (tvStop.tv_usec >= 1000000) { | |
1444 tvStop.tv_usec -= 1000000; | |
1445 tvStop.tv_sec += 1; | |
1446 } | |
1447 } | |
1448 | |
1449 // Zero all fd_sets. Don't need to do this inside the loop since | |
1450 // select() zeros the descriptors not signaled | |
1451 | |
1452 fd_set fdsRead; | |
1453 FD_ZERO(&fdsRead); | |
1454 fd_set fdsWrite; | |
1455 FD_ZERO(&fdsWrite); | |
1456 // Explicitly unpoison these FDs on MemorySanitizer which doesn't handle the | |
1457 // inline assembly in FD_ZERO. | |
1458 // http://crbug.com/344505 | |
1459 #ifdef MEMORY_SANITIZER | |
1460 __msan_unpoison(&fdsRead, sizeof(fdsRead)); | |
1461 __msan_unpoison(&fdsWrite, sizeof(fdsWrite)); | |
1462 #endif | |
1463 | |
1464 fWait_ = true; | |
1465 | |
1466 while (fWait_) { | |
1467 int fdmax = -1; | |
1468 { | |
1469 CritScope cr(&crit_); | |
1470 // TODO(jbauch): Support re-entrant waiting. | |
1471 RTC_DCHECK(!processing_dispatchers_); | |
1472 for (Dispatcher* pdispatcher : dispatchers_) { | |
1473 // Query dispatchers for read and write wait state | |
1474 RTC_DCHECK(pdispatcher); | |
1475 if (!process_io && (pdispatcher != signal_wakeup_)) | |
1476 continue; | |
1477 int fd = pdispatcher->GetDescriptor(); | |
1478 // "select"ing a file descriptor that is equal to or larger than | |
1479 // FD_SETSIZE will result in undefined behavior. | |
1480 RTC_DCHECK_LT(fd, FD_SETSIZE); | |
1481 if (fd > fdmax) | |
1482 fdmax = fd; | |
1483 | |
1484 uint32_t ff = pdispatcher->GetRequestedEvents(); | |
1485 if (ff & (DE_READ | DE_ACCEPT)) | |
1486 FD_SET(fd, &fdsRead); | |
1487 if (ff & (DE_WRITE | DE_CONNECT)) | |
1488 FD_SET(fd, &fdsWrite); | |
1489 } | |
1490 } | |
1491 | |
1492 // Wait then call handlers as appropriate | |
1493 // < 0 means error | |
1494 // 0 means timeout | |
1495 // > 0 means count of descriptors ready | |
1496 int n = select(fdmax + 1, &fdsRead, &fdsWrite, nullptr, ptvWait); | |
1497 | |
1498 // If error, return error. | |
1499 if (n < 0) { | |
1500 if (errno != EINTR) { | |
1501 LOG_E(LS_ERROR, EN, errno) << "select"; | |
1502 return false; | |
1503 } | |
1504 // Else ignore the error and keep going. If this EINTR was for one of the | |
1505 // signals managed by this PhysicalSocketServer, the | |
1506 // PosixSignalDeliveryDispatcher will be in the signaled state in the next | |
1507 // iteration. | |
1508 } else if (n == 0) { | |
1509 // If timeout, return success | |
1510 return true; | |
1511 } else { | |
1512 // We have signaled descriptors | |
1513 CritScope cr(&crit_); | |
1514 processing_dispatchers_ = true; | |
1515 for (Dispatcher* pdispatcher : dispatchers_) { | |
1516 int fd = pdispatcher->GetDescriptor(); | |
1517 | |
1518 bool readable = FD_ISSET(fd, &fdsRead); | |
1519 if (readable) { | |
1520 FD_CLR(fd, &fdsRead); | |
1521 } | |
1522 | |
1523 bool writable = FD_ISSET(fd, &fdsWrite); | |
1524 if (writable) { | |
1525 FD_CLR(fd, &fdsWrite); | |
1526 } | |
1527 | |
1528 // The error code can be signaled through reads or writes. | |
1529 ProcessEvents(pdispatcher, readable, writable, readable || writable); | |
1530 } | |
1531 | |
1532 processing_dispatchers_ = false; | |
1533 // Process deferred dispatchers that have been added/removed while the | |
1534 // events were handled above. | |
1535 AddRemovePendingDispatchers(); | |
1536 } | |
1537 | |
1538 // Recalc the time remaining to wait. Doing it here means it doesn't get | |
1539 // calced twice the first time through the loop | |
1540 if (ptvWait) { | |
1541 ptvWait->tv_sec = 0; | |
1542 ptvWait->tv_usec = 0; | |
1543 struct timeval tvT; | |
1544 gettimeofday(&tvT, nullptr); | |
1545 if ((tvStop.tv_sec > tvT.tv_sec) | |
1546 || ((tvStop.tv_sec == tvT.tv_sec) | |
1547 && (tvStop.tv_usec > tvT.tv_usec))) { | |
1548 ptvWait->tv_sec = tvStop.tv_sec - tvT.tv_sec; | |
1549 ptvWait->tv_usec = tvStop.tv_usec - tvT.tv_usec; | |
1550 if (ptvWait->tv_usec < 0) { | |
1551 RTC_DCHECK(ptvWait->tv_sec > 0); | |
1552 ptvWait->tv_usec += 1000000; | |
1553 ptvWait->tv_sec -= 1; | |
1554 } | |
1555 } | |
1556 } | |
1557 } | |
1558 | |
1559 return true; | |
1560 } | |
1561 | |
1562 #if defined(WEBRTC_USE_EPOLL) | |
1563 | |
1564 // Initial number of events to process with one call to "epoll_wait". | |
1565 static const size_t kInitialEpollEvents = 128; | |
1566 | |
1567 // Maximum number of events to process with one call to "epoll_wait". | |
1568 static const size_t kMaxEpollEvents = 8192; | |
1569 | |
1570 void PhysicalSocketServer::AddEpoll(Dispatcher* pdispatcher) { | |
1571 RTC_DCHECK(epoll_fd_ != INVALID_SOCKET); | |
1572 int fd = pdispatcher->GetDescriptor(); | |
1573 RTC_DCHECK(fd != INVALID_SOCKET); | |
1574 if (fd == INVALID_SOCKET) { | |
1575 return; | |
1576 } | |
1577 | |
1578 struct epoll_event event = {0}; | |
1579 event.events = GetEpollEvents(pdispatcher->GetRequestedEvents()); | |
1580 event.data.ptr = pdispatcher; | |
1581 int err = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event); | |
1582 RTC_DCHECK_EQ(err, 0); | |
1583 if (err == -1) { | |
1584 LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_ADD"; | |
1585 } | |
1586 } | |
1587 | |
1588 void PhysicalSocketServer::RemoveEpoll(Dispatcher* pdispatcher) { | |
1589 RTC_DCHECK(epoll_fd_ != INVALID_SOCKET); | |
1590 int fd = pdispatcher->GetDescriptor(); | |
1591 RTC_DCHECK(fd != INVALID_SOCKET); | |
1592 if (fd == INVALID_SOCKET) { | |
1593 return; | |
1594 } | |
1595 | |
1596 struct epoll_event event = {0}; | |
1597 int err = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, &event); | |
1598 RTC_DCHECK(err == 0 || errno == ENOENT); | |
1599 if (err == -1) { | |
1600 if (errno == ENOENT) { | |
1601 // Socket has already been closed. | |
1602 LOG_E(LS_VERBOSE, EN, errno) << "epoll_ctl EPOLL_CTL_DEL"; | |
1603 } else { | |
1604 LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_DEL"; | |
1605 } | |
1606 } | |
1607 } | |
1608 | |
1609 void PhysicalSocketServer::UpdateEpoll(Dispatcher* pdispatcher) { | |
1610 RTC_DCHECK(epoll_fd_ != INVALID_SOCKET); | |
1611 int fd = pdispatcher->GetDescriptor(); | |
1612 RTC_DCHECK(fd != INVALID_SOCKET); | |
1613 if (fd == INVALID_SOCKET) { | |
1614 return; | |
1615 } | |
1616 | |
1617 struct epoll_event event = {0}; | |
1618 event.events = GetEpollEvents(pdispatcher->GetRequestedEvents()); | |
1619 event.data.ptr = pdispatcher; | |
1620 int err = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd, &event); | |
1621 RTC_DCHECK_EQ(err, 0); | |
1622 if (err == -1) { | |
1623 LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_MOD"; | |
1624 } | |
1625 } | |
1626 | |
1627 bool PhysicalSocketServer::WaitEpoll(int cmsWait) { | |
1628 RTC_DCHECK(epoll_fd_ != INVALID_SOCKET); | |
1629 int64_t tvWait = -1; | |
1630 int64_t tvStop = -1; | |
1631 if (cmsWait != kForever) { | |
1632 tvWait = cmsWait; | |
1633 tvStop = TimeAfter(cmsWait); | |
1634 } | |
1635 | |
1636 if (epoll_events_.empty()) { | |
1637 // The initial space to receive events is created only if epoll is used. | |
1638 epoll_events_.resize(kInitialEpollEvents); | |
1639 } | |
1640 | |
1641 fWait_ = true; | |
1642 | |
1643 while (fWait_) { | |
1644 // Wait then call handlers as appropriate | |
1645 // < 0 means error | |
1646 // 0 means timeout | |
1647 // > 0 means count of descriptors ready | |
1648 int n = epoll_wait(epoll_fd_, &epoll_events_[0], | |
1649 static_cast<int>(epoll_events_.size()), | |
1650 static_cast<int>(tvWait)); | |
1651 if (n < 0) { | |
1652 if (errno != EINTR) { | |
1653 LOG_E(LS_ERROR, EN, errno) << "epoll"; | |
1654 return false; | |
1655 } | |
1656 // Else ignore the error and keep going. If this EINTR was for one of the | |
1657 // signals managed by this PhysicalSocketServer, the | |
1658 // PosixSignalDeliveryDispatcher will be in the signaled state in the next | |
1659 // iteration. | |
1660 } else if (n == 0) { | |
1661 // If timeout, return success | |
1662 return true; | |
1663 } else { | |
1664 // We have signaled descriptors | |
1665 CritScope cr(&crit_); | |
1666 for (int i = 0; i < n; ++i) { | |
1667 const epoll_event& event = epoll_events_[i]; | |
1668 Dispatcher* pdispatcher = static_cast<Dispatcher*>(event.data.ptr); | |
1669 if (dispatchers_.find(pdispatcher) == dispatchers_.end()) { | |
1670 // The dispatcher for this socket no longer exists. | |
1671 continue; | |
1672 } | |
1673 | |
1674 bool readable = (event.events & (EPOLLIN | EPOLLPRI)); | |
1675 bool writable = (event.events & EPOLLOUT); | |
1676 bool check_error = (event.events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP)); | |
1677 | |
1678 ProcessEvents(pdispatcher, readable, writable, check_error); | |
1679 } | |
1680 } | |
1681 | |
1682 if (static_cast<size_t>(n) == epoll_events_.size() && | |
1683 epoll_events_.size() < kMaxEpollEvents) { | |
1684 // We used the complete space to receive events, increase size for future | |
1685 // iterations. | |
1686 epoll_events_.resize(std::max(epoll_events_.size() * 2, kMaxEpollEvents)); | |
1687 } | |
1688 | |
1689 if (cmsWait != kForever) { | |
1690 tvWait = TimeDiff(tvStop, TimeMillis()); | |
1691 if (tvWait < 0) { | |
1692 // Return success on timeout. | |
1693 return true; | |
1694 } | |
1695 } | |
1696 } | |
1697 | |
1698 return true; | |
1699 } | |
1700 | |
1701 bool PhysicalSocketServer::WaitPoll(int cmsWait, Dispatcher* dispatcher) { | |
1702 RTC_DCHECK(dispatcher); | |
1703 int64_t tvWait = -1; | |
1704 int64_t tvStop = -1; | |
1705 if (cmsWait != kForever) { | |
1706 tvWait = cmsWait; | |
1707 tvStop = TimeAfter(cmsWait); | |
1708 } | |
1709 | |
1710 fWait_ = true; | |
1711 | |
1712 struct pollfd fds = {0}; | |
1713 int fd = dispatcher->GetDescriptor(); | |
1714 fds.fd = fd; | |
1715 | |
1716 while (fWait_) { | |
1717 uint32_t ff = dispatcher->GetRequestedEvents(); | |
1718 fds.events = 0; | |
1719 if (ff & (DE_READ | DE_ACCEPT)) { | |
1720 fds.events |= POLLIN; | |
1721 } | |
1722 if (ff & (DE_WRITE | DE_CONNECT)) { | |
1723 fds.events |= POLLOUT; | |
1724 } | |
1725 fds.revents = 0; | |
1726 | |
1727 // Wait then call handlers as appropriate | |
1728 // < 0 means error | |
1729 // 0 means timeout | |
1730 // > 0 means count of descriptors ready | |
1731 int n = poll(&fds, 1, static_cast<int>(tvWait)); | |
1732 if (n < 0) { | |
1733 if (errno != EINTR) { | |
1734 LOG_E(LS_ERROR, EN, errno) << "poll"; | |
1735 return false; | |
1736 } | |
1737 // Else ignore the error and keep going. If this EINTR was for one of the | |
1738 // signals managed by this PhysicalSocketServer, the | |
1739 // PosixSignalDeliveryDispatcher will be in the signaled state in the next | |
1740 // iteration. | |
1741 } else if (n == 0) { | |
1742 // If timeout, return success | |
1743 return true; | |
1744 } else { | |
1745 // We have signaled descriptors (should only be the passed dispatcher). | |
1746 RTC_DCHECK_EQ(n, 1); | |
1747 RTC_DCHECK_EQ(fds.fd, fd); | |
1748 | |
1749 bool readable = (fds.revents & (POLLIN | POLLPRI)); | |
1750 bool writable = (fds.revents & POLLOUT); | |
1751 bool check_error = (fds.revents & (POLLRDHUP | POLLERR | POLLHUP)); | |
1752 | |
1753 ProcessEvents(dispatcher, readable, writable, check_error); | |
1754 } | |
1755 | |
1756 if (cmsWait != kForever) { | |
1757 tvWait = TimeDiff(tvStop, TimeMillis()); | |
1758 if (tvWait < 0) { | |
1759 // Return success on timeout. | |
1760 return true; | |
1761 } | |
1762 } | |
1763 } | |
1764 | |
1765 return true; | |
1766 } | |
1767 | |
1768 #endif // WEBRTC_USE_EPOLL | |
1769 | |
1770 static void GlobalSignalHandler(int signum) { | |
1771 PosixSignalHandler::Instance()->OnPosixSignalReceived(signum); | |
1772 } | |
1773 | |
1774 bool PhysicalSocketServer::SetPosixSignalHandler(int signum, | |
1775 void (*handler)(int)) { | |
1776 // If handler is SIG_IGN or SIG_DFL then clear our user-level handler, | |
1777 // otherwise set one. | |
1778 if (handler == SIG_IGN || handler == SIG_DFL) { | |
1779 if (!InstallSignal(signum, handler)) { | |
1780 return false; | |
1781 } | |
1782 if (signal_dispatcher_) { | |
1783 signal_dispatcher_->ClearHandler(signum); | |
1784 if (!signal_dispatcher_->HasHandlers()) { | |
1785 signal_dispatcher_.reset(); | |
1786 } | |
1787 } | |
1788 } else { | |
1789 if (!signal_dispatcher_) { | |
1790 signal_dispatcher_.reset(new PosixSignalDispatcher(this)); | |
1791 } | |
1792 signal_dispatcher_->SetHandler(signum, handler); | |
1793 if (!InstallSignal(signum, &GlobalSignalHandler)) { | |
1794 return false; | |
1795 } | |
1796 } | |
1797 return true; | |
1798 } | |
1799 | |
1800 Dispatcher* PhysicalSocketServer::signal_dispatcher() { | |
1801 return signal_dispatcher_.get(); | |
1802 } | |
1803 | |
1804 bool PhysicalSocketServer::InstallSignal(int signum, void (*handler)(int)) { | |
1805 struct sigaction act; | |
1806 // It doesn't really matter what we set this mask to. | |
1807 if (sigemptyset(&act.sa_mask) != 0) { | |
1808 LOG_ERR(LS_ERROR) << "Couldn't set mask"; | |
1809 return false; | |
1810 } | |
1811 act.sa_handler = handler; | |
1812 #if !defined(__native_client__) | |
1813 // Use SA_RESTART so that our syscalls don't get EINTR, since we don't need it | |
1814 // and it's a nuisance. Though some syscalls still return EINTR and there's no | |
1815 // real standard for which ones. :( | |
1816 act.sa_flags = SA_RESTART; | |
1817 #else | |
1818 act.sa_flags = 0; | |
1819 #endif | |
1820 if (sigaction(signum, &act, nullptr) != 0) { | |
1821 LOG_ERR(LS_ERROR) << "Couldn't set sigaction"; | |
1822 return false; | |
1823 } | |
1824 return true; | |
1825 } | |
1826 #endif // WEBRTC_POSIX | |
1827 | |
1828 #if defined(WEBRTC_WIN) | |
1829 bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) { | |
1830 int64_t cmsTotal = cmsWait; | |
1831 int64_t cmsElapsed = 0; | |
1832 int64_t msStart = Time(); | |
1833 | |
1834 fWait_ = true; | |
1835 while (fWait_) { | |
1836 std::vector<WSAEVENT> events; | |
1837 std::vector<Dispatcher *> event_owners; | |
1838 | |
1839 events.push_back(socket_ev_); | |
1840 | |
1841 { | |
1842 CritScope cr(&crit_); | |
1843 // TODO(jbauch): Support re-entrant waiting. | |
1844 RTC_DCHECK(!processing_dispatchers_); | |
1845 | |
1846 // Calling "CheckSignalClose" might remove a closed dispatcher from the | |
1847 // set. This must be deferred to prevent invalidating the iterator. | |
1848 processing_dispatchers_ = true; | |
1849 for (Dispatcher* disp : dispatchers_) { | |
1850 if (!process_io && (disp != signal_wakeup_)) | |
1851 continue; | |
1852 SOCKET s = disp->GetSocket(); | |
1853 if (disp->CheckSignalClose()) { | |
1854 // We just signalled close, don't poll this socket | |
1855 } else if (s != INVALID_SOCKET) { | |
1856 WSAEventSelect(s, | |
1857 events[0], | |
1858 FlagsToEvents(disp->GetRequestedEvents())); | |
1859 } else { | |
1860 events.push_back(disp->GetWSAEvent()); | |
1861 event_owners.push_back(disp); | |
1862 } | |
1863 } | |
1864 | |
1865 processing_dispatchers_ = false; | |
1866 // Process deferred dispatchers that have been added/removed while the | |
1867 // events were handled above. | |
1868 AddRemovePendingDispatchers(); | |
1869 } | |
1870 | |
1871 // Which is shorter, the delay wait or the asked wait? | |
1872 | |
1873 int64_t cmsNext; | |
1874 if (cmsWait == kForever) { | |
1875 cmsNext = cmsWait; | |
1876 } else { | |
1877 cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed); | |
1878 } | |
1879 | |
1880 // Wait for one of the events to signal | |
1881 DWORD dw = WSAWaitForMultipleEvents(static_cast<DWORD>(events.size()), | |
1882 &events[0], | |
1883 false, | |
1884 static_cast<DWORD>(cmsNext), | |
1885 false); | |
1886 | |
1887 if (dw == WSA_WAIT_FAILED) { | |
1888 // Failed? | |
1889 // TODO(pthatcher): need a better strategy than this! | |
1890 WSAGetLastError(); | |
1891 RTC_NOTREACHED(); | |
1892 return false; | |
1893 } else if (dw == WSA_WAIT_TIMEOUT) { | |
1894 // Timeout? | |
1895 return true; | |
1896 } else { | |
1897 // Figure out which one it is and call it | |
1898 CritScope cr(&crit_); | |
1899 int index = dw - WSA_WAIT_EVENT_0; | |
1900 if (index > 0) { | |
1901 --index; // The first event is the socket event | |
1902 Dispatcher* disp = event_owners[index]; | |
1903 // The dispatcher could have been removed while waiting for events. | |
1904 if (dispatchers_.find(disp) != dispatchers_.end()) { | |
1905 disp->OnPreEvent(0); | |
1906 disp->OnEvent(0, 0); | |
1907 } | |
1908 } else if (process_io) { | |
1909 processing_dispatchers_ = true; | |
1910 for (Dispatcher* disp : dispatchers_) { | |
1911 SOCKET s = disp->GetSocket(); | |
1912 if (s == INVALID_SOCKET) | |
1913 continue; | |
1914 | |
1915 WSANETWORKEVENTS wsaEvents; | |
1916 int err = WSAEnumNetworkEvents(s, events[0], &wsaEvents); | |
1917 if (err == 0) { | |
1918 { | |
1919 if ((wsaEvents.lNetworkEvents & FD_READ) && | |
1920 wsaEvents.iErrorCode[FD_READ_BIT] != 0) { | |
1921 LOG(WARNING) << "PhysicalSocketServer got FD_READ_BIT error " | |
1922 << wsaEvents.iErrorCode[FD_READ_BIT]; | |
1923 } | |
1924 if ((wsaEvents.lNetworkEvents & FD_WRITE) && | |
1925 wsaEvents.iErrorCode[FD_WRITE_BIT] != 0) { | |
1926 LOG(WARNING) << "PhysicalSocketServer got FD_WRITE_BIT error " | |
1927 << wsaEvents.iErrorCode[FD_WRITE_BIT]; | |
1928 } | |
1929 if ((wsaEvents.lNetworkEvents & FD_CONNECT) && | |
1930 wsaEvents.iErrorCode[FD_CONNECT_BIT] != 0) { | |
1931 LOG(WARNING) << "PhysicalSocketServer got FD_CONNECT_BIT error " | |
1932 << wsaEvents.iErrorCode[FD_CONNECT_BIT]; | |
1933 } | |
1934 if ((wsaEvents.lNetworkEvents & FD_ACCEPT) && | |
1935 wsaEvents.iErrorCode[FD_ACCEPT_BIT] != 0) { | |
1936 LOG(WARNING) << "PhysicalSocketServer got FD_ACCEPT_BIT error " | |
1937 << wsaEvents.iErrorCode[FD_ACCEPT_BIT]; | |
1938 } | |
1939 if ((wsaEvents.lNetworkEvents & FD_CLOSE) && | |
1940 wsaEvents.iErrorCode[FD_CLOSE_BIT] != 0) { | |
1941 LOG(WARNING) << "PhysicalSocketServer got FD_CLOSE_BIT error " | |
1942 << wsaEvents.iErrorCode[FD_CLOSE_BIT]; | |
1943 } | |
1944 } | |
1945 uint32_t ff = 0; | |
1946 int errcode = 0; | |
1947 if (wsaEvents.lNetworkEvents & FD_READ) | |
1948 ff |= DE_READ; | |
1949 if (wsaEvents.lNetworkEvents & FD_WRITE) | |
1950 ff |= DE_WRITE; | |
1951 if (wsaEvents.lNetworkEvents & FD_CONNECT) { | |
1952 if (wsaEvents.iErrorCode[FD_CONNECT_BIT] == 0) { | |
1953 ff |= DE_CONNECT; | |
1954 } else { | |
1955 ff |= DE_CLOSE; | |
1956 errcode = wsaEvents.iErrorCode[FD_CONNECT_BIT]; | |
1957 } | |
1958 } | |
1959 if (wsaEvents.lNetworkEvents & FD_ACCEPT) | |
1960 ff |= DE_ACCEPT; | |
1961 if (wsaEvents.lNetworkEvents & FD_CLOSE) { | |
1962 ff |= DE_CLOSE; | |
1963 errcode = wsaEvents.iErrorCode[FD_CLOSE_BIT]; | |
1964 } | |
1965 if (ff != 0) { | |
1966 disp->OnPreEvent(ff); | |
1967 disp->OnEvent(ff, errcode); | |
1968 } | |
1969 } | |
1970 } | |
1971 | |
1972 processing_dispatchers_ = false; | |
1973 // Process deferred dispatchers that have been added/removed while the | |
1974 // events were handled above. | |
1975 AddRemovePendingDispatchers(); | |
1976 } | |
1977 | |
1978 // Reset the network event until new activity occurs | |
1979 WSAResetEvent(socket_ev_); | |
1980 } | |
1981 | |
1982 // Break? | |
1983 if (!fWait_) | |
1984 break; | |
1985 cmsElapsed = TimeSince(msStart); | |
1986 if ((cmsWait != kForever) && (cmsElapsed >= cmsWait)) { | |
1987 break; | |
1988 } | |
1989 } | |
1990 | |
1991 // Done | |
1992 return true; | |
1993 } | |
1994 #endif // WEBRTC_WIN | |
1995 | |
1996 } // namespace rtc | |
OLD | NEW |