Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(149)

Side by Side Diff: webrtc/base/physicalsocketserver.cc

Issue 1452903006: Keep listening if "accept" returns an invalid socket. (Closed) Base URL: https://chromium.googlesource.com/external/webrtc.git@master
Patch Set: Fixed presubmit Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « webrtc/base/physicalsocketserver.h ('k') | webrtc/base/physicalsocketserver_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 /* 1 /*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 * 3 *
4 * Use of this source code is governed by a BSD-style license 4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source 5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found 6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may 7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree. 8 * be found in the AUTHORS file in the root of the source tree.
9 */ 9 */
10 10
(...skipping 26 matching lines...) Expand all
37 #endif 37 #endif
38 38
39 #include <algorithm> 39 #include <algorithm>
40 #include <map> 40 #include <map>
41 41
42 #include "webrtc/base/arraysize.h" 42 #include "webrtc/base/arraysize.h"
43 #include "webrtc/base/basictypes.h" 43 #include "webrtc/base/basictypes.h"
44 #include "webrtc/base/byteorder.h" 44 #include "webrtc/base/byteorder.h"
45 #include "webrtc/base/common.h" 45 #include "webrtc/base/common.h"
46 #include "webrtc/base/logging.h" 46 #include "webrtc/base/logging.h"
47 #include "webrtc/base/nethelpers.h"
48 #include "webrtc/base/physicalsocketserver.h" 47 #include "webrtc/base/physicalsocketserver.h"
49 #include "webrtc/base/timeutils.h" 48 #include "webrtc/base/timeutils.h"
50 #include "webrtc/base/winping.h" 49 #include "webrtc/base/winping.h"
51 #include "webrtc/base/win32socketinit.h" 50 #include "webrtc/base/win32socketinit.h"
52 51
53 // stm: this will tell us if we are on OSX 52 // stm: this will tell us if we are on OSX
54 #ifdef HAVE_CONFIG_H 53 #ifdef HAVE_CONFIG_H
55 #include "config.h" 54 #include "config.h"
56 #endif 55 #endif
57 56
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
90 68, // Official minimum 89 68, // Official minimum
91 0, // End of list marker 90 0, // End of list marker
92 }; 91 };
93 92
94 static const int IP_HEADER_SIZE = 20u; 93 static const int IP_HEADER_SIZE = 20u;
95 static const int IPV6_HEADER_SIZE = 40u; 94 static const int IPV6_HEADER_SIZE = 40u;
96 static const int ICMP_HEADER_SIZE = 8u; 95 static const int ICMP_HEADER_SIZE = 8u;
97 static const int ICMP_PING_TIMEOUT_MILLIS = 10000u; 96 static const int ICMP_PING_TIMEOUT_MILLIS = 10000u;
98 #endif 97 #endif
99 98
100 class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> { 99 PhysicalSocket::PhysicalSocket(PhysicalSocketServer* ss, SOCKET s)
101 public: 100 : ss_(ss), s_(s), enabled_events_(0), error_(0),
102 PhysicalSocket(PhysicalSocketServer* ss, SOCKET s = INVALID_SOCKET) 101 state_((s == INVALID_SOCKET) ? CS_CLOSED : CS_CONNECTED),
103 : ss_(ss), s_(s), enabled_events_(0), error_(0), 102 resolver_(nullptr) {
104 state_((s == INVALID_SOCKET) ? CS_CLOSED : CS_CONNECTED),
105 resolver_(NULL) {
106 #if defined(WEBRTC_WIN) 103 #if defined(WEBRTC_WIN)
107 // EnsureWinsockInit() ensures that winsock is initialized. The default 104 // EnsureWinsockInit() ensures that winsock is initialized. The default
108 // version of this function doesn't do anything because winsock is 105 // version of this function doesn't do anything because winsock is
109 // initialized by constructor of a static object. If neccessary libjingle 106 // initialized by constructor of a static object. If neccessary libjingle
110 // users can link it with a different version of this function by replacing 107 // users can link it with a different version of this function by replacing
111 // win32socketinit.cc. See win32socketinit.cc for more details. 108 // win32socketinit.cc. See win32socketinit.cc for more details.
112 EnsureWinsockInit(); 109 EnsureWinsockInit();
113 #endif 110 #endif
114 if (s_ != INVALID_SOCKET) { 111 if (s_ != INVALID_SOCKET) {
115 enabled_events_ = DE_READ | DE_WRITE; 112 enabled_events_ = DE_READ | DE_WRITE;
116 113
117 int type = SOCK_STREAM; 114 int type = SOCK_STREAM;
118 socklen_t len = sizeof(type); 115 socklen_t len = sizeof(type);
119 VERIFY(0 == getsockopt(s_, SOL_SOCKET, SO_TYPE, (SockOptArg)&type, &len)); 116 VERIFY(0 == getsockopt(s_, SOL_SOCKET, SO_TYPE, (SockOptArg)&type, &len));
120 udp_ = (SOCK_DGRAM == type);
121 }
122 }
123
124 ~PhysicalSocket() override {
125 Close();
126 }
127
128 // Creates the underlying OS socket (same as the "socket" function).
129 virtual bool Create(int family, int type) {
130 Close();
131 s_ = ::socket(family, type, 0);
132 udp_ = (SOCK_DGRAM == type); 117 udp_ = (SOCK_DGRAM == type);
133 UpdateLastError(); 118 }
134 if (udp_) 119 }
135 enabled_events_ = DE_READ | DE_WRITE; 120
136 return s_ != INVALID_SOCKET; 121 PhysicalSocket::~PhysicalSocket() {
137 } 122 Close();
138 123 }
139 SocketAddress GetLocalAddress() const override { 124
140 sockaddr_storage addr_storage = {0}; 125 bool PhysicalSocket::Create(int family, int type) {
141 socklen_t addrlen = sizeof(addr_storage); 126 Close();
142 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); 127 s_ = ::socket(family, type, 0);
143 int result = ::getsockname(s_, addr, &addrlen); 128 udp_ = (SOCK_DGRAM == type);
144 SocketAddress address; 129 UpdateLastError();
145 if (result >= 0) { 130 if (udp_)
146 SocketAddressFromSockAddrStorage(addr_storage, &address); 131 enabled_events_ = DE_READ | DE_WRITE;
147 } else { 132 return s_ != INVALID_SOCKET;
148 LOG(LS_WARNING) << "GetLocalAddress: unable to get local addr, socket=" 133 }
149 << s_; 134
150 } 135 SocketAddress PhysicalSocket::GetLocalAddress() const {
151 return address; 136 sockaddr_storage addr_storage = {0};
152 } 137 socklen_t addrlen = sizeof(addr_storage);
153 138 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
154 SocketAddress GetRemoteAddress() const override { 139 int result = ::getsockname(s_, addr, &addrlen);
155 sockaddr_storage addr_storage = {0}; 140 SocketAddress address;
156 socklen_t addrlen = sizeof(addr_storage); 141 if (result >= 0) {
157 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); 142 SocketAddressFromSockAddrStorage(addr_storage, &address);
158 int result = ::getpeername(s_, addr, &addrlen); 143 } else {
159 SocketAddress address; 144 LOG(LS_WARNING) << "GetLocalAddress: unable to get local addr, socket="
160 if (result >= 0) { 145 << s_;
161 SocketAddressFromSockAddrStorage(addr_storage, &address); 146 }
162 } else { 147 return address;
163 LOG(LS_WARNING) << "GetRemoteAddress: unable to get remote addr, socket=" 148 }
164 << s_; 149
165 } 150 SocketAddress PhysicalSocket::GetRemoteAddress() const {
166 return address; 151 sockaddr_storage addr_storage = {0};
167 } 152 socklen_t addrlen = sizeof(addr_storage);
168 153 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
169 int Bind(const SocketAddress& bind_addr) override { 154 int result = ::getpeername(s_, addr, &addrlen);
170 sockaddr_storage addr_storage; 155 SocketAddress address;
171 size_t len = bind_addr.ToSockAddrStorage(&addr_storage); 156 if (result >= 0) {
172 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); 157 SocketAddressFromSockAddrStorage(addr_storage, &address);
173 int err = ::bind(s_, addr, static_cast<int>(len)); 158 } else {
174 UpdateLastError(); 159 LOG(LS_WARNING) << "GetRemoteAddress: unable to get remote addr, socket="
160 << s_;
161 }
162 return address;
163 }
164
165 int PhysicalSocket::Bind(const SocketAddress& bind_addr) {
166 sockaddr_storage addr_storage;
167 size_t len = bind_addr.ToSockAddrStorage(&addr_storage);
168 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
169 int err = ::bind(s_, addr, static_cast<int>(len));
170 UpdateLastError();
175 #if !defined(NDEBUG) 171 #if !defined(NDEBUG)
176 if (0 == err) { 172 if (0 == err) {
177 dbg_addr_ = "Bound @ "; 173 dbg_addr_ = "Bound @ ";
178 dbg_addr_.append(GetLocalAddress().ToString()); 174 dbg_addr_.append(GetLocalAddress().ToString());
179 } 175 }
180 #endif 176 #endif
181 return err; 177 return err;
182 } 178 }
183 179
184 int Connect(const SocketAddress& addr) override { 180 int PhysicalSocket::Connect(const SocketAddress& addr) {
185 // TODO: Implicit creation is required to reconnect... 181 // TODO(pthatcher): Implicit creation is required to reconnect...
186 // ...but should we make it more explicit? 182 // ...but should we make it more explicit?
187 if (state_ != CS_CLOSED) { 183 if (state_ != CS_CLOSED) {
188 SetError(EALREADY); 184 SetError(EALREADY);
189 return SOCKET_ERROR; 185 return SOCKET_ERROR;
190 } 186 }
191 if (addr.IsUnresolvedIP()) { 187 if (addr.IsUnresolvedIP()) {
192 LOG(LS_VERBOSE) << "Resolving addr in PhysicalSocket::Connect"; 188 LOG(LS_VERBOSE) << "Resolving addr in PhysicalSocket::Connect";
193 resolver_ = new AsyncResolver(); 189 resolver_ = new AsyncResolver();
194 resolver_->SignalDone.connect(this, &PhysicalSocket::OnResolveResult); 190 resolver_->SignalDone.connect(this, &PhysicalSocket::OnResolveResult);
195 resolver_->Start(addr); 191 resolver_->Start(addr);
196 state_ = CS_CONNECTING; 192 state_ = CS_CONNECTING;
193 return 0;
194 }
195
196 return DoConnect(addr);
197 }
198
199 int PhysicalSocket::DoConnect(const SocketAddress& connect_addr) {
200 if ((s_ == INVALID_SOCKET) &&
201 !Create(connect_addr.family(), SOCK_STREAM)) {
202 return SOCKET_ERROR;
203 }
204 sockaddr_storage addr_storage;
205 size_t len = connect_addr.ToSockAddrStorage(&addr_storage);
206 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
207 int err = ::connect(s_, addr, static_cast<int>(len));
208 UpdateLastError();
209 if (err == 0) {
210 state_ = CS_CONNECTED;
211 } else if (IsBlockingError(GetError())) {
212 state_ = CS_CONNECTING;
213 enabled_events_ |= DE_CONNECT;
214 } else {
215 return SOCKET_ERROR;
216 }
217
218 enabled_events_ |= DE_READ | DE_WRITE;
219 return 0;
220 }
221
222 int PhysicalSocket::GetError() const {
223 CritScope cs(&crit_);
224 return error_;
225 }
226
227 void PhysicalSocket::SetError(int error) {
228 CritScope cs(&crit_);
229 error_ = error;
230 }
231
232 AsyncSocket::ConnState PhysicalSocket::GetState() const {
233 return state_;
234 }
235
236 int PhysicalSocket::GetOption(Option opt, int* value) {
237 int slevel;
238 int sopt;
239 if (TranslateOption(opt, &slevel, &sopt) == -1)
240 return -1;
241 socklen_t optlen = sizeof(*value);
242 int ret = ::getsockopt(s_, slevel, sopt, (SockOptArg)value, &optlen);
243 if (ret != -1 && opt == OPT_DONTFRAGMENT) {
244 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID)
245 *value = (*value != IP_PMTUDISC_DONT) ? 1 : 0;
246 #endif
247 }
248 return ret;
249 }
250
251 int PhysicalSocket::SetOption(Option opt, int value) {
252 int slevel;
253 int sopt;
254 if (TranslateOption(opt, &slevel, &sopt) == -1)
255 return -1;
256 if (opt == OPT_DONTFRAGMENT) {
257 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID)
258 value = (value) ? IP_PMTUDISC_DO : IP_PMTUDISC_DONT;
259 #endif
260 }
261 return ::setsockopt(s_, slevel, sopt, (SockOptArg)&value, sizeof(value));
262 }
263
264 int PhysicalSocket::Send(const void* pv, size_t cb) {
265 int sent = ::send(s_, reinterpret_cast<const char *>(pv), (int)cb,
266 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID)
267 // Suppress SIGPIPE. Without this, attempting to send on a socket whose
268 // other end is closed will result in a SIGPIPE signal being raised to
269 // our process, which by default will terminate the process, which we
270 // don't want. By specifying this flag, we'll just get the error EPIPE
271 // instead and can handle the error gracefully.
272 MSG_NOSIGNAL
273 #else
274 0
275 #endif
276 );
277 UpdateLastError();
278 MaybeRemapSendError();
279 // We have seen minidumps where this may be false.
280 ASSERT(sent <= static_cast<int>(cb));
281 if ((sent < 0) && IsBlockingError(GetError())) {
282 enabled_events_ |= DE_WRITE;
283 }
284 return sent;
285 }
286
287 int PhysicalSocket::SendTo(const void* buffer,
288 size_t length,
289 const SocketAddress& addr) {
290 sockaddr_storage saddr;
291 size_t len = addr.ToSockAddrStorage(&saddr);
292 int sent = ::sendto(
293 s_, static_cast<const char *>(buffer), static_cast<int>(length),
294 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID)
295 // Suppress SIGPIPE. See above for explanation.
296 MSG_NOSIGNAL,
297 #else
298 0,
299 #endif
300 reinterpret_cast<sockaddr*>(&saddr), static_cast<int>(len));
301 UpdateLastError();
302 MaybeRemapSendError();
303 // We have seen minidumps where this may be false.
304 ASSERT(sent <= static_cast<int>(length));
305 if ((sent < 0) && IsBlockingError(GetError())) {
306 enabled_events_ |= DE_WRITE;
307 }
308 return sent;
309 }
310
311 int PhysicalSocket::Recv(void* buffer, size_t length) {
312 int received = ::recv(s_, static_cast<char*>(buffer),
313 static_cast<int>(length), 0);
314 if ((received == 0) && (length != 0)) {
315 // Note: on graceful shutdown, recv can return 0. In this case, we
316 // pretend it is blocking, and then signal close, so that simplifying
317 // assumptions can be made about Recv.
318 LOG(LS_WARNING) << "EOF from socket; deferring close event";
319 // Must turn this back on so that the select() loop will notice the close
320 // event.
321 enabled_events_ |= DE_READ;
322 SetError(EWOULDBLOCK);
323 return SOCKET_ERROR;
324 }
325 UpdateLastError();
326 int error = GetError();
327 bool success = (received >= 0) || IsBlockingError(error);
328 if (udp_ || success) {
329 enabled_events_ |= DE_READ;
330 }
331 if (!success) {
332 LOG_F(LS_VERBOSE) << "Error = " << error;
333 }
334 return received;
335 }
336
337 int PhysicalSocket::RecvFrom(void* buffer,
338 size_t length,
339 SocketAddress* out_addr) {
340 sockaddr_storage addr_storage;
341 socklen_t addr_len = sizeof(addr_storage);
342 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
343 int received = ::recvfrom(s_, static_cast<char*>(buffer),
344 static_cast<int>(length), 0, addr, &addr_len);
345 UpdateLastError();
346 if ((received >= 0) && (out_addr != nullptr))
347 SocketAddressFromSockAddrStorage(addr_storage, out_addr);
348 int error = GetError();
349 bool success = (received >= 0) || IsBlockingError(error);
350 if (udp_ || success) {
351 enabled_events_ |= DE_READ;
352 }
353 if (!success) {
354 LOG_F(LS_VERBOSE) << "Error = " << error;
355 }
356 return received;
357 }
358
359 int PhysicalSocket::Listen(int backlog) {
360 int err = ::listen(s_, backlog);
361 UpdateLastError();
362 if (err == 0) {
363 state_ = CS_CONNECTING;
364 enabled_events_ |= DE_ACCEPT;
365 #if !defined(NDEBUG)
366 dbg_addr_ = "Listening @ ";
367 dbg_addr_.append(GetLocalAddress().ToString());
368 #endif
369 }
370 return err;
371 }
372
373 AsyncSocket* PhysicalSocket::Accept(SocketAddress* out_addr) {
374 // Always re-subscribe DE_ACCEPT to make sure new incoming connections will
375 // trigger an event even if DoAccept returns an error here.
376 enabled_events_ |= DE_ACCEPT;
377 sockaddr_storage addr_storage;
378 socklen_t addr_len = sizeof(addr_storage);
379 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
380 SOCKET s = DoAccept(s_, addr, &addr_len);
381 UpdateLastError();
382 if (s == INVALID_SOCKET)
383 return nullptr;
384 if (out_addr != nullptr)
385 SocketAddressFromSockAddrStorage(addr_storage, out_addr);
386 return ss_->WrapSocket(s);
387 }
388
389 int PhysicalSocket::Close() {
390 if (s_ == INVALID_SOCKET)
391 return 0;
392 int err = ::closesocket(s_);
393 UpdateLastError();
394 s_ = INVALID_SOCKET;
395 state_ = CS_CLOSED;
396 enabled_events_ = 0;
397 if (resolver_) {
398 resolver_->Destroy(false);
399 resolver_ = nullptr;
400 }
401 return err;
402 }
403
404 int PhysicalSocket::EstimateMTU(uint16_t* mtu) {
405 SocketAddress addr = GetRemoteAddress();
406 if (addr.IsAnyIP()) {
407 SetError(ENOTCONN);
408 return -1;
409 }
410
411 #if defined(WEBRTC_WIN)
412 // Gets the interface MTU (TTL=1) for the interface used to reach |addr|.
413 WinPing ping;
414 if (!ping.IsValid()) {
415 SetError(EINVAL); // can't think of a better error ID
416 return -1;
417 }
418 int header_size = ICMP_HEADER_SIZE;
419 if (addr.family() == AF_INET6) {
420 header_size += IPV6_HEADER_SIZE;
421 } else if (addr.family() == AF_INET) {
422 header_size += IP_HEADER_SIZE;
423 }
424
425 for (int level = 0; PACKET_MAXIMUMS[level + 1] > 0; ++level) {
426 int32_t size = PACKET_MAXIMUMS[level] - header_size;
427 WinPing::PingResult result = ping.Ping(addr.ipaddr(), size,
428 ICMP_PING_TIMEOUT_MILLIS,
429 1, false);
430 if (result == WinPing::PING_FAIL) {
431 SetError(EINVAL); // can't think of a better error ID
432 return -1;
433 } else if (result != WinPing::PING_TOO_LARGE) {
434 *mtu = PACKET_MAXIMUMS[level];
197 return 0; 435 return 0;
198 } 436 }
199 437 }
200 return DoConnect(addr); 438
201 } 439 ASSERT(false);
202 440 return -1;
203 int DoConnect(const SocketAddress& connect_addr) { 441 #elif defined(WEBRTC_MAC)
204 if ((s_ == INVALID_SOCKET) && 442 // No simple way to do this on Mac OS X.
205 !Create(connect_addr.family(), SOCK_STREAM)) { 443 // SIOCGIFMTU would work if we knew which interface would be used, but
206 return SOCKET_ERROR; 444 // figuring that out is pretty complicated. For now we'll return an error
207 } 445 // and let the caller pick a default MTU.
208 sockaddr_storage addr_storage; 446 SetError(EINVAL);
209 size_t len = connect_addr.ToSockAddrStorage(&addr_storage); 447 return -1;
210 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); 448 #elif defined(WEBRTC_LINUX)
211 int err = ::connect(s_, addr, static_cast<int>(len)); 449 // Gets the path MTU.
450 int value;
451 socklen_t vlen = sizeof(value);
452 int err = getsockopt(s_, IPPROTO_IP, IP_MTU, &value, &vlen);
453 if (err < 0) {
212 UpdateLastError(); 454 UpdateLastError();
213 if (err == 0) { 455 return err;
214 state_ = CS_CONNECTED; 456 }
215 } else if (IsBlockingError(GetError())) { 457
216 state_ = CS_CONNECTING; 458 ASSERT((0 <= value) && (value <= 65536));
217 enabled_events_ |= DE_CONNECT; 459 *mtu = value;
218 } else { 460 return 0;
219 return SOCKET_ERROR; 461 #elif defined(__native_client__)
220 } 462 // Most socket operations, including this, will fail in NaCl's sandbox.
221 463 error_ = EACCES;
222 enabled_events_ |= DE_READ | DE_WRITE; 464 return -1;
223 return 0; 465 #endif
224 } 466 }
225 467
226 int GetError() const override { 468
227 CritScope cs(&crit_); 469 SOCKET PhysicalSocket::DoAccept(SOCKET socket,
228 return error_; 470 sockaddr* addr,
229 } 471 socklen_t* addrlen) {
230 472 return ::accept(socket, addr, addrlen);
231 void SetError(int error) override { 473 }
232 CritScope cs(&crit_); 474
233 error_ = error; 475 void PhysicalSocket::OnResolveResult(AsyncResolverInterface* resolver) {
234 } 476 if (resolver != resolver_) {
235 477 return;
236 ConnState GetState() const override { return state_; } 478 }
237 479
238 int GetOption(Option opt, int* value) override { 480 int error = resolver_->GetError();
239 int slevel; 481 if (error == 0) {
240 int sopt; 482 error = DoConnect(resolver_->address());
241 if (TranslateOption(opt, &slevel, &sopt) == -1) 483 } else {
484 Close();
485 }
486
487 if (error) {
488 SetError(error);
489 SignalCloseEvent(this, error);
490 }
491 }
492
493 void PhysicalSocket::UpdateLastError() {
494 SetError(LAST_SYSTEM_ERROR);
495 }
496
497 void PhysicalSocket::MaybeRemapSendError() {
498 #if defined(WEBRTC_MAC)
499 // https://developer.apple.com/library/mac/documentation/Darwin/
500 // Reference/ManPages/man2/sendto.2.html
501 // ENOBUFS - The output queue for a network interface is full.
502 // This generally indicates that the interface has stopped sending,
503 // but may be caused by transient congestion.
504 if (GetError() == ENOBUFS) {
505 SetError(EWOULDBLOCK);
506 }
507 #endif
508 }
509
510 int PhysicalSocket::TranslateOption(Option opt, int* slevel, int* sopt) {
511 switch (opt) {
512 case OPT_DONTFRAGMENT:
513 #if defined(WEBRTC_WIN)
514 *slevel = IPPROTO_IP;
515 *sopt = IP_DONTFRAGMENT;
516 break;
517 #elif defined(WEBRTC_MAC) || defined(BSD) || defined(__native_client__)
518 LOG(LS_WARNING) << "Socket::OPT_DONTFRAGMENT not supported.";
242 return -1; 519 return -1;
243 socklen_t optlen = sizeof(*value); 520 #elif defined(WEBRTC_POSIX)
244 int ret = ::getsockopt(s_, slevel, sopt, (SockOptArg)value, &optlen); 521 *slevel = IPPROTO_IP;
245 if (ret != -1 && opt == OPT_DONTFRAGMENT) { 522 *sopt = IP_MTU_DISCOVER;
246 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID) 523 break;
247 *value = (*value != IP_PMTUDISC_DONT) ? 1 : 0; 524 #endif
248 #endif 525 case OPT_RCVBUF:
249 } 526 *slevel = SOL_SOCKET;
250 return ret; 527 *sopt = SO_RCVBUF;
251 } 528 break;
252 529 case OPT_SNDBUF:
253 int SetOption(Option opt, int value) override { 530 *slevel = SOL_SOCKET;
254 int slevel; 531 *sopt = SO_SNDBUF;
255 int sopt; 532 break;
256 if (TranslateOption(opt, &slevel, &sopt) == -1) 533 case OPT_NODELAY:
534 *slevel = IPPROTO_TCP;
535 *sopt = TCP_NODELAY;
536 break;
537 case OPT_DSCP:
538 LOG(LS_WARNING) << "Socket::OPT_DSCP not supported.";
257 return -1; 539 return -1;
258 if (opt == OPT_DONTFRAGMENT) { 540 case OPT_RTP_SENDTIME_EXTN_ID:
259 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID) 541 return -1; // No logging is necessary as this not a OS socket option.
260 value = (value) ? IP_PMTUDISC_DO : IP_PMTUDISC_DONT; 542 default:
261 #endif 543 ASSERT(false);
262 }
263 return ::setsockopt(s_, slevel, sopt, (SockOptArg)&value, sizeof(value));
264 }
265
266 int Send(const void* pv, size_t cb) override {
267 int sent = ::send(s_, reinterpret_cast<const char *>(pv), (int)cb,
268 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID)
269 // Suppress SIGPIPE. Without this, attempting to send on a socket whose
270 // other end is closed will result in a SIGPIPE signal being raised to
271 // our process, which by default will terminate the process, which we
272 // don't want. By specifying this flag, we'll just get the error EPIPE
273 // instead and can handle the error gracefully.
274 MSG_NOSIGNAL
275 #else
276 0
277 #endif
278 );
279 UpdateLastError();
280 MaybeRemapSendError();
281 // We have seen minidumps where this may be false.
282 ASSERT(sent <= static_cast<int>(cb));
283 if ((sent < 0) && IsBlockingError(GetError())) {
284 enabled_events_ |= DE_WRITE;
285 }
286 return sent;
287 }
288
289 int SendTo(const void* buffer,
290 size_t length,
291 const SocketAddress& addr) override {
292 sockaddr_storage saddr;
293 size_t len = addr.ToSockAddrStorage(&saddr);
294 int sent = ::sendto(
295 s_, static_cast<const char *>(buffer), static_cast<int>(length),
296 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID)
297 // Suppress SIGPIPE. See above for explanation.
298 MSG_NOSIGNAL,
299 #else
300 0,
301 #endif
302 reinterpret_cast<sockaddr*>(&saddr), static_cast<int>(len));
303 UpdateLastError();
304 MaybeRemapSendError();
305 // We have seen minidumps where this may be false.
306 ASSERT(sent <= static_cast<int>(length));
307 if ((sent < 0) && IsBlockingError(GetError())) {
308 enabled_events_ |= DE_WRITE;
309 }
310 return sent;
311 }
312
313 int Recv(void* buffer, size_t length) override {
314 int received = ::recv(s_, static_cast<char*>(buffer),
315 static_cast<int>(length), 0);
316 if ((received == 0) && (length != 0)) {
317 // Note: on graceful shutdown, recv can return 0. In this case, we
318 // pretend it is blocking, and then signal close, so that simplifying
319 // assumptions can be made about Recv.
320 LOG(LS_WARNING) << "EOF from socket; deferring close event";
321 // Must turn this back on so that the select() loop will notice the close
322 // event.
323 enabled_events_ |= DE_READ;
324 SetError(EWOULDBLOCK);
325 return SOCKET_ERROR;
326 }
327 UpdateLastError();
328 int error = GetError();
329 bool success = (received >= 0) || IsBlockingError(error);
330 if (udp_ || success) {
331 enabled_events_ |= DE_READ;
332 }
333 if (!success) {
334 LOG_F(LS_VERBOSE) << "Error = " << error;
335 }
336 return received;
337 }
338
339 int RecvFrom(void* buffer, size_t length, SocketAddress* out_addr) override {
340 sockaddr_storage addr_storage;
341 socklen_t addr_len = sizeof(addr_storage);
342 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
343 int received = ::recvfrom(s_, static_cast<char*>(buffer),
344 static_cast<int>(length), 0, addr, &addr_len);
345 UpdateLastError();
346 if ((received >= 0) && (out_addr != NULL))
347 SocketAddressFromSockAddrStorage(addr_storage, out_addr);
348 int error = GetError();
349 bool success = (received >= 0) || IsBlockingError(error);
350 if (udp_ || success) {
351 enabled_events_ |= DE_READ;
352 }
353 if (!success) {
354 LOG_F(LS_VERBOSE) << "Error = " << error;
355 }
356 return received;
357 }
358
359 int Listen(int backlog) override {
360 int err = ::listen(s_, backlog);
361 UpdateLastError();
362 if (err == 0) {
363 state_ = CS_CONNECTING;
364 enabled_events_ |= DE_ACCEPT;
365 #if !defined(NDEBUG)
366 dbg_addr_ = "Listening @ ";
367 dbg_addr_.append(GetLocalAddress().ToString());
368 #endif
369 }
370 return err;
371 }
372
373 AsyncSocket* Accept(SocketAddress* out_addr) override {
374 sockaddr_storage addr_storage;
375 socklen_t addr_len = sizeof(addr_storage);
376 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
377 SOCKET s = ::accept(s_, addr, &addr_len);
378 UpdateLastError();
379 if (s == INVALID_SOCKET)
380 return NULL;
381 enabled_events_ |= DE_ACCEPT;
382 if (out_addr != NULL)
383 SocketAddressFromSockAddrStorage(addr_storage, out_addr);
384 return ss_->WrapSocket(s);
385 }
386
387 int Close() override {
388 if (s_ == INVALID_SOCKET)
389 return 0;
390 int err = ::closesocket(s_);
391 UpdateLastError();
392 s_ = INVALID_SOCKET;
393 state_ = CS_CLOSED;
394 enabled_events_ = 0;
395 if (resolver_) {
396 resolver_->Destroy(false);
397 resolver_ = NULL;
398 }
399 return err;
400 }
401
402 int EstimateMTU(uint16_t* mtu) override {
403 SocketAddress addr = GetRemoteAddress();
404 if (addr.IsAnyIP()) {
405 SetError(ENOTCONN);
406 return -1; 544 return -1;
407 } 545 }
408 546 return 0;
409 #if defined(WEBRTC_WIN) 547 }
410 // Gets the interface MTU (TTL=1) for the interface used to reach |addr|.
411 WinPing ping;
412 if (!ping.IsValid()) {
413 SetError(EINVAL); // can't think of a better error ID
414 return -1;
415 }
416 int header_size = ICMP_HEADER_SIZE;
417 if (addr.family() == AF_INET6) {
418 header_size += IPV6_HEADER_SIZE;
419 } else if (addr.family() == AF_INET) {
420 header_size += IP_HEADER_SIZE;
421 }
422
423 for (int level = 0; PACKET_MAXIMUMS[level + 1] > 0; ++level) {
424 int32_t size = PACKET_MAXIMUMS[level] - header_size;
425 WinPing::PingResult result = ping.Ping(addr.ipaddr(), size,
426 ICMP_PING_TIMEOUT_MILLIS,
427 1, false);
428 if (result == WinPing::PING_FAIL) {
429 SetError(EINVAL); // can't think of a better error ID
430 return -1;
431 } else if (result != WinPing::PING_TOO_LARGE) {
432 *mtu = PACKET_MAXIMUMS[level];
433 return 0;
434 }
435 }
436
437 ASSERT(false);
438 return -1;
439 #elif defined(WEBRTC_MAC)
440 // No simple way to do this on Mac OS X.
441 // SIOCGIFMTU would work if we knew which interface would be used, but
442 // figuring that out is pretty complicated. For now we'll return an error
443 // and let the caller pick a default MTU.
444 SetError(EINVAL);
445 return -1;
446 #elif defined(WEBRTC_LINUX)
447 // Gets the path MTU.
448 int value;
449 socklen_t vlen = sizeof(value);
450 int err = getsockopt(s_, IPPROTO_IP, IP_MTU, &value, &vlen);
451 if (err < 0) {
452 UpdateLastError();
453 return err;
454 }
455
456 ASSERT((0 <= value) && (value <= 65536));
457 *mtu = value;
458 return 0;
459 #elif defined(__native_client__)
460 // Most socket operations, including this, will fail in NaCl's sandbox.
461 error_ = EACCES;
462 return -1;
463 #endif
464 }
465
466 SocketServer* socketserver() { return ss_; }
467
468 protected:
469 void OnResolveResult(AsyncResolverInterface* resolver) {
470 if (resolver != resolver_) {
471 return;
472 }
473
474 int error = resolver_->GetError();
475 if (error == 0) {
476 error = DoConnect(resolver_->address());
477 } else {
478 Close();
479 }
480
481 if (error) {
482 SetError(error);
483 SignalCloseEvent(this, error);
484 }
485 }
486
487 void UpdateLastError() {
488 SetError(LAST_SYSTEM_ERROR);
489 }
490
491 void MaybeRemapSendError() {
492 #if defined(WEBRTC_MAC)
493 // https://developer.apple.com/library/mac/documentation/Darwin/
494 // Reference/ManPages/man2/sendto.2.html
495 // ENOBUFS - The output queue for a network interface is full.
496 // This generally indicates that the interface has stopped sending,
497 // but may be caused by transient congestion.
498 if (GetError() == ENOBUFS) {
499 SetError(EWOULDBLOCK);
500 }
501 #endif
502 }
503
504 static int TranslateOption(Option opt, int* slevel, int* sopt) {
505 switch (opt) {
506 case OPT_DONTFRAGMENT:
507 #if defined(WEBRTC_WIN)
508 *slevel = IPPROTO_IP;
509 *sopt = IP_DONTFRAGMENT;
510 break;
511 #elif defined(WEBRTC_MAC) || defined(BSD) || defined(__native_client__)
512 LOG(LS_WARNING) << "Socket::OPT_DONTFRAGMENT not supported.";
513 return -1;
514 #elif defined(WEBRTC_POSIX)
515 *slevel = IPPROTO_IP;
516 *sopt = IP_MTU_DISCOVER;
517 break;
518 #endif
519 case OPT_RCVBUF:
520 *slevel = SOL_SOCKET;
521 *sopt = SO_RCVBUF;
522 break;
523 case OPT_SNDBUF:
524 *slevel = SOL_SOCKET;
525 *sopt = SO_SNDBUF;
526 break;
527 case OPT_NODELAY:
528 *slevel = IPPROTO_TCP;
529 *sopt = TCP_NODELAY;
530 break;
531 case OPT_DSCP:
532 LOG(LS_WARNING) << "Socket::OPT_DSCP not supported.";
533 return -1;
534 case OPT_RTP_SENDTIME_EXTN_ID:
535 return -1; // No logging is necessary as this not a OS socket option.
536 default:
537 ASSERT(false);
538 return -1;
539 }
540 return 0;
541 }
542
543 PhysicalSocketServer* ss_;
544 SOCKET s_;
545 uint8_t enabled_events_;
546 bool udp_;
547 int error_;
548 // Protects |error_| that is accessed from different threads.
549 mutable CriticalSection crit_;
550 ConnState state_;
551 AsyncResolver* resolver_;
552
553 #if !defined(NDEBUG)
554 std::string dbg_addr_;
555 #endif
556 };
557 548
558 #if defined(WEBRTC_POSIX) 549 #if defined(WEBRTC_POSIX)
559 class EventDispatcher : public Dispatcher { 550 class EventDispatcher : public Dispatcher {
560 public: 551 public:
561 EventDispatcher(PhysicalSocketServer* ss) : ss_(ss), fSignaled_(false) { 552 EventDispatcher(PhysicalSocketServer* ss) : ss_(ss), fSignaled_(false) {
562 if (pipe(afd_) < 0) 553 if (pipe(afd_) < 0)
563 LOG(LERROR) << "pipe failed"; 554 LOG(LERROR) << "pipe failed";
564 ss_->Add(this); 555 ss_->Add(this);
565 } 556 }
566 557
(...skipping 217 matching lines...) Expand 10 before | Expand all | Expand 10 after
784 } 775 }
785 776
786 private: 777 private:
787 typedef std::map<int, void (*)(int)> HandlerMap; 778 typedef std::map<int, void (*)(int)> HandlerMap;
788 779
789 HandlerMap handlers_; 780 HandlerMap handlers_;
790 // Our owner. 781 // Our owner.
791 PhysicalSocketServer *owner_; 782 PhysicalSocketServer *owner_;
792 }; 783 };
793 784
794 class SocketDispatcher : public Dispatcher, public PhysicalSocket { 785 SocketDispatcher::SocketDispatcher(PhysicalSocketServer *ss)
795 public: 786 : PhysicalSocket(ss) {
796 explicit SocketDispatcher(PhysicalSocketServer *ss) : PhysicalSocket(ss) { 787 }
797 }
798 SocketDispatcher(SOCKET s, PhysicalSocketServer *ss) : PhysicalSocket(ss, s) {
799 }
800 788
801 ~SocketDispatcher() override { 789 SocketDispatcher::SocketDispatcher(SOCKET s, PhysicalSocketServer *ss)
802 Close(); 790 : PhysicalSocket(ss, s) {
803 } 791 }
804 792
805 bool Initialize() { 793 SocketDispatcher::~SocketDispatcher() {
806 ss_->Add(this); 794 Close();
807 fcntl(s_, F_SETFL, fcntl(s_, F_GETFL, 0) | O_NONBLOCK); 795 }
796
797 bool SocketDispatcher::Initialize() {
798 ss_->Add(this);
799 fcntl(s_, F_SETFL, fcntl(s_, F_GETFL, 0) | O_NONBLOCK);
800 return true;
801 }
802
803 bool SocketDispatcher::Create(int type) {
804 return Create(AF_INET, type);
805 }
806
807 bool SocketDispatcher::Create(int family, int type) {
808 // Change the socket to be non-blocking.
809 if (!PhysicalSocket::Create(family, type))
810 return false;
811
812 return Initialize();
813 }
814
815 int SocketDispatcher::GetDescriptor() {
816 return s_;
817 }
818
819 bool SocketDispatcher::IsDescriptorClosed() {
820 // We don't have a reliable way of distinguishing end-of-stream
821 // from readability. So test on each readable call. Is this
822 // inefficient? Probably.
823 char ch;
824 ssize_t res = ::recv(s_, &ch, 1, MSG_PEEK);
825 if (res > 0) {
826 // Data available, so not closed.
827 return false;
828 } else if (res == 0) {
829 // EOF, so closed.
808 return true; 830 return true;
809 } 831 } else { // error
810 832 switch (errno) {
811 virtual bool Create(int type) { 833 // Returned if we've already closed s_.
812 return Create(AF_INET, type); 834 case EBADF:
813 } 835 // Returned during ungraceful peer shutdown.
814 836 case ECONNRESET:
815 bool Create(int family, int type) override { 837 return true;
816 // Change the socket to be non-blocking. 838 default:
817 if (!PhysicalSocket::Create(family, type)) 839 // Assume that all other errors are just blocking errors, meaning the
818 return false; 840 // connection is still good but we just can't read from it right now.
819 841 // This should only happen when connecting (and at most once), because
820 return Initialize(); 842 // in all other cases this function is only called if the file
821 } 843 // descriptor is already known to be in the readable state. However,
822 844 // it's not necessary a problem if we spuriously interpret a
823 int GetDescriptor() override { return s_; } 845 // "connection lost"-type error as a blocking error, because typically
824 846 // the next recv() will get EOF, so we'll still eventually notice that
825 bool IsDescriptorClosed() override { 847 // the socket is closed.
826 // We don't have a reliable way of distinguishing end-of-stream 848 LOG_ERR(LS_WARNING) << "Assuming benign blocking error";
827 // from readability. So test on each readable call. Is this 849 return false;
828 // inefficient? Probably.
829 char ch;
830 ssize_t res = ::recv(s_, &ch, 1, MSG_PEEK);
831 if (res > 0) {
832 // Data available, so not closed.
833 return false;
834 } else if (res == 0) {
835 // EOF, so closed.
836 return true;
837 } else { // error
838 switch (errno) {
839 // Returned if we've already closed s_.
840 case EBADF:
841 // Returned during ungraceful peer shutdown.
842 case ECONNRESET:
843 return true;
844 default:
845 // Assume that all other errors are just blocking errors, meaning the
846 // connection is still good but we just can't read from it right now.
847 // This should only happen when connecting (and at most once), because
848 // in all other cases this function is only called if the file
849 // descriptor is already known to be in the readable state. However,
850 // it's not necessary a problem if we spuriously interpret a
851 // "connection lost"-type error as a blocking error, because typically
852 // the next recv() will get EOF, so we'll still eventually notice that
853 // the socket is closed.
854 LOG_ERR(LS_WARNING) << "Assuming benign blocking error";
855 return false;
856 }
857 } 850 }
858 } 851 }
852 }
859 853
860 uint32_t GetRequestedEvents() override { return enabled_events_; } 854 uint32_t SocketDispatcher::GetRequestedEvents() {
855 return enabled_events_;
856 }
861 857
862 void OnPreEvent(uint32_t ff) override { 858 void SocketDispatcher::OnPreEvent(uint32_t ff) {
863 if ((ff & DE_CONNECT) != 0) 859 if ((ff & DE_CONNECT) != 0)
864 state_ = CS_CONNECTED; 860 state_ = CS_CONNECTED;
865 if ((ff & DE_CLOSE) != 0) 861 if ((ff & DE_CLOSE) != 0)
866 state_ = CS_CLOSED; 862 state_ = CS_CLOSED;
863 }
864
865 void SocketDispatcher::OnEvent(uint32_t ff, int err) {
866 // Make sure we deliver connect/accept first. Otherwise, consumers may see
867 // something like a READ followed by a CONNECT, which would be odd.
868 if ((ff & DE_CONNECT) != 0) {
869 enabled_events_ &= ~DE_CONNECT;
870 SignalConnectEvent(this);
867 } 871 }
872 if ((ff & DE_ACCEPT) != 0) {
873 enabled_events_ &= ~DE_ACCEPT;
874 SignalReadEvent(this);
875 }
876 if ((ff & DE_READ) != 0) {
877 enabled_events_ &= ~DE_READ;
878 SignalReadEvent(this);
879 }
880 if ((ff & DE_WRITE) != 0) {
881 enabled_events_ &= ~DE_WRITE;
882 SignalWriteEvent(this);
883 }
884 if ((ff & DE_CLOSE) != 0) {
885 // The socket is now dead to us, so stop checking it.
886 enabled_events_ = 0;
887 SignalCloseEvent(this, err);
888 }
889 }
868 890
869 void OnEvent(uint32_t ff, int err) override { 891 int SocketDispatcher::Close() {
870 // Make sure we deliver connect/accept first. Otherwise, consumers may see 892 if (s_ == INVALID_SOCKET)
871 // something like a READ followed by a CONNECT, which would be odd. 893 return 0;
872 if ((ff & DE_CONNECT) != 0) {
873 enabled_events_ &= ~DE_CONNECT;
874 SignalConnectEvent(this);
875 }
876 if ((ff & DE_ACCEPT) != 0) {
877 enabled_events_ &= ~DE_ACCEPT;
878 SignalReadEvent(this);
879 }
880 if ((ff & DE_READ) != 0) {
881 enabled_events_ &= ~DE_READ;
882 SignalReadEvent(this);
883 }
884 if ((ff & DE_WRITE) != 0) {
885 enabled_events_ &= ~DE_WRITE;
886 SignalWriteEvent(this);
887 }
888 if ((ff & DE_CLOSE) != 0) {
889 // The socket is now dead to us, so stop checking it.
890 enabled_events_ = 0;
891 SignalCloseEvent(this, err);
892 }
893 }
894 894
895 int Close() override { 895 ss_->Remove(this);
896 if (s_ == INVALID_SOCKET) 896 return PhysicalSocket::Close();
897 return 0; 897 }
898
899 ss_->Remove(this);
900 return PhysicalSocket::Close();
901 }
902 };
903 898
904 class FileDispatcher: public Dispatcher, public AsyncFile { 899 class FileDispatcher: public Dispatcher, public AsyncFile {
905 public: 900 public:
906 FileDispatcher(int fd, PhysicalSocketServer *ss) : ss_(ss), fd_(fd) { 901 FileDispatcher(int fd, PhysicalSocketServer *ss) : ss_(ss), fd_(fd) {
907 set_readable(true); 902 set_readable(true);
908 903
909 ss_->Add(this); 904 ss_->Add(this);
910 905
911 fcntl(fd_, F_SETFL, fcntl(fd_, F_GETFL, 0) | O_NONBLOCK); 906 fcntl(fd_, F_SETFL, fcntl(fd_, F_GETFL, 0) | O_NONBLOCK);
912 } 907 }
(...skipping 95 matching lines...) Expand 10 before | Expand all | Expand 10 after
1008 return INVALID_SOCKET; 1003 return INVALID_SOCKET;
1009 } 1004 }
1010 1005
1011 virtual bool CheckSignalClose() { return false; } 1006 virtual bool CheckSignalClose() { return false; }
1012 1007
1013 private: 1008 private:
1014 PhysicalSocketServer* ss_; 1009 PhysicalSocketServer* ss_;
1015 WSAEVENT hev_; 1010 WSAEVENT hev_;
1016 }; 1011 };
1017 1012
1018 class SocketDispatcher : public Dispatcher, public PhysicalSocket { 1013 SocketDispatcher::SocketDispatcher(PhysicalSocketServer* ss)
1019 public: 1014 : PhysicalSocket(ss),
1020 static int next_id_; 1015 id_(0),
1021 int id_; 1016 signal_close_(false) {
1022 bool signal_close_; 1017 }
1023 int signal_err_;
1024 1018
1025 SocketDispatcher(PhysicalSocketServer* ss) 1019 SocketDispatcher::SocketDispatcher(SOCKET s, PhysicalSocketServer* ss)
1026 : PhysicalSocket(ss), 1020 : PhysicalSocket(ss, s),
1027 id_(0), 1021 id_(0),
1028 signal_close_(false) { 1022 signal_close_(false) {
1023 }
1024
1025 SocketDispatcher::~SocketDispatcher() {
1026 Close();
1027 }
1028
1029 bool SocketDispatcher::Initialize() {
1030 ASSERT(s_ != INVALID_SOCKET);
1031 // Must be a non-blocking
1032 u_long argp = 1;
1033 ioctlsocket(s_, FIONBIO, &argp);
1034 ss_->Add(this);
1035 return true;
1036 }
1037
1038 bool SocketDispatcher::Create(int type) {
1039 return Create(AF_INET, type);
1040 }
1041
1042 bool SocketDispatcher::Create(int family, int type) {
1043 // Create socket
1044 if (!PhysicalSocket::Create(family, type))
1045 return false;
1046
1047 if (!Initialize())
1048 return false;
1049
1050 do { id_ = ++next_id_; } while (id_ == 0);
1051 return true;
1052 }
1053
1054 int SocketDispatcher::Close() {
1055 if (s_ == INVALID_SOCKET)
1056 return 0;
1057
1058 id_ = 0;
1059 signal_close_ = false;
1060 ss_->Remove(this);
1061 return PhysicalSocket::Close();
1062 }
1063
1064 uint32_t SocketDispatcher::GetRequestedEvents() {
1065 return enabled_events_;
1066 }
1067
1068 void SocketDispatcher::OnPreEvent(uint32_t ff) {
1069 if ((ff & DE_CONNECT) != 0)
1070 state_ = CS_CONNECTED;
1071 // We set CS_CLOSED from CheckSignalClose.
1072 }
1073
1074 void SocketDispatcher::OnEvent(uint32_t ff, int err) {
1075 int cache_id = id_;
1076 // Make sure we deliver connect/accept first. Otherwise, consumers may see
1077 // something like a READ followed by a CONNECT, which would be odd.
1078 if (((ff & DE_CONNECT) != 0) && (id_ == cache_id)) {
1079 if (ff != DE_CONNECT)
1080 LOG(LS_VERBOSE) << "Signalled with DE_CONNECT: " << ff;
1081 enabled_events_ &= ~DE_CONNECT;
1082 #if !defined(NDEBUG)
1083 dbg_addr_ = "Connected @ ";
1084 dbg_addr_.append(GetRemoteAddress().ToString());
1085 #endif
1086 SignalConnectEvent(this);
1029 } 1087 }
1088 if (((ff & DE_ACCEPT) != 0) && (id_ == cache_id)) {
1089 enabled_events_ &= ~DE_ACCEPT;
1090 SignalReadEvent(this);
1091 }
1092 if ((ff & DE_READ) != 0) {
1093 enabled_events_ &= ~DE_READ;
1094 SignalReadEvent(this);
1095 }
1096 if (((ff & DE_WRITE) != 0) && (id_ == cache_id)) {
1097 enabled_events_ &= ~DE_WRITE;
1098 SignalWriteEvent(this);
1099 }
1100 if (((ff & DE_CLOSE) != 0) && (id_ == cache_id)) {
1101 signal_close_ = true;
1102 signal_err_ = err;
1103 }
1104 }
1030 1105
1031 SocketDispatcher(SOCKET s, PhysicalSocketServer* ss) 1106 WSAEVENT SocketDispatcher::GetWSAEvent() {
1032 : PhysicalSocket(ss, s), 1107 return WSA_INVALID_EVENT;
1033 id_(0), 1108 }
1034 signal_close_(false) {
1035 }
1036 1109
1037 virtual ~SocketDispatcher() { 1110 SOCKET SocketDispatcher::GetSocket() {
1038 Close(); 1111 return s_;
1039 } 1112 }
1040 1113
1041 bool Initialize() { 1114 bool SocketDispatcher::CheckSignalClose() {
1042 ASSERT(s_ != INVALID_SOCKET); 1115 if (!signal_close_)
1043 // Must be a non-blocking 1116 return false;
1044 u_long argp = 1;
1045 ioctlsocket(s_, FIONBIO, &argp);
1046 ss_->Add(this);
1047 return true;
1048 }
1049 1117
1050 virtual bool Create(int type) { 1118 char ch;
1051 return Create(AF_INET, type); 1119 if (recv(s_, &ch, 1, MSG_PEEK) > 0)
1052 } 1120 return false;
1053 1121
1054 virtual bool Create(int family, int type) { 1122 state_ = CS_CLOSED;
1055 // Create socket 1123 signal_close_ = false;
1056 if (!PhysicalSocket::Create(family, type)) 1124 SignalCloseEvent(this, signal_err_);
1057 return false; 1125 return true;
1058 1126 }
1059 if (!Initialize())
1060 return false;
1061
1062 do { id_ = ++next_id_; } while (id_ == 0);
1063 return true;
1064 }
1065
1066 virtual int Close() {
1067 if (s_ == INVALID_SOCKET)
1068 return 0;
1069
1070 id_ = 0;
1071 signal_close_ = false;
1072 ss_->Remove(this);
1073 return PhysicalSocket::Close();
1074 }
1075
1076 virtual uint32_t GetRequestedEvents() { return enabled_events_; }
1077
1078 virtual void OnPreEvent(uint32_t ff) {
1079 if ((ff & DE_CONNECT) != 0)
1080 state_ = CS_CONNECTED;
1081 // We set CS_CLOSED from CheckSignalClose.
1082 }
1083
1084 virtual void OnEvent(uint32_t ff, int err) {
1085 int cache_id = id_;
1086 // Make sure we deliver connect/accept first. Otherwise, consumers may see
1087 // something like a READ followed by a CONNECT, which would be odd.
1088 if (((ff & DE_CONNECT) != 0) && (id_ == cache_id)) {
1089 if (ff != DE_CONNECT)
1090 LOG(LS_VERBOSE) << "Signalled with DE_CONNECT: " << ff;
1091 enabled_events_ &= ~DE_CONNECT;
1092 #if !defined(NDEBUG)
1093 dbg_addr_ = "Connected @ ";
1094 dbg_addr_.append(GetRemoteAddress().ToString());
1095 #endif
1096 SignalConnectEvent(this);
1097 }
1098 if (((ff & DE_ACCEPT) != 0) && (id_ == cache_id)) {
1099 enabled_events_ &= ~DE_ACCEPT;
1100 SignalReadEvent(this);
1101 }
1102 if ((ff & DE_READ) != 0) {
1103 enabled_events_ &= ~DE_READ;
1104 SignalReadEvent(this);
1105 }
1106 if (((ff & DE_WRITE) != 0) && (id_ == cache_id)) {
1107 enabled_events_ &= ~DE_WRITE;
1108 SignalWriteEvent(this);
1109 }
1110 if (((ff & DE_CLOSE) != 0) && (id_ == cache_id)) {
1111 signal_close_ = true;
1112 signal_err_ = err;
1113 }
1114 }
1115
1116 virtual WSAEVENT GetWSAEvent() {
1117 return WSA_INVALID_EVENT;
1118 }
1119
1120 virtual SOCKET GetSocket() {
1121 return s_;
1122 }
1123
1124 virtual bool CheckSignalClose() {
1125 if (!signal_close_)
1126 return false;
1127
1128 char ch;
1129 if (recv(s_, &ch, 1, MSG_PEEK) > 0)
1130 return false;
1131
1132 state_ = CS_CLOSED;
1133 signal_close_ = false;
1134 SignalCloseEvent(this, signal_err_);
1135 return true;
1136 }
1137 };
1138 1127
1139 int SocketDispatcher::next_id_ = 0; 1128 int SocketDispatcher::next_id_ = 0;
1140 1129
1141 #endif // WEBRTC_WIN 1130 #endif // WEBRTC_WIN
1142 1131
1143 // Sets the value of a boolean value to false when signaled. 1132 // Sets the value of a boolean value to false when signaled.
1144 class Signaler : public EventDispatcher { 1133 class Signaler : public EventDispatcher {
1145 public: 1134 public:
1146 Signaler(PhysicalSocketServer* ss, bool* pf) 1135 Signaler(PhysicalSocketServer* ss, bool* pf)
1147 : EventDispatcher(ss), pf_(pf) { 1136 : EventDispatcher(ss), pf_(pf) {
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after
1183 Socket* PhysicalSocketServer::CreateSocket(int type) { 1172 Socket* PhysicalSocketServer::CreateSocket(int type) {
1184 return CreateSocket(AF_INET, type); 1173 return CreateSocket(AF_INET, type);
1185 } 1174 }
1186 1175
1187 Socket* PhysicalSocketServer::CreateSocket(int family, int type) { 1176 Socket* PhysicalSocketServer::CreateSocket(int family, int type) {
1188 PhysicalSocket* socket = new PhysicalSocket(this); 1177 PhysicalSocket* socket = new PhysicalSocket(this);
1189 if (socket->Create(family, type)) { 1178 if (socket->Create(family, type)) {
1190 return socket; 1179 return socket;
1191 } else { 1180 } else {
1192 delete socket; 1181 delete socket;
1193 return 0; 1182 return nullptr;
1194 } 1183 }
1195 } 1184 }
1196 1185
1197 AsyncSocket* PhysicalSocketServer::CreateAsyncSocket(int type) { 1186 AsyncSocket* PhysicalSocketServer::CreateAsyncSocket(int type) {
1198 return CreateAsyncSocket(AF_INET, type); 1187 return CreateAsyncSocket(AF_INET, type);
1199 } 1188 }
1200 1189
1201 AsyncSocket* PhysicalSocketServer::CreateAsyncSocket(int family, int type) { 1190 AsyncSocket* PhysicalSocketServer::CreateAsyncSocket(int family, int type) {
1202 SocketDispatcher* dispatcher = new SocketDispatcher(this); 1191 SocketDispatcher* dispatcher = new SocketDispatcher(this);
1203 if (dispatcher->Create(family, type)) { 1192 if (dispatcher->Create(family, type)) {
1204 return dispatcher; 1193 return dispatcher;
1205 } else { 1194 } else {
1206 delete dispatcher; 1195 delete dispatcher;
1207 return 0; 1196 return nullptr;
1208 } 1197 }
1209 } 1198 }
1210 1199
1211 AsyncSocket* PhysicalSocketServer::WrapSocket(SOCKET s) { 1200 AsyncSocket* PhysicalSocketServer::WrapSocket(SOCKET s) {
1212 SocketDispatcher* dispatcher = new SocketDispatcher(s, this); 1201 SocketDispatcher* dispatcher = new SocketDispatcher(s, this);
1213 if (dispatcher->Initialize()) { 1202 if (dispatcher->Initialize()) {
1214 return dispatcher; 1203 return dispatcher;
1215 } else { 1204 } else {
1216 delete dispatcher; 1205 delete dispatcher;
1217 return 0; 1206 return nullptr;
1218 } 1207 }
1219 } 1208 }
1220 1209
1221 void PhysicalSocketServer::Add(Dispatcher *pdispatcher) { 1210 void PhysicalSocketServer::Add(Dispatcher *pdispatcher) {
1222 CritScope cs(&crit_); 1211 CritScope cs(&crit_);
1223 // Prevent duplicates. This can cause dead dispatchers to stick around. 1212 // Prevent duplicates. This can cause dead dispatchers to stick around.
1224 DispatcherList::iterator pos = std::find(dispatchers_.begin(), 1213 DispatcherList::iterator pos = std::find(dispatchers_.begin(),
1225 dispatchers_.end(), 1214 dispatchers_.end(),
1226 pdispatcher); 1215 pdispatcher);
1227 if (pos != dispatchers_.end()) 1216 if (pos != dispatchers_.end())
(...skipping 108 matching lines...) Expand 10 before | Expand all | Expand 10 after
1336 } else { 1325 } else {
1337 // We have signaled descriptors 1326 // We have signaled descriptors
1338 CritScope cr(&crit_); 1327 CritScope cr(&crit_);
1339 for (size_t i = 0; i < dispatchers_.size(); ++i) { 1328 for (size_t i = 0; i < dispatchers_.size(); ++i) {
1340 Dispatcher *pdispatcher = dispatchers_[i]; 1329 Dispatcher *pdispatcher = dispatchers_[i];
1341 int fd = pdispatcher->GetDescriptor(); 1330 int fd = pdispatcher->GetDescriptor();
1342 uint32_t ff = 0; 1331 uint32_t ff = 0;
1343 int errcode = 0; 1332 int errcode = 0;
1344 1333
1345 // Reap any error code, which can be signaled through reads or writes. 1334 // Reap any error code, which can be signaled through reads or writes.
1346 // TODO: Should we set errcode if getsockopt fails? 1335 // TODO(pthatcher): Should we set errcode if getsockopt fails?
1347 if (FD_ISSET(fd, &fdsRead) || FD_ISSET(fd, &fdsWrite)) { 1336 if (FD_ISSET(fd, &fdsRead) || FD_ISSET(fd, &fdsWrite)) {
1348 socklen_t len = sizeof(errcode); 1337 socklen_t len = sizeof(errcode);
1349 ::getsockopt(fd, SOL_SOCKET, SO_ERROR, &errcode, &len); 1338 ::getsockopt(fd, SOL_SOCKET, SO_ERROR, &errcode, &len);
1350 } 1339 }
1351 1340
1352 // Check readable descriptors. If we're waiting on an accept, signal 1341 // Check readable descriptors. If we're waiting on an accept, signal
1353 // that. Otherwise we're waiting for data, check to see if we're 1342 // that. Otherwise we're waiting for data, check to see if we're
1354 // readable or really closed. 1343 // readable or really closed.
1355 // TODO: Only peek at TCP descriptors. 1344 // TODO(pthatcher): Only peek at TCP descriptors.
1356 if (FD_ISSET(fd, &fdsRead)) { 1345 if (FD_ISSET(fd, &fdsRead)) {
1357 FD_CLR(fd, &fdsRead); 1346 FD_CLR(fd, &fdsRead);
1358 if (pdispatcher->GetRequestedEvents() & DE_ACCEPT) { 1347 if (pdispatcher->GetRequestedEvents() & DE_ACCEPT) {
1359 ff |= DE_ACCEPT; 1348 ff |= DE_ACCEPT;
1360 } else if (errcode || pdispatcher->IsDescriptorClosed()) { 1349 } else if (errcode || pdispatcher->IsDescriptorClosed()) {
1361 ff |= DE_CLOSE; 1350 ff |= DE_CLOSE;
1362 } else { 1351 } else {
1363 ff |= DE_READ; 1352 ff |= DE_READ;
1364 } 1353 }
1365 } 1354 }
(...skipping 153 matching lines...) Expand 10 before | Expand all | Expand 10 after
1519 1508
1520 // Wait for one of the events to signal 1509 // Wait for one of the events to signal
1521 DWORD dw = WSAWaitForMultipleEvents(static_cast<DWORD>(events.size()), 1510 DWORD dw = WSAWaitForMultipleEvents(static_cast<DWORD>(events.size()),
1522 &events[0], 1511 &events[0],
1523 false, 1512 false,
1524 cmsNext, 1513 cmsNext,
1525 false); 1514 false);
1526 1515
1527 if (dw == WSA_WAIT_FAILED) { 1516 if (dw == WSA_WAIT_FAILED) {
1528 // Failed? 1517 // Failed?
1529 // TODO: need a better strategy than this! 1518 // TODO(pthatcher): need a better strategy than this!
1530 WSAGetLastError(); 1519 WSAGetLastError();
1531 ASSERT(false); 1520 ASSERT(false);
1532 return false; 1521 return false;
1533 } else if (dw == WSA_WAIT_TIMEOUT) { 1522 } else if (dw == WSA_WAIT_TIMEOUT) {
1534 // Timeout? 1523 // Timeout?
1535 return true; 1524 return true;
1536 } else { 1525 } else {
1537 // Figure out which one it is and call it 1526 // Figure out which one it is and call it
1538 CritScope cr(&crit_); 1527 CritScope cr(&crit_);
1539 int index = dw - WSA_WAIT_EVENT_0; 1528 int index = dw - WSA_WAIT_EVENT_0;
(...skipping 88 matching lines...) Expand 10 before | Expand all | Expand 10 after
1628 break; 1617 break;
1629 } 1618 }
1630 } 1619 }
1631 1620
1632 // Done 1621 // Done
1633 return true; 1622 return true;
1634 } 1623 }
1635 #endif // WEBRTC_WIN 1624 #endif // WEBRTC_WIN
1636 1625
1637 } // namespace rtc 1626 } // namespace rtc
OLDNEW
« no previous file with comments | « webrtc/base/physicalsocketserver.h ('k') | webrtc/base/physicalsocketserver_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698