Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(5)

Side by Side Diff: webrtc/base/physicalsocketserver.cc

Issue 1452903006: Keep listening if "accept" returns an invalid socket. (Closed) Base URL: https://chromium.googlesource.com/external/webrtc.git@master
Patch Set: Remove duplicate SocketDispatcher code. Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « webrtc/base/physicalsocketserver.h ('k') | webrtc/base/physicalsocketserver_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 /* 1 /*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 * 3 *
4 * Use of this source code is governed by a BSD-style license 4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source 5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found 6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may 7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree. 8 * be found in the AUTHORS file in the root of the source tree.
9 */ 9 */
10 10
(...skipping 26 matching lines...) Expand all
37 #endif 37 #endif
38 38
39 #include <algorithm> 39 #include <algorithm>
40 #include <map> 40 #include <map>
41 41
42 #include "webrtc/base/arraysize.h" 42 #include "webrtc/base/arraysize.h"
43 #include "webrtc/base/basictypes.h" 43 #include "webrtc/base/basictypes.h"
44 #include "webrtc/base/byteorder.h" 44 #include "webrtc/base/byteorder.h"
45 #include "webrtc/base/common.h" 45 #include "webrtc/base/common.h"
46 #include "webrtc/base/logging.h" 46 #include "webrtc/base/logging.h"
47 #include "webrtc/base/nethelpers.h"
48 #include "webrtc/base/physicalsocketserver.h" 47 #include "webrtc/base/physicalsocketserver.h"
49 #include "webrtc/base/timeutils.h" 48 #include "webrtc/base/timeutils.h"
50 #include "webrtc/base/winping.h" 49 #include "webrtc/base/winping.h"
51 #include "webrtc/base/win32socketinit.h" 50 #include "webrtc/base/win32socketinit.h"
52 51
53 // stm: this will tell us if we are on OSX 52 // stm: this will tell us if we are on OSX
54 #ifdef HAVE_CONFIG_H 53 #ifdef HAVE_CONFIG_H
55 #include "config.h" 54 #include "config.h"
56 #endif 55 #endif
57 56
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
90 68, // Official minimum 89 68, // Official minimum
91 0, // End of list marker 90 0, // End of list marker
92 }; 91 };
93 92
94 static const int IP_HEADER_SIZE = 20u; 93 static const int IP_HEADER_SIZE = 20u;
95 static const int IPV6_HEADER_SIZE = 40u; 94 static const int IPV6_HEADER_SIZE = 40u;
96 static const int ICMP_HEADER_SIZE = 8u; 95 static const int ICMP_HEADER_SIZE = 8u;
97 static const int ICMP_PING_TIMEOUT_MILLIS = 10000u; 96 static const int ICMP_PING_TIMEOUT_MILLIS = 10000u;
98 #endif 97 #endif
99 98
100 class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> { 99 PhysicalSocket::PhysicalSocket(PhysicalSocketServer* ss, SOCKET s)
101 public: 100 : ss_(ss), s_(s), enabled_events_(0), error_(0),
102 PhysicalSocket(PhysicalSocketServer* ss, SOCKET s = INVALID_SOCKET) 101 state_((s == INVALID_SOCKET) ? CS_CLOSED : CS_CONNECTED),
103 : ss_(ss), s_(s), enabled_events_(0), error_(0), 102 resolver_(nullptr) {
104 state_((s == INVALID_SOCKET) ? CS_CLOSED : CS_CONNECTED), 103 #if defined(WEBRTC_WIN)
105 resolver_(NULL) { 104 // EnsureWinsockInit() ensures that winsock is initialized. The default
106 #if defined(WEBRTC_WIN) 105 // version of this function doesn't do anything because winsock is
107 // EnsureWinsockInit() ensures that winsock is initialized. The default 106 // initialized by constructor of a static object. If neccessary libjingle
108 // version of this function doesn't do anything because winsock is 107 // users can link it with a different version of this function by replacing
109 // initialized by constructor of a static object. If neccessary libjingle 108 // win32socketinit.cc. See win32socketinit.cc for more details.
110 // users can link it with a different version of this function by replacing 109 EnsureWinsockInit();
111 // win32socketinit.cc. See win32socketinit.cc for more details. 110 #endif
112 EnsureWinsockInit(); 111 if (s_ != INVALID_SOCKET) {
113 #endif 112 enabled_events_ = DE_READ | DE_WRITE;
114 if (s_ != INVALID_SOCKET) { 113
115 enabled_events_ = DE_READ | DE_WRITE; 114 int type = SOCK_STREAM;
116 115 socklen_t len = sizeof(type);
117 int type = SOCK_STREAM; 116 VERIFY(0 == getsockopt(s_, SOL_SOCKET, SO_TYPE, (SockOptArg)&type, &len));
118 socklen_t len = sizeof(type);
119 VERIFY(0 == getsockopt(s_, SOL_SOCKET, SO_TYPE, (SockOptArg)&type, &len));
120 udp_ = (SOCK_DGRAM == type);
121 }
122 }
123
124 ~PhysicalSocket() override {
125 Close();
126 }
127
128 // Creates the underlying OS socket (same as the "socket" function).
129 virtual bool Create(int family, int type) {
130 Close();
131 s_ = ::socket(family, type, 0);
132 udp_ = (SOCK_DGRAM == type); 117 udp_ = (SOCK_DGRAM == type);
133 UpdateLastError(); 118 }
134 if (udp_) 119 }
135 enabled_events_ = DE_READ | DE_WRITE; 120
136 return s_ != INVALID_SOCKET; 121 PhysicalSocket::~PhysicalSocket() {
137 } 122 Close();
138 123 }
139 SocketAddress GetLocalAddress() const override { 124
140 sockaddr_storage addr_storage = {0}; 125 bool PhysicalSocket::Create(int family, int type) {
141 socklen_t addrlen = sizeof(addr_storage); 126 Close();
142 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); 127 s_ = ::socket(family, type, 0);
143 int result = ::getsockname(s_, addr, &addrlen); 128 udp_ = (SOCK_DGRAM == type);
144 SocketAddress address; 129 UpdateLastError();
145 if (result >= 0) { 130 if (udp_)
146 SocketAddressFromSockAddrStorage(addr_storage, &address); 131 enabled_events_ = DE_READ | DE_WRITE;
147 } else { 132 return s_ != INVALID_SOCKET;
148 LOG(LS_WARNING) << "GetLocalAddress: unable to get local addr, socket=" 133 }
149 << s_; 134
150 } 135 SocketAddress PhysicalSocket::GetLocalAddress() const {
151 return address; 136 sockaddr_storage addr_storage = {0};
152 } 137 socklen_t addrlen = sizeof(addr_storage);
153 138 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
154 SocketAddress GetRemoteAddress() const override { 139 int result = ::getsockname(s_, addr, &addrlen);
155 sockaddr_storage addr_storage = {0}; 140 SocketAddress address;
156 socklen_t addrlen = sizeof(addr_storage); 141 if (result >= 0) {
157 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); 142 SocketAddressFromSockAddrStorage(addr_storage, &address);
158 int result = ::getpeername(s_, addr, &addrlen); 143 } else {
159 SocketAddress address; 144 LOG(LS_WARNING) << "GetLocalAddress: unable to get local addr, socket="
160 if (result >= 0) { 145 << s_;
161 SocketAddressFromSockAddrStorage(addr_storage, &address); 146 }
162 } else { 147 return address;
163 LOG(LS_WARNING) << "GetRemoteAddress: unable to get remote addr, socket=" 148 }
164 << s_; 149
165 } 150 SocketAddress PhysicalSocket::GetRemoteAddress() const {
166 return address; 151 sockaddr_storage addr_storage = {0};
167 } 152 socklen_t addrlen = sizeof(addr_storage);
168 153 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
169 int Bind(const SocketAddress& bind_addr) override { 154 int result = ::getpeername(s_, addr, &addrlen);
170 sockaddr_storage addr_storage; 155 SocketAddress address;
171 size_t len = bind_addr.ToSockAddrStorage(&addr_storage); 156 if (result >= 0) {
172 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); 157 SocketAddressFromSockAddrStorage(addr_storage, &address);
173 int err = ::bind(s_, addr, static_cast<int>(len)); 158 } else {
174 UpdateLastError(); 159 LOG(LS_WARNING) << "GetRemoteAddress: unable to get remote addr, socket="
160 << s_;
161 }
162 return address;
163 }
164
165 int PhysicalSocket::Bind(const SocketAddress& bind_addr) {
166 sockaddr_storage addr_storage;
167 size_t len = bind_addr.ToSockAddrStorage(&addr_storage);
168 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
169 int err = ::bind(s_, addr, static_cast<int>(len));
170 UpdateLastError();
175 #if !defined(NDEBUG) 171 #if !defined(NDEBUG)
176 if (0 == err) { 172 if (0 == err) {
177 dbg_addr_ = "Bound @ "; 173 dbg_addr_ = "Bound @ ";
178 dbg_addr_.append(GetLocalAddress().ToString()); 174 dbg_addr_.append(GetLocalAddress().ToString());
179 } 175 }
180 #endif 176 #endif
181 return err; 177 return err;
182 } 178 }
183 179
184 int Connect(const SocketAddress& addr) override { 180 int PhysicalSocket::Connect(const SocketAddress& addr) {
185 // TODO: Implicit creation is required to reconnect... 181 // TODO: Implicit creation is required to reconnect...
186 // ...but should we make it more explicit? 182 // ...but should we make it more explicit?
187 if (state_ != CS_CLOSED) { 183 if (state_ != CS_CLOSED) {
188 SetError(EALREADY); 184 SetError(EALREADY);
189 return SOCKET_ERROR; 185 return SOCKET_ERROR;
190 } 186 }
191 if (addr.IsUnresolvedIP()) { 187 if (addr.IsUnresolvedIP()) {
192 LOG(LS_VERBOSE) << "Resolving addr in PhysicalSocket::Connect"; 188 LOG(LS_VERBOSE) << "Resolving addr in PhysicalSocket::Connect";
193 resolver_ = new AsyncResolver(); 189 resolver_ = new AsyncResolver();
194 resolver_->SignalDone.connect(this, &PhysicalSocket::OnResolveResult); 190 resolver_->SignalDone.connect(this, &PhysicalSocket::OnResolveResult);
195 resolver_->Start(addr); 191 resolver_->Start(addr);
196 state_ = CS_CONNECTING; 192 state_ = CS_CONNECTING;
193 return 0;
194 }
195
196 return DoConnect(addr);
197 }
198
199 int PhysicalSocket::DoConnect(const SocketAddress& connect_addr) {
200 if ((s_ == INVALID_SOCKET) &&
201 !Create(connect_addr.family(), SOCK_STREAM)) {
202 return SOCKET_ERROR;
203 }
204 sockaddr_storage addr_storage;
205 size_t len = connect_addr.ToSockAddrStorage(&addr_storage);
206 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
207 int err = ::connect(s_, addr, static_cast<int>(len));
208 UpdateLastError();
209 if (err == 0) {
210 state_ = CS_CONNECTED;
211 } else if (IsBlockingError(GetError())) {
212 state_ = CS_CONNECTING;
213 enabled_events_ |= DE_CONNECT;
214 } else {
215 return SOCKET_ERROR;
216 }
217
218 enabled_events_ |= DE_READ | DE_WRITE;
219 return 0;
220 }
221
222 int PhysicalSocket::GetError() const {
223 CritScope cs(&crit_);
224 return error_;
225 }
226
227 void PhysicalSocket::SetError(int error) {
228 CritScope cs(&crit_);
229 error_ = error;
230 }
231
232 AsyncSocket::ConnState PhysicalSocket::GetState() const {
233 return state_;
234 }
235
236 int PhysicalSocket::GetOption(Option opt, int* value) {
237 int slevel;
238 int sopt;
239 if (TranslateOption(opt, &slevel, &sopt) == -1)
240 return -1;
241 socklen_t optlen = sizeof(*value);
242 int ret = ::getsockopt(s_, slevel, sopt, (SockOptArg)value, &optlen);
243 if (ret != -1 && opt == OPT_DONTFRAGMENT) {
244 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID)
245 *value = (*value != IP_PMTUDISC_DONT) ? 1 : 0;
246 #endif
247 }
248 return ret;
249 }
250
251 int PhysicalSocket::SetOption(Option opt, int value) {
252 int slevel;
253 int sopt;
254 if (TranslateOption(opt, &slevel, &sopt) == -1)
255 return -1;
256 if (opt == OPT_DONTFRAGMENT) {
257 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID)
258 value = (value) ? IP_PMTUDISC_DO : IP_PMTUDISC_DONT;
259 #endif
260 }
261 return ::setsockopt(s_, slevel, sopt, (SockOptArg)&value, sizeof(value));
262 }
263
264 int PhysicalSocket::Send(const void* pv, size_t cb) {
265 int sent = ::send(s_, reinterpret_cast<const char *>(pv), (int)cb,
266 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID)
267 // Suppress SIGPIPE. Without this, attempting to send on a socket whose
268 // other end is closed will result in a SIGPIPE signal being raised to
269 // our process, which by default will terminate the process, which we
270 // don't want. By specifying this flag, we'll just get the error EPIPE
271 // instead and can handle the error gracefully.
272 MSG_NOSIGNAL
273 #else
274 0
275 #endif
276 );
277 UpdateLastError();
278 MaybeRemapSendError();
279 // We have seen minidumps where this may be false.
280 ASSERT(sent <= static_cast<int>(cb));
281 if ((sent < 0) && IsBlockingError(GetError())) {
282 enabled_events_ |= DE_WRITE;
283 }
284 return sent;
285 }
286
287 int PhysicalSocket::SendTo(const void* buffer,
288 size_t length,
289 const SocketAddress& addr) {
290 sockaddr_storage saddr;
291 size_t len = addr.ToSockAddrStorage(&saddr);
292 int sent = ::sendto(
293 s_, static_cast<const char *>(buffer), static_cast<int>(length),
294 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID)
295 // Suppress SIGPIPE. See above for explanation.
296 MSG_NOSIGNAL,
297 #else
298 0,
299 #endif
300 reinterpret_cast<sockaddr*>(&saddr), static_cast<int>(len));
301 UpdateLastError();
302 MaybeRemapSendError();
303 // We have seen minidumps where this may be false.
304 ASSERT(sent <= static_cast<int>(length));
305 if ((sent < 0) && IsBlockingError(GetError())) {
306 enabled_events_ |= DE_WRITE;
307 }
308 return sent;
309 }
310
311 int PhysicalSocket::Recv(void* buffer, size_t length) {
312 int received = ::recv(s_, static_cast<char*>(buffer),
313 static_cast<int>(length), 0);
314 if ((received == 0) && (length != 0)) {
315 // Note: on graceful shutdown, recv can return 0. In this case, we
316 // pretend it is blocking, and then signal close, so that simplifying
317 // assumptions can be made about Recv.
318 LOG(LS_WARNING) << "EOF from socket; deferring close event";
319 // Must turn this back on so that the select() loop will notice the close
320 // event.
321 enabled_events_ |= DE_READ;
322 SetError(EWOULDBLOCK);
323 return SOCKET_ERROR;
324 }
325 UpdateLastError();
326 int error = GetError();
327 bool success = (received >= 0) || IsBlockingError(error);
328 if (udp_ || success) {
329 enabled_events_ |= DE_READ;
330 }
331 if (!success) {
332 LOG_F(LS_VERBOSE) << "Error = " << error;
333 }
334 return received;
335 }
336
337 int PhysicalSocket::RecvFrom(void* buffer,
338 size_t length,
339 SocketAddress* out_addr) {
340 sockaddr_storage addr_storage;
341 socklen_t addr_len = sizeof(addr_storage);
342 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
343 int received = ::recvfrom(s_, static_cast<char*>(buffer),
344 static_cast<int>(length), 0, addr, &addr_len);
345 UpdateLastError();
346 if ((received >= 0) && (out_addr != nullptr))
347 SocketAddressFromSockAddrStorage(addr_storage, out_addr);
348 int error = GetError();
349 bool success = (received >= 0) || IsBlockingError(error);
350 if (udp_ || success) {
351 enabled_events_ |= DE_READ;
352 }
353 if (!success) {
354 LOG_F(LS_VERBOSE) << "Error = " << error;
355 }
356 return received;
357 }
358
359 int PhysicalSocket::Listen(int backlog) {
360 int err = ::listen(s_, backlog);
361 UpdateLastError();
362 if (err == 0) {
363 state_ = CS_CONNECTING;
364 enabled_events_ |= DE_ACCEPT;
365 #if !defined(NDEBUG)
366 dbg_addr_ = "Listening @ ";
367 dbg_addr_.append(GetLocalAddress().ToString());
368 #endif
369 }
370 return err;
371 }
372
373 AsyncSocket* PhysicalSocket::Accept(SocketAddress* out_addr) {
374 // Always re-subscribe DE_ACCEPT to make sure new incoming connections will
375 // trigger an event even if DoAccept returns an error here.
376 enabled_events_ |= DE_ACCEPT;
377 sockaddr_storage addr_storage;
378 socklen_t addr_len = sizeof(addr_storage);
379 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
380 SOCKET s = DoAccept(s_, addr, &addr_len);
381 UpdateLastError();
382 if (s == INVALID_SOCKET)
383 return nullptr;
384 if (out_addr != nullptr)
385 SocketAddressFromSockAddrStorage(addr_storage, out_addr);
386 return ss_->WrapSocket(s);
387 }
388
389 int PhysicalSocket::Close() {
390 if (s_ == INVALID_SOCKET)
391 return 0;
392 int err = ::closesocket(s_);
393 UpdateLastError();
394 s_ = INVALID_SOCKET;
395 state_ = CS_CLOSED;
396 enabled_events_ = 0;
397 if (resolver_) {
398 resolver_->Destroy(false);
399 resolver_ = nullptr;
400 }
401 return err;
402 }
403
404 int PhysicalSocket::EstimateMTU(uint16_t* mtu) {
405 SocketAddress addr = GetRemoteAddress();
406 if (addr.IsAnyIP()) {
407 SetError(ENOTCONN);
408 return -1;
409 }
410
411 #if defined(WEBRTC_WIN)
412 // Gets the interface MTU (TTL=1) for the interface used to reach |addr|.
413 WinPing ping;
414 if (!ping.IsValid()) {
415 SetError(EINVAL); // can't think of a better error ID
416 return -1;
417 }
418 int header_size = ICMP_HEADER_SIZE;
419 if (addr.family() == AF_INET6) {
420 header_size += IPV6_HEADER_SIZE;
421 } else if (addr.family() == AF_INET) {
422 header_size += IP_HEADER_SIZE;
423 }
424
425 for (int level = 0; PACKET_MAXIMUMS[level + 1] > 0; ++level) {
426 int32_t size = PACKET_MAXIMUMS[level] - header_size;
427 WinPing::PingResult result = ping.Ping(addr.ipaddr(), size,
428 ICMP_PING_TIMEOUT_MILLIS,
429 1, false);
430 if (result == WinPing::PING_FAIL) {
431 SetError(EINVAL); // can't think of a better error ID
432 return -1;
433 } else if (result != WinPing::PING_TOO_LARGE) {
434 *mtu = PACKET_MAXIMUMS[level];
197 return 0; 435 return 0;
198 } 436 }
199 437 }
200 return DoConnect(addr); 438
201 } 439 ASSERT(false);
202 440 return -1;
203 int DoConnect(const SocketAddress& connect_addr) { 441 #elif defined(WEBRTC_MAC)
204 if ((s_ == INVALID_SOCKET) && 442 // No simple way to do this on Mac OS X.
205 !Create(connect_addr.family(), SOCK_STREAM)) { 443 // SIOCGIFMTU would work if we knew which interface would be used, but
206 return SOCKET_ERROR; 444 // figuring that out is pretty complicated. For now we'll return an error
445 // and let the caller pick a default MTU.
446 SetError(EINVAL);
447 return -1;
448 #elif defined(WEBRTC_LINUX)
449 // Gets the path MTU.
450 int value;
451 socklen_t vlen = sizeof(value);
452 int err = getsockopt(s_, IPPROTO_IP, IP_MTU, &value, &vlen);
453 if (err < 0) {
454 UpdateLastError();
455 return err;
456 }
457
458 ASSERT((0 <= value) && (value <= 65536));
459 *mtu = value;
460 return 0;
461 #elif defined(__native_client__)
462 // Most socket operations, including this, will fail in NaCl's sandbox.
463 error_ = EACCES;
464 return -1;
465 #endif
466 }
467
468
469 SOCKET PhysicalSocket::DoAccept(SOCKET socket,
470 sockaddr* addr,
471 socklen_t* addrlen) {
472 return ::accept(socket, addr, addrlen);
473 }
474
475 void PhysicalSocket::OnResolveResult(AsyncResolverInterface* resolver) {
476 if (resolver != resolver_) {
477 return;
478 }
479
480 int error = resolver_->GetError();
481 if (error == 0) {
482 error = DoConnect(resolver_->address());
483 } else {
484 Close();
485 }
486
487 if (error) {
488 SetError(error);
489 SignalCloseEvent(this, error);
490 }
491 }
492
493 void PhysicalSocket::UpdateLastError() {
494 SetError(LAST_SYSTEM_ERROR);
495 }
496
497 void PhysicalSocket::MaybeRemapSendError() {
498 #if defined(WEBRTC_MAC)
499 // https://developer.apple.com/library/mac/documentation/Darwin/
500 // Reference/ManPages/man2/sendto.2.html
501 // ENOBUFS - The output queue for a network interface is full.
502 // This generally indicates that the interface has stopped sending,
503 // but may be caused by transient congestion.
504 if (GetError() == ENOBUFS) {
505 SetError(EWOULDBLOCK);
506 }
507 #endif
508 }
509
510 int PhysicalSocket::TranslateOption(Option opt, int* slevel, int* sopt) {
511 switch (opt) {
512 case OPT_DONTFRAGMENT:
513 #if defined(WEBRTC_WIN)
514 *slevel = IPPROTO_IP;
515 *sopt = IP_DONTFRAGMENT;
516 break;
517 #elif defined(WEBRTC_MAC) || defined(BSD) || defined(__native_client__)
518 LOG(LS_WARNING) << "Socket::OPT_DONTFRAGMENT not supported.";
519 return -1;
520 #elif defined(WEBRTC_POSIX)
521 *slevel = IPPROTO_IP;
522 *sopt = IP_MTU_DISCOVER;
523 break;
524 #endif
525 case OPT_RCVBUF:
526 *slevel = SOL_SOCKET;
527 *sopt = SO_RCVBUF;
528 break;
529 case OPT_SNDBUF:
530 *slevel = SOL_SOCKET;
531 *sopt = SO_SNDBUF;
532 break;
533 case OPT_NODELAY:
534 *slevel = IPPROTO_TCP;
535 *sopt = TCP_NODELAY;
536 break;
537 case OPT_DSCP:
538 LOG(LS_WARNING) << "Socket::OPT_DSCP not supported.";
539 return -1;
540 case OPT_RTP_SENDTIME_EXTN_ID:
541 return -1; // No logging is necessary as this not a OS socket option.
542 default:
543 ASSERT(false);
544 return -1;
545 }
546 return 0;
547 }
548
549 SocketDispatcher::SocketDispatcher(PhysicalSocketServer *ss)
550 : PhysicalSocket(ss)
551 #if defined(WEBRTC_WIN)
552 , id_(0), signal_close_(false)
joachim 2015/12/07 21:15:30 I didn't know how to properly format this accordin
pthatcher1 2015/12/08 20:20:43 Maybe this? #if defined(WEBRTC_WIN) : PhysicalS
553 #endif
554 {
555 }
556
557 SocketDispatcher::SocketDispatcher(SOCKET s, PhysicalSocketServer *ss)
558 : PhysicalSocket(ss, s)
559 #if defined(WEBRTC_WIN)
560 , id_(0), signal_close_(false)
joachim 2015/12/07 21:15:30 I didn't know how to properly format this accordin
561 #endif
562 {
563 }
564
565 SocketDispatcher::~SocketDispatcher() {
566 Close();
567 }
568
569 bool SocketDispatcher::Initialize() {
570 ASSERT(s_ != INVALID_SOCKET);
571 #if defined(WEBRTC_WIN)
572 // Must be a non-blocking
573 u_long argp = 1;
574 ioctlsocket(s_, FIONBIO, &argp);
575 #endif
576 ss_->Add(this);
577 #if defined(WEBRTC_POSIX)
578 fcntl(s_, F_SETFL, fcntl(s_, F_GETFL, 0) | O_NONBLOCK);
579 #endif
580 return true;
581 }
582
583 bool SocketDispatcher::Create(int type) {
584 return Create(AF_INET, type);
585 }
586
587 bool SocketDispatcher::Create(int family, int type) {
588 // Change the socket to be non-blocking.
589 if (!PhysicalSocket::Create(family, type))
590 return false;
591
592 if (!Initialize())
593 return false;
594
595 #if defined(WEBRTC_WIN)
596 do { id_ = ++next_id_; } while (id_ == 0);
597 #endif
598 return true;
599 }
600
601 #if defined(WEBRTC_WIN)
602
603 WSAEVENT SocketDispatcher::GetWSAEvent() {
604 return WSA_INVALID_EVENT;
605 }
606
607 SOCKET SocketDispatcher::GetSocket() {
608 return s_;
609 }
610
611 bool SocketDispatcher::CheckSignalClose() {
612 if (!signal_close_)
613 return false;
614
615 char ch;
616 if (recv(s_, &ch, 1, MSG_PEEK) > 0)
617 return false;
618
619 state_ = CS_CLOSED;
620 signal_close_ = false;
621 SignalCloseEvent(this, signal_err_);
622 return true;
623 }
624
625 int SocketDispatcher::next_id_ = 0;
626
627 #elif defined(WEBRTC_POSIX)
628
629 int SocketDispatcher::GetDescriptor() {
630 return s_;
631 }
632
633 bool SocketDispatcher::IsDescriptorClosed() {
634 // We don't have a reliable way of distinguishing end-of-stream
635 // from readability. So test on each readable call. Is this
636 // inefficient? Probably.
637 char ch;
638 ssize_t res = ::recv(s_, &ch, 1, MSG_PEEK);
639 if (res > 0) {
640 // Data available, so not closed.
641 return false;
642 } else if (res == 0) {
643 // EOF, so closed.
644 return true;
645 } else { // error
646 switch (errno) {
647 // Returned if we've already closed s_.
648 case EBADF:
649 // Returned during ungraceful peer shutdown.
650 case ECONNRESET:
651 return true;
652 default:
653 // Assume that all other errors are just blocking errors, meaning the
654 // connection is still good but we just can't read from it right now.
655 // This should only happen when connecting (and at most once), because
656 // in all other cases this function is only called if the file
657 // descriptor is already known to be in the readable state. However,
658 // it's not necessary a problem if we spuriously interpret a
659 // "connection lost"-type error as a blocking error, because typically
660 // the next recv() will get EOF, so we'll still eventually notice that
661 // the socket is closed.
662 LOG_ERR(LS_WARNING) << "Assuming benign blocking error";
663 return false;
207 } 664 }
208 sockaddr_storage addr_storage; 665 }
209 size_t len = connect_addr.ToSockAddrStorage(&addr_storage); 666 }
210 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); 667
211 int err = ::connect(s_, addr, static_cast<int>(len)); 668 #endif // WEBRTC_POSIX
212 UpdateLastError(); 669
213 if (err == 0) { 670 uint32_t SocketDispatcher::GetRequestedEvents() {
214 state_ = CS_CONNECTED; 671 return enabled_events_;
215 } else if (IsBlockingError(GetError())) { 672 }
216 state_ = CS_CONNECTING; 673
217 enabled_events_ |= DE_CONNECT; 674 void SocketDispatcher::OnPreEvent(uint32_t ff) {
218 } else { 675 if ((ff & DE_CONNECT) != 0)
219 return SOCKET_ERROR; 676 state_ = CS_CONNECTED;
220 } 677
221 678 #if defined(WEBRTC_WIN)
222 enabled_events_ |= DE_READ | DE_WRITE; 679 // We set CS_CLOSED from CheckSignalClose.
680 #elif defined(WEBRTC_POSIX)
681 if ((ff & DE_CLOSE) != 0)
682 state_ = CS_CLOSED;
683 #endif
684 }
685
686 #if defined(WEBRTC_WIN)
687 void SocketDispatcher::OnEvent(uint32_t ff, int err) {
688 int cache_id = id_;
689 // Make sure we deliver connect/accept first. Otherwise, consumers may see
690 // something like a READ followed by a CONNECT, which would be odd.
691 if (((ff & DE_CONNECT) != 0) && (id_ == cache_id)) {
692 if (ff != DE_CONNECT)
693 LOG(LS_VERBOSE) << "Signalled with DE_CONNECT: " << ff;
694 enabled_events_ &= ~DE_CONNECT;
695 #if !defined(NDEBUG)
696 dbg_addr_ = "Connected @ ";
697 dbg_addr_.append(GetRemoteAddress().ToString());
698 #endif
699 SignalConnectEvent(this);
700 }
701 if (((ff & DE_ACCEPT) != 0) && (id_ == cache_id)) {
702 enabled_events_ &= ~DE_ACCEPT;
703 SignalReadEvent(this);
704 }
705 if ((ff & DE_READ) != 0) {
706 enabled_events_ &= ~DE_READ;
707 SignalReadEvent(this);
708 }
709 if (((ff & DE_WRITE) != 0) && (id_ == cache_id)) {
710 enabled_events_ &= ~DE_WRITE;
711 SignalWriteEvent(this);
712 }
713 if (((ff & DE_CLOSE) != 0) && (id_ == cache_id)) {
714 signal_close_ = true;
715 signal_err_ = err;
716 }
717 }
718 #elif defined(WEBRTC_POSIX)
719 void SocketDispatcher::OnEvent(uint32_t ff, int err) {
720 // Make sure we deliver connect/accept first. Otherwise, consumers may see
721 // something like a READ followed by a CONNECT, which would be odd.
722 if ((ff & DE_CONNECT) != 0) {
723 enabled_events_ &= ~DE_CONNECT;
724 SignalConnectEvent(this);
725 }
726 if ((ff & DE_ACCEPT) != 0) {
727 enabled_events_ &= ~DE_ACCEPT;
728 SignalReadEvent(this);
729 }
730 if ((ff & DE_READ) != 0) {
731 enabled_events_ &= ~DE_READ;
732 SignalReadEvent(this);
733 }
734 if ((ff & DE_WRITE) != 0) {
735 enabled_events_ &= ~DE_WRITE;
736 SignalWriteEvent(this);
737 }
738 if ((ff & DE_CLOSE) != 0) {
739 // The socket is now dead to us, so stop checking it.
740 enabled_events_ = 0;
741 SignalCloseEvent(this, err);
742 }
743 }
744 #endif // WEBRTC_POSIX
745
746 int SocketDispatcher::Close() {
747 if (s_ == INVALID_SOCKET)
223 return 0; 748 return 0;
224 } 749
225 750 #if defined(WEBRTC_WIN)
226 int GetError() const override { 751 id_ = 0;
227 CritScope cs(&crit_); 752 signal_close_ = false;
228 return error_; 753 #endif
229 } 754 ss_->Remove(this);
230 755 return PhysicalSocket::Close();
231 void SetError(int error) override { 756 }
232 CritScope cs(&crit_);
233 error_ = error;
234 }
235
236 ConnState GetState() const override { return state_; }
237
238 int GetOption(Option opt, int* value) override {
239 int slevel;
240 int sopt;
241 if (TranslateOption(opt, &slevel, &sopt) == -1)
242 return -1;
243 socklen_t optlen = sizeof(*value);
244 int ret = ::getsockopt(s_, slevel, sopt, (SockOptArg)value, &optlen);
245 if (ret != -1 && opt == OPT_DONTFRAGMENT) {
246 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID)
247 *value = (*value != IP_PMTUDISC_DONT) ? 1 : 0;
248 #endif
249 }
250 return ret;
251 }
252
253 int SetOption(Option opt, int value) override {
254 int slevel;
255 int sopt;
256 if (TranslateOption(opt, &slevel, &sopt) == -1)
257 return -1;
258 if (opt == OPT_DONTFRAGMENT) {
259 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID)
260 value = (value) ? IP_PMTUDISC_DO : IP_PMTUDISC_DONT;
261 #endif
262 }
263 return ::setsockopt(s_, slevel, sopt, (SockOptArg)&value, sizeof(value));
264 }
265
266 int Send(const void* pv, size_t cb) override {
267 int sent = ::send(s_, reinterpret_cast<const char *>(pv), (int)cb,
268 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID)
269 // Suppress SIGPIPE. Without this, attempting to send on a socket whose
270 // other end is closed will result in a SIGPIPE signal being raised to
271 // our process, which by default will terminate the process, which we
272 // don't want. By specifying this flag, we'll just get the error EPIPE
273 // instead and can handle the error gracefully.
274 MSG_NOSIGNAL
275 #else
276 0
277 #endif
278 );
279 UpdateLastError();
280 MaybeRemapSendError();
281 // We have seen minidumps where this may be false.
282 ASSERT(sent <= static_cast<int>(cb));
283 if ((sent < 0) && IsBlockingError(GetError())) {
284 enabled_events_ |= DE_WRITE;
285 }
286 return sent;
287 }
288
289 int SendTo(const void* buffer,
290 size_t length,
291 const SocketAddress& addr) override {
292 sockaddr_storage saddr;
293 size_t len = addr.ToSockAddrStorage(&saddr);
294 int sent = ::sendto(
295 s_, static_cast<const char *>(buffer), static_cast<int>(length),
296 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID)
297 // Suppress SIGPIPE. See above for explanation.
298 MSG_NOSIGNAL,
299 #else
300 0,
301 #endif
302 reinterpret_cast<sockaddr*>(&saddr), static_cast<int>(len));
303 UpdateLastError();
304 MaybeRemapSendError();
305 // We have seen minidumps where this may be false.
306 ASSERT(sent <= static_cast<int>(length));
307 if ((sent < 0) && IsBlockingError(GetError())) {
308 enabled_events_ |= DE_WRITE;
309 }
310 return sent;
311 }
312
313 int Recv(void* buffer, size_t length) override {
314 int received = ::recv(s_, static_cast<char*>(buffer),
315 static_cast<int>(length), 0);
316 if ((received == 0) && (length != 0)) {
317 // Note: on graceful shutdown, recv can return 0. In this case, we
318 // pretend it is blocking, and then signal close, so that simplifying
319 // assumptions can be made about Recv.
320 LOG(LS_WARNING) << "EOF from socket; deferring close event";
321 // Must turn this back on so that the select() loop will notice the close
322 // event.
323 enabled_events_ |= DE_READ;
324 SetError(EWOULDBLOCK);
325 return SOCKET_ERROR;
326 }
327 UpdateLastError();
328 int error = GetError();
329 bool success = (received >= 0) || IsBlockingError(error);
330 if (udp_ || success) {
331 enabled_events_ |= DE_READ;
332 }
333 if (!success) {
334 LOG_F(LS_VERBOSE) << "Error = " << error;
335 }
336 return received;
337 }
338
339 int RecvFrom(void* buffer, size_t length, SocketAddress* out_addr) override {
340 sockaddr_storage addr_storage;
341 socklen_t addr_len = sizeof(addr_storage);
342 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
343 int received = ::recvfrom(s_, static_cast<char*>(buffer),
344 static_cast<int>(length), 0, addr, &addr_len);
345 UpdateLastError();
346 if ((received >= 0) && (out_addr != NULL))
347 SocketAddressFromSockAddrStorage(addr_storage, out_addr);
348 int error = GetError();
349 bool success = (received >= 0) || IsBlockingError(error);
350 if (udp_ || success) {
351 enabled_events_ |= DE_READ;
352 }
353 if (!success) {
354 LOG_F(LS_VERBOSE) << "Error = " << error;
355 }
356 return received;
357 }
358
359 int Listen(int backlog) override {
360 int err = ::listen(s_, backlog);
361 UpdateLastError();
362 if (err == 0) {
363 state_ = CS_CONNECTING;
364 enabled_events_ |= DE_ACCEPT;
365 #if !defined(NDEBUG)
366 dbg_addr_ = "Listening @ ";
367 dbg_addr_.append(GetLocalAddress().ToString());
368 #endif
369 }
370 return err;
371 }
372
373 AsyncSocket* Accept(SocketAddress* out_addr) override {
374 sockaddr_storage addr_storage;
375 socklen_t addr_len = sizeof(addr_storage);
376 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
377 SOCKET s = ::accept(s_, addr, &addr_len);
378 UpdateLastError();
379 if (s == INVALID_SOCKET)
380 return NULL;
381 enabled_events_ |= DE_ACCEPT;
382 if (out_addr != NULL)
383 SocketAddressFromSockAddrStorage(addr_storage, out_addr);
384 return ss_->WrapSocket(s);
385 }
386
387 int Close() override {
388 if (s_ == INVALID_SOCKET)
389 return 0;
390 int err = ::closesocket(s_);
391 UpdateLastError();
392 s_ = INVALID_SOCKET;
393 state_ = CS_CLOSED;
394 enabled_events_ = 0;
395 if (resolver_) {
396 resolver_->Destroy(false);
397 resolver_ = NULL;
398 }
399 return err;
400 }
401
402 int EstimateMTU(uint16_t* mtu) override {
403 SocketAddress addr = GetRemoteAddress();
404 if (addr.IsAnyIP()) {
405 SetError(ENOTCONN);
406 return -1;
407 }
408
409 #if defined(WEBRTC_WIN)
410 // Gets the interface MTU (TTL=1) for the interface used to reach |addr|.
411 WinPing ping;
412 if (!ping.IsValid()) {
413 SetError(EINVAL); // can't think of a better error ID
414 return -1;
415 }
416 int header_size = ICMP_HEADER_SIZE;
417 if (addr.family() == AF_INET6) {
418 header_size += IPV6_HEADER_SIZE;
419 } else if (addr.family() == AF_INET) {
420 header_size += IP_HEADER_SIZE;
421 }
422
423 for (int level = 0; PACKET_MAXIMUMS[level + 1] > 0; ++level) {
424 int32_t size = PACKET_MAXIMUMS[level] - header_size;
425 WinPing::PingResult result = ping.Ping(addr.ipaddr(), size,
426 ICMP_PING_TIMEOUT_MILLIS,
427 1, false);
428 if (result == WinPing::PING_FAIL) {
429 SetError(EINVAL); // can't think of a better error ID
430 return -1;
431 } else if (result != WinPing::PING_TOO_LARGE) {
432 *mtu = PACKET_MAXIMUMS[level];
433 return 0;
434 }
435 }
436
437 ASSERT(false);
438 return -1;
439 #elif defined(WEBRTC_MAC)
440 // No simple way to do this on Mac OS X.
441 // SIOCGIFMTU would work if we knew which interface would be used, but
442 // figuring that out is pretty complicated. For now we'll return an error
443 // and let the caller pick a default MTU.
444 SetError(EINVAL);
445 return -1;
446 #elif defined(WEBRTC_LINUX)
447 // Gets the path MTU.
448 int value;
449 socklen_t vlen = sizeof(value);
450 int err = getsockopt(s_, IPPROTO_IP, IP_MTU, &value, &vlen);
451 if (err < 0) {
452 UpdateLastError();
453 return err;
454 }
455
456 ASSERT((0 <= value) && (value <= 65536));
457 *mtu = value;
458 return 0;
459 #elif defined(__native_client__)
460 // Most socket operations, including this, will fail in NaCl's sandbox.
461 error_ = EACCES;
462 return -1;
463 #endif
464 }
465
466 SocketServer* socketserver() { return ss_; }
467
468 protected:
469 void OnResolveResult(AsyncResolverInterface* resolver) {
470 if (resolver != resolver_) {
471 return;
472 }
473
474 int error = resolver_->GetError();
475 if (error == 0) {
476 error = DoConnect(resolver_->address());
477 } else {
478 Close();
479 }
480
481 if (error) {
482 SetError(error);
483 SignalCloseEvent(this, error);
484 }
485 }
486
487 void UpdateLastError() {
488 SetError(LAST_SYSTEM_ERROR);
489 }
490
491 void MaybeRemapSendError() {
492 #if defined(WEBRTC_MAC)
493 // https://developer.apple.com/library/mac/documentation/Darwin/
494 // Reference/ManPages/man2/sendto.2.html
495 // ENOBUFS - The output queue for a network interface is full.
496 // This generally indicates that the interface has stopped sending,
497 // but may be caused by transient congestion.
498 if (GetError() == ENOBUFS) {
499 SetError(EWOULDBLOCK);
500 }
501 #endif
502 }
503
504 static int TranslateOption(Option opt, int* slevel, int* sopt) {
505 switch (opt) {
506 case OPT_DONTFRAGMENT:
507 #if defined(WEBRTC_WIN)
508 *slevel = IPPROTO_IP;
509 *sopt = IP_DONTFRAGMENT;
510 break;
511 #elif defined(WEBRTC_MAC) || defined(BSD) || defined(__native_client__)
512 LOG(LS_WARNING) << "Socket::OPT_DONTFRAGMENT not supported.";
513 return -1;
514 #elif defined(WEBRTC_POSIX)
515 *slevel = IPPROTO_IP;
516 *sopt = IP_MTU_DISCOVER;
517 break;
518 #endif
519 case OPT_RCVBUF:
520 *slevel = SOL_SOCKET;
521 *sopt = SO_RCVBUF;
522 break;
523 case OPT_SNDBUF:
524 *slevel = SOL_SOCKET;
525 *sopt = SO_SNDBUF;
526 break;
527 case OPT_NODELAY:
528 *slevel = IPPROTO_TCP;
529 *sopt = TCP_NODELAY;
530 break;
531 case OPT_DSCP:
532 LOG(LS_WARNING) << "Socket::OPT_DSCP not supported.";
533 return -1;
534 case OPT_RTP_SENDTIME_EXTN_ID:
535 return -1; // No logging is necessary as this not a OS socket option.
536 default:
537 ASSERT(false);
538 return -1;
539 }
540 return 0;
541 }
542
543 PhysicalSocketServer* ss_;
544 SOCKET s_;
545 uint8_t enabled_events_;
546 bool udp_;
547 int error_;
548 // Protects |error_| that is accessed from different threads.
549 mutable CriticalSection crit_;
550 ConnState state_;
551 AsyncResolver* resolver_;
552
553 #if !defined(NDEBUG)
554 std::string dbg_addr_;
555 #endif
556 };
557 757
558 #if defined(WEBRTC_POSIX) 758 #if defined(WEBRTC_POSIX)
559 class EventDispatcher : public Dispatcher { 759 class EventDispatcher : public Dispatcher {
560 public: 760 public:
561 EventDispatcher(PhysicalSocketServer* ss) : ss_(ss), fSignaled_(false) { 761 EventDispatcher(PhysicalSocketServer* ss) : ss_(ss), fSignaled_(false) {
562 if (pipe(afd_) < 0) 762 if (pipe(afd_) < 0)
563 LOG(LERROR) << "pipe failed"; 763 LOG(LERROR) << "pipe failed";
564 ss_->Add(this); 764 ss_->Add(this);
565 } 765 }
566 766
(...skipping 217 matching lines...) Expand 10 before | Expand all | Expand 10 after
784 } 984 }
785 985
786 private: 986 private:
787 typedef std::map<int, void (*)(int)> HandlerMap; 987 typedef std::map<int, void (*)(int)> HandlerMap;
788 988
789 HandlerMap handlers_; 989 HandlerMap handlers_;
790 // Our owner. 990 // Our owner.
791 PhysicalSocketServer *owner_; 991 PhysicalSocketServer *owner_;
792 }; 992 };
793 993
794 class SocketDispatcher : public Dispatcher, public PhysicalSocket {
795 public:
796 explicit SocketDispatcher(PhysicalSocketServer *ss) : PhysicalSocket(ss) {
797 }
798 SocketDispatcher(SOCKET s, PhysicalSocketServer *ss) : PhysicalSocket(ss, s) {
799 }
800
801 ~SocketDispatcher() override {
802 Close();
803 }
804
805 bool Initialize() {
806 ss_->Add(this);
807 fcntl(s_, F_SETFL, fcntl(s_, F_GETFL, 0) | O_NONBLOCK);
808 return true;
809 }
810
811 virtual bool Create(int type) {
812 return Create(AF_INET, type);
813 }
814
815 bool Create(int family, int type) override {
816 // Change the socket to be non-blocking.
817 if (!PhysicalSocket::Create(family, type))
818 return false;
819
820 return Initialize();
821 }
822
823 int GetDescriptor() override { return s_; }
824
825 bool IsDescriptorClosed() override {
826 // We don't have a reliable way of distinguishing end-of-stream
827 // from readability. So test on each readable call. Is this
828 // inefficient? Probably.
829 char ch;
830 ssize_t res = ::recv(s_, &ch, 1, MSG_PEEK);
831 if (res > 0) {
832 // Data available, so not closed.
833 return false;
834 } else if (res == 0) {
835 // EOF, so closed.
836 return true;
837 } else { // error
838 switch (errno) {
839 // Returned if we've already closed s_.
840 case EBADF:
841 // Returned during ungraceful peer shutdown.
842 case ECONNRESET:
843 return true;
844 default:
845 // Assume that all other errors are just blocking errors, meaning the
846 // connection is still good but we just can't read from it right now.
847 // This should only happen when connecting (and at most once), because
848 // in all other cases this function is only called if the file
849 // descriptor is already known to be in the readable state. However,
850 // it's not necessary a problem if we spuriously interpret a
851 // "connection lost"-type error as a blocking error, because typically
852 // the next recv() will get EOF, so we'll still eventually notice that
853 // the socket is closed.
854 LOG_ERR(LS_WARNING) << "Assuming benign blocking error";
855 return false;
856 }
857 }
858 }
859
860 uint32_t GetRequestedEvents() override { return enabled_events_; }
861
862 void OnPreEvent(uint32_t ff) override {
863 if ((ff & DE_CONNECT) != 0)
864 state_ = CS_CONNECTED;
865 if ((ff & DE_CLOSE) != 0)
866 state_ = CS_CLOSED;
867 }
868
869 void OnEvent(uint32_t ff, int err) override {
870 // Make sure we deliver connect/accept first. Otherwise, consumers may see
871 // something like a READ followed by a CONNECT, which would be odd.
872 if ((ff & DE_CONNECT) != 0) {
873 enabled_events_ &= ~DE_CONNECT;
874 SignalConnectEvent(this);
875 }
876 if ((ff & DE_ACCEPT) != 0) {
877 enabled_events_ &= ~DE_ACCEPT;
878 SignalReadEvent(this);
879 }
880 if ((ff & DE_READ) != 0) {
881 enabled_events_ &= ~DE_READ;
882 SignalReadEvent(this);
883 }
884 if ((ff & DE_WRITE) != 0) {
885 enabled_events_ &= ~DE_WRITE;
886 SignalWriteEvent(this);
887 }
888 if ((ff & DE_CLOSE) != 0) {
889 // The socket is now dead to us, so stop checking it.
890 enabled_events_ = 0;
891 SignalCloseEvent(this, err);
892 }
893 }
894
895 int Close() override {
896 if (s_ == INVALID_SOCKET)
897 return 0;
898
899 ss_->Remove(this);
900 return PhysicalSocket::Close();
901 }
902 };
903
904 class FileDispatcher: public Dispatcher, public AsyncFile { 994 class FileDispatcher: public Dispatcher, public AsyncFile {
905 public: 995 public:
906 FileDispatcher(int fd, PhysicalSocketServer *ss) : ss_(ss), fd_(fd) { 996 FileDispatcher(int fd, PhysicalSocketServer *ss) : ss_(ss), fd_(fd) {
907 set_readable(true); 997 set_readable(true);
908 998
909 ss_->Add(this); 999 ss_->Add(this);
910 1000
911 fcntl(fd_, F_SETFL, fcntl(fd_, F_GETFL, 0) | O_NONBLOCK); 1001 fcntl(fd_, F_SETFL, fcntl(fd_, F_GETFL, 0) | O_NONBLOCK);
912 } 1002 }
913 1003
(...skipping 93 matching lines...) Expand 10 before | Expand all | Expand 10 after
1007 virtual SOCKET GetSocket() { 1097 virtual SOCKET GetSocket() {
1008 return INVALID_SOCKET; 1098 return INVALID_SOCKET;
1009 } 1099 }
1010 1100
1011 virtual bool CheckSignalClose() { return false; } 1101 virtual bool CheckSignalClose() { return false; }
1012 1102
1013 private: 1103 private:
1014 PhysicalSocketServer* ss_; 1104 PhysicalSocketServer* ss_;
1015 WSAEVENT hev_; 1105 WSAEVENT hev_;
1016 }; 1106 };
1017
1018 class SocketDispatcher : public Dispatcher, public PhysicalSocket {
1019 public:
1020 static int next_id_;
1021 int id_;
1022 bool signal_close_;
1023 int signal_err_;
1024
1025 SocketDispatcher(PhysicalSocketServer* ss)
1026 : PhysicalSocket(ss),
1027 id_(0),
1028 signal_close_(false) {
1029 }
1030
1031 SocketDispatcher(SOCKET s, PhysicalSocketServer* ss)
1032 : PhysicalSocket(ss, s),
1033 id_(0),
1034 signal_close_(false) {
1035 }
1036
1037 virtual ~SocketDispatcher() {
1038 Close();
1039 }
1040
1041 bool Initialize() {
1042 ASSERT(s_ != INVALID_SOCKET);
1043 // Must be a non-blocking
1044 u_long argp = 1;
1045 ioctlsocket(s_, FIONBIO, &argp);
1046 ss_->Add(this);
1047 return true;
1048 }
1049
1050 virtual bool Create(int type) {
1051 return Create(AF_INET, type);
1052 }
1053
1054 virtual bool Create(int family, int type) {
1055 // Create socket
1056 if (!PhysicalSocket::Create(family, type))
1057 return false;
1058
1059 if (!Initialize())
1060 return false;
1061
1062 do { id_ = ++next_id_; } while (id_ == 0);
1063 return true;
1064 }
1065
1066 virtual int Close() {
1067 if (s_ == INVALID_SOCKET)
1068 return 0;
1069
1070 id_ = 0;
1071 signal_close_ = false;
1072 ss_->Remove(this);
1073 return PhysicalSocket::Close();
1074 }
1075
1076 virtual uint32_t GetRequestedEvents() { return enabled_events_; }
1077
1078 virtual void OnPreEvent(uint32_t ff) {
1079 if ((ff & DE_CONNECT) != 0)
1080 state_ = CS_CONNECTED;
1081 // We set CS_CLOSED from CheckSignalClose.
1082 }
1083
1084 virtual void OnEvent(uint32_t ff, int err) {
1085 int cache_id = id_;
1086 // Make sure we deliver connect/accept first. Otherwise, consumers may see
1087 // something like a READ followed by a CONNECT, which would be odd.
1088 if (((ff & DE_CONNECT) != 0) && (id_ == cache_id)) {
1089 if (ff != DE_CONNECT)
1090 LOG(LS_VERBOSE) << "Signalled with DE_CONNECT: " << ff;
1091 enabled_events_ &= ~DE_CONNECT;
1092 #if !defined(NDEBUG)
1093 dbg_addr_ = "Connected @ ";
1094 dbg_addr_.append(GetRemoteAddress().ToString());
1095 #endif
1096 SignalConnectEvent(this);
1097 }
1098 if (((ff & DE_ACCEPT) != 0) && (id_ == cache_id)) {
1099 enabled_events_ &= ~DE_ACCEPT;
1100 SignalReadEvent(this);
1101 }
1102 if ((ff & DE_READ) != 0) {
1103 enabled_events_ &= ~DE_READ;
1104 SignalReadEvent(this);
1105 }
1106 if (((ff & DE_WRITE) != 0) && (id_ == cache_id)) {
1107 enabled_events_ &= ~DE_WRITE;
1108 SignalWriteEvent(this);
1109 }
1110 if (((ff & DE_CLOSE) != 0) && (id_ == cache_id)) {
1111 signal_close_ = true;
1112 signal_err_ = err;
1113 }
1114 }
1115
1116 virtual WSAEVENT GetWSAEvent() {
1117 return WSA_INVALID_EVENT;
1118 }
1119
1120 virtual SOCKET GetSocket() {
1121 return s_;
1122 }
1123
1124 virtual bool CheckSignalClose() {
1125 if (!signal_close_)
1126 return false;
1127
1128 char ch;
1129 if (recv(s_, &ch, 1, MSG_PEEK) > 0)
1130 return false;
1131
1132 state_ = CS_CLOSED;
1133 signal_close_ = false;
1134 SignalCloseEvent(this, signal_err_);
1135 return true;
1136 }
1137 };
1138
1139 int SocketDispatcher::next_id_ = 0;
1140
1141 #endif // WEBRTC_WIN 1107 #endif // WEBRTC_WIN
1142 1108
1143 // Sets the value of a boolean value to false when signaled. 1109 // Sets the value of a boolean value to false when signaled.
1144 class Signaler : public EventDispatcher { 1110 class Signaler : public EventDispatcher {
1145 public: 1111 public:
1146 Signaler(PhysicalSocketServer* ss, bool* pf) 1112 Signaler(PhysicalSocketServer* ss, bool* pf)
1147 : EventDispatcher(ss), pf_(pf) { 1113 : EventDispatcher(ss), pf_(pf) {
1148 } 1114 }
1149 ~Signaler() override { } 1115 ~Signaler() override { }
1150 1116
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
1183 Socket* PhysicalSocketServer::CreateSocket(int type) { 1149 Socket* PhysicalSocketServer::CreateSocket(int type) {
1184 return CreateSocket(AF_INET, type); 1150 return CreateSocket(AF_INET, type);
1185 } 1151 }
1186 1152
1187 Socket* PhysicalSocketServer::CreateSocket(int family, int type) { 1153 Socket* PhysicalSocketServer::CreateSocket(int family, int type) {
1188 PhysicalSocket* socket = new PhysicalSocket(this); 1154 PhysicalSocket* socket = new PhysicalSocket(this);
1189 if (socket->Create(family, type)) { 1155 if (socket->Create(family, type)) {
1190 return socket; 1156 return socket;
1191 } else { 1157 } else {
1192 delete socket; 1158 delete socket;
1193 return 0; 1159 return nullptr;
1194 } 1160 }
1195 } 1161 }
1196 1162
1197 AsyncSocket* PhysicalSocketServer::CreateAsyncSocket(int type) { 1163 AsyncSocket* PhysicalSocketServer::CreateAsyncSocket(int type) {
1198 return CreateAsyncSocket(AF_INET, type); 1164 return CreateAsyncSocket(AF_INET, type);
1199 } 1165 }
1200 1166
1201 AsyncSocket* PhysicalSocketServer::CreateAsyncSocket(int family, int type) { 1167 AsyncSocket* PhysicalSocketServer::CreateAsyncSocket(int family, int type) {
1202 SocketDispatcher* dispatcher = new SocketDispatcher(this); 1168 SocketDispatcher* dispatcher = new SocketDispatcher(this);
1203 if (dispatcher->Create(family, type)) { 1169 if (dispatcher->Create(family, type)) {
1204 return dispatcher; 1170 return dispatcher;
1205 } else { 1171 } else {
1206 delete dispatcher; 1172 delete dispatcher;
1207 return 0; 1173 return nullptr;
1208 } 1174 }
1209 } 1175 }
1210 1176
1211 AsyncSocket* PhysicalSocketServer::WrapSocket(SOCKET s) { 1177 AsyncSocket* PhysicalSocketServer::WrapSocket(SOCKET s) {
1212 SocketDispatcher* dispatcher = new SocketDispatcher(s, this); 1178 SocketDispatcher* dispatcher = new SocketDispatcher(s, this);
1213 if (dispatcher->Initialize()) { 1179 if (dispatcher->Initialize()) {
1214 return dispatcher; 1180 return dispatcher;
1215 } else { 1181 } else {
1216 delete dispatcher; 1182 delete dispatcher;
1217 return 0; 1183 return nullptr;
1218 } 1184 }
1219 } 1185 }
1220 1186
1221 void PhysicalSocketServer::Add(Dispatcher *pdispatcher) { 1187 void PhysicalSocketServer::Add(Dispatcher *pdispatcher) {
1222 CritScope cs(&crit_); 1188 CritScope cs(&crit_);
1223 // Prevent duplicates. This can cause dead dispatchers to stick around. 1189 // Prevent duplicates. This can cause dead dispatchers to stick around.
1224 DispatcherList::iterator pos = std::find(dispatchers_.begin(), 1190 DispatcherList::iterator pos = std::find(dispatchers_.begin(),
1225 dispatchers_.end(), 1191 dispatchers_.end(),
1226 pdispatcher); 1192 pdispatcher);
1227 if (pos != dispatchers_.end()) 1193 if (pos != dispatchers_.end())
(...skipping 400 matching lines...) Expand 10 before | Expand all | Expand 10 after
1628 break; 1594 break;
1629 } 1595 }
1630 } 1596 }
1631 1597
1632 // Done 1598 // Done
1633 return true; 1599 return true;
1634 } 1600 }
1635 #endif // WEBRTC_WIN 1601 #endif // WEBRTC_WIN
1636 1602
1637 } // namespace rtc 1603 } // namespace rtc
OLDNEW
« no previous file with comments | « webrtc/base/physicalsocketserver.h ('k') | webrtc/base/physicalsocketserver_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698