OLD | NEW |
1 /* | 1 /* |
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. | 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. |
3 * | 3 * |
4 * Use of this source code is governed by a BSD-style license | 4 * Use of this source code is governed by a BSD-style license |
5 * that can be found in the LICENSE file in the root of the source | 5 * that can be found in the LICENSE file in the root of the source |
6 * tree. An additional intellectual property rights grant can be found | 6 * tree. An additional intellectual property rights grant can be found |
7 * in the file PATENTS. All contributing project authors may | 7 * in the file PATENTS. All contributing project authors may |
8 * be found in the AUTHORS file in the root of the source tree. | 8 * be found in the AUTHORS file in the root of the source tree. |
9 */ | 9 */ |
10 | 10 |
(...skipping 26 matching lines...) Expand all Loading... |
37 #endif | 37 #endif |
38 | 38 |
39 #include <algorithm> | 39 #include <algorithm> |
40 #include <map> | 40 #include <map> |
41 | 41 |
42 #include "webrtc/base/arraysize.h" | 42 #include "webrtc/base/arraysize.h" |
43 #include "webrtc/base/basictypes.h" | 43 #include "webrtc/base/basictypes.h" |
44 #include "webrtc/base/byteorder.h" | 44 #include "webrtc/base/byteorder.h" |
45 #include "webrtc/base/common.h" | 45 #include "webrtc/base/common.h" |
46 #include "webrtc/base/logging.h" | 46 #include "webrtc/base/logging.h" |
47 #include "webrtc/base/nethelpers.h" | |
48 #include "webrtc/base/physicalsocketserver.h" | 47 #include "webrtc/base/physicalsocketserver.h" |
49 #include "webrtc/base/timeutils.h" | 48 #include "webrtc/base/timeutils.h" |
50 #include "webrtc/base/winping.h" | 49 #include "webrtc/base/winping.h" |
51 #include "webrtc/base/win32socketinit.h" | 50 #include "webrtc/base/win32socketinit.h" |
52 | 51 |
53 // stm: this will tell us if we are on OSX | 52 // stm: this will tell us if we are on OSX |
54 #ifdef HAVE_CONFIG_H | 53 #ifdef HAVE_CONFIG_H |
55 #include "config.h" | 54 #include "config.h" |
56 #endif | 55 #endif |
57 | 56 |
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
90 68, // Official minimum | 89 68, // Official minimum |
91 0, // End of list marker | 90 0, // End of list marker |
92 }; | 91 }; |
93 | 92 |
94 static const int IP_HEADER_SIZE = 20u; | 93 static const int IP_HEADER_SIZE = 20u; |
95 static const int IPV6_HEADER_SIZE = 40u; | 94 static const int IPV6_HEADER_SIZE = 40u; |
96 static const int ICMP_HEADER_SIZE = 8u; | 95 static const int ICMP_HEADER_SIZE = 8u; |
97 static const int ICMP_PING_TIMEOUT_MILLIS = 10000u; | 96 static const int ICMP_PING_TIMEOUT_MILLIS = 10000u; |
98 #endif | 97 #endif |
99 | 98 |
100 class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> { | 99 PhysicalSocket::PhysicalSocket(PhysicalSocketServer* ss, SOCKET s) |
101 public: | 100 : ss_(ss), s_(s), enabled_events_(0), error_(0), |
102 PhysicalSocket(PhysicalSocketServer* ss, SOCKET s = INVALID_SOCKET) | 101 state_((s == INVALID_SOCKET) ? CS_CLOSED : CS_CONNECTED), |
103 : ss_(ss), s_(s), enabled_events_(0), error_(0), | 102 resolver_(nullptr) { |
104 state_((s == INVALID_SOCKET) ? CS_CLOSED : CS_CONNECTED), | |
105 resolver_(NULL) { | |
106 #if defined(WEBRTC_WIN) | 103 #if defined(WEBRTC_WIN) |
107 // EnsureWinsockInit() ensures that winsock is initialized. The default | 104 // EnsureWinsockInit() ensures that winsock is initialized. The default |
108 // version of this function doesn't do anything because winsock is | 105 // version of this function doesn't do anything because winsock is |
109 // initialized by constructor of a static object. If neccessary libjingle | 106 // initialized by constructor of a static object. If neccessary libjingle |
110 // users can link it with a different version of this function by replacing | 107 // users can link it with a different version of this function by replacing |
111 // win32socketinit.cc. See win32socketinit.cc for more details. | 108 // win32socketinit.cc. See win32socketinit.cc for more details. |
112 EnsureWinsockInit(); | 109 EnsureWinsockInit(); |
113 #endif | 110 #endif |
114 if (s_ != INVALID_SOCKET) { | 111 if (s_ != INVALID_SOCKET) { |
115 enabled_events_ = DE_READ | DE_WRITE; | 112 enabled_events_ = DE_READ | DE_WRITE; |
116 | 113 |
117 int type = SOCK_STREAM; | 114 int type = SOCK_STREAM; |
118 socklen_t len = sizeof(type); | 115 socklen_t len = sizeof(type); |
119 VERIFY(0 == getsockopt(s_, SOL_SOCKET, SO_TYPE, (SockOptArg)&type, &len)); | 116 VERIFY(0 == getsockopt(s_, SOL_SOCKET, SO_TYPE, (SockOptArg)&type, &len)); |
120 udp_ = (SOCK_DGRAM == type); | |
121 } | |
122 } | |
123 | |
124 ~PhysicalSocket() override { | |
125 Close(); | |
126 } | |
127 | |
128 // Creates the underlying OS socket (same as the "socket" function). | |
129 virtual bool Create(int family, int type) { | |
130 Close(); | |
131 s_ = ::socket(family, type, 0); | |
132 udp_ = (SOCK_DGRAM == type); | 117 udp_ = (SOCK_DGRAM == type); |
133 UpdateLastError(); | 118 } |
134 if (udp_) | 119 } |
135 enabled_events_ = DE_READ | DE_WRITE; | 120 |
136 return s_ != INVALID_SOCKET; | 121 PhysicalSocket::~PhysicalSocket() { |
137 } | 122 Close(); |
138 | 123 } |
139 SocketAddress GetLocalAddress() const override { | 124 |
140 sockaddr_storage addr_storage = {0}; | 125 bool PhysicalSocket::Create(int family, int type) { |
141 socklen_t addrlen = sizeof(addr_storage); | 126 Close(); |
142 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); | 127 s_ = ::socket(family, type, 0); |
143 int result = ::getsockname(s_, addr, &addrlen); | 128 udp_ = (SOCK_DGRAM == type); |
144 SocketAddress address; | 129 UpdateLastError(); |
145 if (result >= 0) { | 130 if (udp_) |
146 SocketAddressFromSockAddrStorage(addr_storage, &address); | 131 enabled_events_ = DE_READ | DE_WRITE; |
147 } else { | 132 return s_ != INVALID_SOCKET; |
148 LOG(LS_WARNING) << "GetLocalAddress: unable to get local addr, socket=" | 133 } |
149 << s_; | 134 |
150 } | 135 SocketAddress PhysicalSocket::GetLocalAddress() const { |
151 return address; | 136 sockaddr_storage addr_storage = {0}; |
152 } | 137 socklen_t addrlen = sizeof(addr_storage); |
153 | 138 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); |
154 SocketAddress GetRemoteAddress() const override { | 139 int result = ::getsockname(s_, addr, &addrlen); |
155 sockaddr_storage addr_storage = {0}; | 140 SocketAddress address; |
156 socklen_t addrlen = sizeof(addr_storage); | 141 if (result >= 0) { |
157 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); | 142 SocketAddressFromSockAddrStorage(addr_storage, &address); |
158 int result = ::getpeername(s_, addr, &addrlen); | 143 } else { |
159 SocketAddress address; | 144 LOG(LS_WARNING) << "GetLocalAddress: unable to get local addr, socket=" |
160 if (result >= 0) { | 145 << s_; |
161 SocketAddressFromSockAddrStorage(addr_storage, &address); | 146 } |
162 } else { | 147 return address; |
163 LOG(LS_WARNING) << "GetRemoteAddress: unable to get remote addr, socket=" | 148 } |
164 << s_; | 149 |
165 } | 150 SocketAddress PhysicalSocket::GetRemoteAddress() const { |
166 return address; | 151 sockaddr_storage addr_storage = {0}; |
167 } | 152 socklen_t addrlen = sizeof(addr_storage); |
168 | 153 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); |
169 int Bind(const SocketAddress& bind_addr) override { | 154 int result = ::getpeername(s_, addr, &addrlen); |
170 sockaddr_storage addr_storage; | 155 SocketAddress address; |
171 size_t len = bind_addr.ToSockAddrStorage(&addr_storage); | 156 if (result >= 0) { |
172 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); | 157 SocketAddressFromSockAddrStorage(addr_storage, &address); |
173 int err = ::bind(s_, addr, static_cast<int>(len)); | 158 } else { |
174 UpdateLastError(); | 159 LOG(LS_WARNING) << "GetRemoteAddress: unable to get remote addr, socket=" |
| 160 << s_; |
| 161 } |
| 162 return address; |
| 163 } |
| 164 |
| 165 int PhysicalSocket::Bind(const SocketAddress& bind_addr) { |
| 166 sockaddr_storage addr_storage; |
| 167 size_t len = bind_addr.ToSockAddrStorage(&addr_storage); |
| 168 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); |
| 169 int err = ::bind(s_, addr, static_cast<int>(len)); |
| 170 UpdateLastError(); |
175 #if !defined(NDEBUG) | 171 #if !defined(NDEBUG) |
176 if (0 == err) { | 172 if (0 == err) { |
177 dbg_addr_ = "Bound @ "; | 173 dbg_addr_ = "Bound @ "; |
178 dbg_addr_.append(GetLocalAddress().ToString()); | 174 dbg_addr_.append(GetLocalAddress().ToString()); |
179 } | 175 } |
180 #endif | 176 #endif |
181 return err; | 177 return err; |
182 } | 178 } |
183 | 179 |
184 int Connect(const SocketAddress& addr) override { | 180 int PhysicalSocket::Connect(const SocketAddress& addr) { |
185 // TODO: Implicit creation is required to reconnect... | 181 // TODO(pthatcher): Implicit creation is required to reconnect... |
186 // ...but should we make it more explicit? | 182 // ...but should we make it more explicit? |
187 if (state_ != CS_CLOSED) { | 183 if (state_ != CS_CLOSED) { |
188 SetError(EALREADY); | 184 SetError(EALREADY); |
189 return SOCKET_ERROR; | 185 return SOCKET_ERROR; |
190 } | 186 } |
191 if (addr.IsUnresolvedIP()) { | 187 if (addr.IsUnresolvedIP()) { |
192 LOG(LS_VERBOSE) << "Resolving addr in PhysicalSocket::Connect"; | 188 LOG(LS_VERBOSE) << "Resolving addr in PhysicalSocket::Connect"; |
193 resolver_ = new AsyncResolver(); | 189 resolver_ = new AsyncResolver(); |
194 resolver_->SignalDone.connect(this, &PhysicalSocket::OnResolveResult); | 190 resolver_->SignalDone.connect(this, &PhysicalSocket::OnResolveResult); |
195 resolver_->Start(addr); | 191 resolver_->Start(addr); |
196 state_ = CS_CONNECTING; | 192 state_ = CS_CONNECTING; |
| 193 return 0; |
| 194 } |
| 195 |
| 196 return DoConnect(addr); |
| 197 } |
| 198 |
| 199 int PhysicalSocket::DoConnect(const SocketAddress& connect_addr) { |
| 200 if ((s_ == INVALID_SOCKET) && |
| 201 !Create(connect_addr.family(), SOCK_STREAM)) { |
| 202 return SOCKET_ERROR; |
| 203 } |
| 204 sockaddr_storage addr_storage; |
| 205 size_t len = connect_addr.ToSockAddrStorage(&addr_storage); |
| 206 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); |
| 207 int err = ::connect(s_, addr, static_cast<int>(len)); |
| 208 UpdateLastError(); |
| 209 if (err == 0) { |
| 210 state_ = CS_CONNECTED; |
| 211 } else if (IsBlockingError(GetError())) { |
| 212 state_ = CS_CONNECTING; |
| 213 enabled_events_ |= DE_CONNECT; |
| 214 } else { |
| 215 return SOCKET_ERROR; |
| 216 } |
| 217 |
| 218 enabled_events_ |= DE_READ | DE_WRITE; |
| 219 return 0; |
| 220 } |
| 221 |
| 222 int PhysicalSocket::GetError() const { |
| 223 CritScope cs(&crit_); |
| 224 return error_; |
| 225 } |
| 226 |
| 227 void PhysicalSocket::SetError(int error) { |
| 228 CritScope cs(&crit_); |
| 229 error_ = error; |
| 230 } |
| 231 |
| 232 AsyncSocket::ConnState PhysicalSocket::GetState() const { |
| 233 return state_; |
| 234 } |
| 235 |
| 236 int PhysicalSocket::GetOption(Option opt, int* value) { |
| 237 int slevel; |
| 238 int sopt; |
| 239 if (TranslateOption(opt, &slevel, &sopt) == -1) |
| 240 return -1; |
| 241 socklen_t optlen = sizeof(*value); |
| 242 int ret = ::getsockopt(s_, slevel, sopt, (SockOptArg)value, &optlen); |
| 243 if (ret != -1 && opt == OPT_DONTFRAGMENT) { |
| 244 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID) |
| 245 *value = (*value != IP_PMTUDISC_DONT) ? 1 : 0; |
| 246 #endif |
| 247 } |
| 248 return ret; |
| 249 } |
| 250 |
| 251 int PhysicalSocket::SetOption(Option opt, int value) { |
| 252 int slevel; |
| 253 int sopt; |
| 254 if (TranslateOption(opt, &slevel, &sopt) == -1) |
| 255 return -1; |
| 256 if (opt == OPT_DONTFRAGMENT) { |
| 257 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID) |
| 258 value = (value) ? IP_PMTUDISC_DO : IP_PMTUDISC_DONT; |
| 259 #endif |
| 260 } |
| 261 return ::setsockopt(s_, slevel, sopt, (SockOptArg)&value, sizeof(value)); |
| 262 } |
| 263 |
| 264 int PhysicalSocket::Send(const void* pv, size_t cb) { |
| 265 int sent = ::send(s_, reinterpret_cast<const char *>(pv), (int)cb, |
| 266 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID) |
| 267 // Suppress SIGPIPE. Without this, attempting to send on a socket whose |
| 268 // other end is closed will result in a SIGPIPE signal being raised to |
| 269 // our process, which by default will terminate the process, which we |
| 270 // don't want. By specifying this flag, we'll just get the error EPIPE |
| 271 // instead and can handle the error gracefully. |
| 272 MSG_NOSIGNAL |
| 273 #else |
| 274 0 |
| 275 #endif |
| 276 ); |
| 277 UpdateLastError(); |
| 278 MaybeRemapSendError(); |
| 279 // We have seen minidumps where this may be false. |
| 280 ASSERT(sent <= static_cast<int>(cb)); |
| 281 if ((sent < 0) && IsBlockingError(GetError())) { |
| 282 enabled_events_ |= DE_WRITE; |
| 283 } |
| 284 return sent; |
| 285 } |
| 286 |
| 287 int PhysicalSocket::SendTo(const void* buffer, |
| 288 size_t length, |
| 289 const SocketAddress& addr) { |
| 290 sockaddr_storage saddr; |
| 291 size_t len = addr.ToSockAddrStorage(&saddr); |
| 292 int sent = ::sendto( |
| 293 s_, static_cast<const char *>(buffer), static_cast<int>(length), |
| 294 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID) |
| 295 // Suppress SIGPIPE. See above for explanation. |
| 296 MSG_NOSIGNAL, |
| 297 #else |
| 298 0, |
| 299 #endif |
| 300 reinterpret_cast<sockaddr*>(&saddr), static_cast<int>(len)); |
| 301 UpdateLastError(); |
| 302 MaybeRemapSendError(); |
| 303 // We have seen minidumps where this may be false. |
| 304 ASSERT(sent <= static_cast<int>(length)); |
| 305 if ((sent < 0) && IsBlockingError(GetError())) { |
| 306 enabled_events_ |= DE_WRITE; |
| 307 } |
| 308 return sent; |
| 309 } |
| 310 |
| 311 int PhysicalSocket::Recv(void* buffer, size_t length) { |
| 312 int received = ::recv(s_, static_cast<char*>(buffer), |
| 313 static_cast<int>(length), 0); |
| 314 if ((received == 0) && (length != 0)) { |
| 315 // Note: on graceful shutdown, recv can return 0. In this case, we |
| 316 // pretend it is blocking, and then signal close, so that simplifying |
| 317 // assumptions can be made about Recv. |
| 318 LOG(LS_WARNING) << "EOF from socket; deferring close event"; |
| 319 // Must turn this back on so that the select() loop will notice the close |
| 320 // event. |
| 321 enabled_events_ |= DE_READ; |
| 322 SetError(EWOULDBLOCK); |
| 323 return SOCKET_ERROR; |
| 324 } |
| 325 UpdateLastError(); |
| 326 int error = GetError(); |
| 327 bool success = (received >= 0) || IsBlockingError(error); |
| 328 if (udp_ || success) { |
| 329 enabled_events_ |= DE_READ; |
| 330 } |
| 331 if (!success) { |
| 332 LOG_F(LS_VERBOSE) << "Error = " << error; |
| 333 } |
| 334 return received; |
| 335 } |
| 336 |
| 337 int PhysicalSocket::RecvFrom(void* buffer, |
| 338 size_t length, |
| 339 SocketAddress* out_addr) { |
| 340 sockaddr_storage addr_storage; |
| 341 socklen_t addr_len = sizeof(addr_storage); |
| 342 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); |
| 343 int received = ::recvfrom(s_, static_cast<char*>(buffer), |
| 344 static_cast<int>(length), 0, addr, &addr_len); |
| 345 UpdateLastError(); |
| 346 if ((received >= 0) && (out_addr != nullptr)) |
| 347 SocketAddressFromSockAddrStorage(addr_storage, out_addr); |
| 348 int error = GetError(); |
| 349 bool success = (received >= 0) || IsBlockingError(error); |
| 350 if (udp_ || success) { |
| 351 enabled_events_ |= DE_READ; |
| 352 } |
| 353 if (!success) { |
| 354 LOG_F(LS_VERBOSE) << "Error = " << error; |
| 355 } |
| 356 return received; |
| 357 } |
| 358 |
| 359 int PhysicalSocket::Listen(int backlog) { |
| 360 int err = ::listen(s_, backlog); |
| 361 UpdateLastError(); |
| 362 if (err == 0) { |
| 363 state_ = CS_CONNECTING; |
| 364 enabled_events_ |= DE_ACCEPT; |
| 365 #if !defined(NDEBUG) |
| 366 dbg_addr_ = "Listening @ "; |
| 367 dbg_addr_.append(GetLocalAddress().ToString()); |
| 368 #endif |
| 369 } |
| 370 return err; |
| 371 } |
| 372 |
| 373 AsyncSocket* PhysicalSocket::Accept(SocketAddress* out_addr) { |
| 374 // Always re-subscribe DE_ACCEPT to make sure new incoming connections will |
| 375 // trigger an event even if DoAccept returns an error here. |
| 376 enabled_events_ |= DE_ACCEPT; |
| 377 sockaddr_storage addr_storage; |
| 378 socklen_t addr_len = sizeof(addr_storage); |
| 379 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); |
| 380 SOCKET s = DoAccept(s_, addr, &addr_len); |
| 381 UpdateLastError(); |
| 382 if (s == INVALID_SOCKET) |
| 383 return nullptr; |
| 384 if (out_addr != nullptr) |
| 385 SocketAddressFromSockAddrStorage(addr_storage, out_addr); |
| 386 return ss_->WrapSocket(s); |
| 387 } |
| 388 |
| 389 int PhysicalSocket::Close() { |
| 390 if (s_ == INVALID_SOCKET) |
| 391 return 0; |
| 392 int err = ::closesocket(s_); |
| 393 UpdateLastError(); |
| 394 s_ = INVALID_SOCKET; |
| 395 state_ = CS_CLOSED; |
| 396 enabled_events_ = 0; |
| 397 if (resolver_) { |
| 398 resolver_->Destroy(false); |
| 399 resolver_ = nullptr; |
| 400 } |
| 401 return err; |
| 402 } |
| 403 |
| 404 int PhysicalSocket::EstimateMTU(uint16_t* mtu) { |
| 405 SocketAddress addr = GetRemoteAddress(); |
| 406 if (addr.IsAnyIP()) { |
| 407 SetError(ENOTCONN); |
| 408 return -1; |
| 409 } |
| 410 |
| 411 #if defined(WEBRTC_WIN) |
| 412 // Gets the interface MTU (TTL=1) for the interface used to reach |addr|. |
| 413 WinPing ping; |
| 414 if (!ping.IsValid()) { |
| 415 SetError(EINVAL); // can't think of a better error ID |
| 416 return -1; |
| 417 } |
| 418 int header_size = ICMP_HEADER_SIZE; |
| 419 if (addr.family() == AF_INET6) { |
| 420 header_size += IPV6_HEADER_SIZE; |
| 421 } else if (addr.family() == AF_INET) { |
| 422 header_size += IP_HEADER_SIZE; |
| 423 } |
| 424 |
| 425 for (int level = 0; PACKET_MAXIMUMS[level + 1] > 0; ++level) { |
| 426 int32_t size = PACKET_MAXIMUMS[level] - header_size; |
| 427 WinPing::PingResult result = ping.Ping(addr.ipaddr(), size, |
| 428 ICMP_PING_TIMEOUT_MILLIS, |
| 429 1, false); |
| 430 if (result == WinPing::PING_FAIL) { |
| 431 SetError(EINVAL); // can't think of a better error ID |
| 432 return -1; |
| 433 } else if (result != WinPing::PING_TOO_LARGE) { |
| 434 *mtu = PACKET_MAXIMUMS[level]; |
197 return 0; | 435 return 0; |
198 } | 436 } |
199 | 437 } |
200 return DoConnect(addr); | 438 |
201 } | 439 ASSERT(false); |
202 | 440 return -1; |
203 int DoConnect(const SocketAddress& connect_addr) { | 441 #elif defined(WEBRTC_MAC) |
204 if ((s_ == INVALID_SOCKET) && | 442 // No simple way to do this on Mac OS X. |
205 !Create(connect_addr.family(), SOCK_STREAM)) { | 443 // SIOCGIFMTU would work if we knew which interface would be used, but |
206 return SOCKET_ERROR; | 444 // figuring that out is pretty complicated. For now we'll return an error |
207 } | 445 // and let the caller pick a default MTU. |
208 sockaddr_storage addr_storage; | 446 SetError(EINVAL); |
209 size_t len = connect_addr.ToSockAddrStorage(&addr_storage); | 447 return -1; |
210 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); | 448 #elif defined(WEBRTC_LINUX) |
211 int err = ::connect(s_, addr, static_cast<int>(len)); | 449 // Gets the path MTU. |
| 450 int value; |
| 451 socklen_t vlen = sizeof(value); |
| 452 int err = getsockopt(s_, IPPROTO_IP, IP_MTU, &value, &vlen); |
| 453 if (err < 0) { |
212 UpdateLastError(); | 454 UpdateLastError(); |
213 if (err == 0) { | 455 return err; |
214 state_ = CS_CONNECTED; | 456 } |
215 } else if (IsBlockingError(GetError())) { | 457 |
216 state_ = CS_CONNECTING; | 458 ASSERT((0 <= value) && (value <= 65536)); |
217 enabled_events_ |= DE_CONNECT; | 459 *mtu = value; |
218 } else { | 460 return 0; |
219 return SOCKET_ERROR; | 461 #elif defined(__native_client__) |
220 } | 462 // Most socket operations, including this, will fail in NaCl's sandbox. |
221 | 463 error_ = EACCES; |
222 enabled_events_ |= DE_READ | DE_WRITE; | 464 return -1; |
223 return 0; | 465 #endif |
224 } | 466 } |
225 | 467 |
226 int GetError() const override { | 468 |
227 CritScope cs(&crit_); | 469 SOCKET PhysicalSocket::DoAccept(SOCKET socket, |
228 return error_; | 470 sockaddr* addr, |
229 } | 471 socklen_t* addrlen) { |
230 | 472 return ::accept(socket, addr, addrlen); |
231 void SetError(int error) override { | 473 } |
232 CritScope cs(&crit_); | 474 |
233 error_ = error; | 475 void PhysicalSocket::OnResolveResult(AsyncResolverInterface* resolver) { |
234 } | 476 if (resolver != resolver_) { |
235 | 477 return; |
236 ConnState GetState() const override { return state_; } | 478 } |
237 | 479 |
238 int GetOption(Option opt, int* value) override { | 480 int error = resolver_->GetError(); |
239 int slevel; | 481 if (error == 0) { |
240 int sopt; | 482 error = DoConnect(resolver_->address()); |
241 if (TranslateOption(opt, &slevel, &sopt) == -1) | 483 } else { |
| 484 Close(); |
| 485 } |
| 486 |
| 487 if (error) { |
| 488 SetError(error); |
| 489 SignalCloseEvent(this, error); |
| 490 } |
| 491 } |
| 492 |
| 493 void PhysicalSocket::UpdateLastError() { |
| 494 SetError(LAST_SYSTEM_ERROR); |
| 495 } |
| 496 |
| 497 void PhysicalSocket::MaybeRemapSendError() { |
| 498 #if defined(WEBRTC_MAC) |
| 499 // https://developer.apple.com/library/mac/documentation/Darwin/ |
| 500 // Reference/ManPages/man2/sendto.2.html |
| 501 // ENOBUFS - The output queue for a network interface is full. |
| 502 // This generally indicates that the interface has stopped sending, |
| 503 // but may be caused by transient congestion. |
| 504 if (GetError() == ENOBUFS) { |
| 505 SetError(EWOULDBLOCK); |
| 506 } |
| 507 #endif |
| 508 } |
| 509 |
| 510 int PhysicalSocket::TranslateOption(Option opt, int* slevel, int* sopt) { |
| 511 switch (opt) { |
| 512 case OPT_DONTFRAGMENT: |
| 513 #if defined(WEBRTC_WIN) |
| 514 *slevel = IPPROTO_IP; |
| 515 *sopt = IP_DONTFRAGMENT; |
| 516 break; |
| 517 #elif defined(WEBRTC_MAC) || defined(BSD) || defined(__native_client__) |
| 518 LOG(LS_WARNING) << "Socket::OPT_DONTFRAGMENT not supported."; |
242 return -1; | 519 return -1; |
243 socklen_t optlen = sizeof(*value); | 520 #elif defined(WEBRTC_POSIX) |
244 int ret = ::getsockopt(s_, slevel, sopt, (SockOptArg)value, &optlen); | 521 *slevel = IPPROTO_IP; |
245 if (ret != -1 && opt == OPT_DONTFRAGMENT) { | 522 *sopt = IP_MTU_DISCOVER; |
246 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID) | 523 break; |
247 *value = (*value != IP_PMTUDISC_DONT) ? 1 : 0; | 524 #endif |
248 #endif | 525 case OPT_RCVBUF: |
249 } | 526 *slevel = SOL_SOCKET; |
250 return ret; | 527 *sopt = SO_RCVBUF; |
251 } | 528 break; |
252 | 529 case OPT_SNDBUF: |
253 int SetOption(Option opt, int value) override { | 530 *slevel = SOL_SOCKET; |
254 int slevel; | 531 *sopt = SO_SNDBUF; |
255 int sopt; | 532 break; |
256 if (TranslateOption(opt, &slevel, &sopt) == -1) | 533 case OPT_NODELAY: |
| 534 *slevel = IPPROTO_TCP; |
| 535 *sopt = TCP_NODELAY; |
| 536 break; |
| 537 case OPT_DSCP: |
| 538 LOG(LS_WARNING) << "Socket::OPT_DSCP not supported."; |
257 return -1; | 539 return -1; |
258 if (opt == OPT_DONTFRAGMENT) { | 540 case OPT_RTP_SENDTIME_EXTN_ID: |
259 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID) | 541 return -1; // No logging is necessary as this not a OS socket option. |
260 value = (value) ? IP_PMTUDISC_DO : IP_PMTUDISC_DONT; | 542 default: |
261 #endif | 543 ASSERT(false); |
262 } | |
263 return ::setsockopt(s_, slevel, sopt, (SockOptArg)&value, sizeof(value)); | |
264 } | |
265 | |
266 int Send(const void* pv, size_t cb) override { | |
267 int sent = ::send(s_, reinterpret_cast<const char *>(pv), (int)cb, | |
268 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID) | |
269 // Suppress SIGPIPE. Without this, attempting to send on a socket whose | |
270 // other end is closed will result in a SIGPIPE signal being raised to | |
271 // our process, which by default will terminate the process, which we | |
272 // don't want. By specifying this flag, we'll just get the error EPIPE | |
273 // instead and can handle the error gracefully. | |
274 MSG_NOSIGNAL | |
275 #else | |
276 0 | |
277 #endif | |
278 ); | |
279 UpdateLastError(); | |
280 MaybeRemapSendError(); | |
281 // We have seen minidumps where this may be false. | |
282 ASSERT(sent <= static_cast<int>(cb)); | |
283 if ((sent < 0) && IsBlockingError(GetError())) { | |
284 enabled_events_ |= DE_WRITE; | |
285 } | |
286 return sent; | |
287 } | |
288 | |
289 int SendTo(const void* buffer, | |
290 size_t length, | |
291 const SocketAddress& addr) override { | |
292 sockaddr_storage saddr; | |
293 size_t len = addr.ToSockAddrStorage(&saddr); | |
294 int sent = ::sendto( | |
295 s_, static_cast<const char *>(buffer), static_cast<int>(length), | |
296 #if defined(WEBRTC_LINUX) && !defined(WEBRTC_ANDROID) | |
297 // Suppress SIGPIPE. See above for explanation. | |
298 MSG_NOSIGNAL, | |
299 #else | |
300 0, | |
301 #endif | |
302 reinterpret_cast<sockaddr*>(&saddr), static_cast<int>(len)); | |
303 UpdateLastError(); | |
304 MaybeRemapSendError(); | |
305 // We have seen minidumps where this may be false. | |
306 ASSERT(sent <= static_cast<int>(length)); | |
307 if ((sent < 0) && IsBlockingError(GetError())) { | |
308 enabled_events_ |= DE_WRITE; | |
309 } | |
310 return sent; | |
311 } | |
312 | |
313 int Recv(void* buffer, size_t length) override { | |
314 int received = ::recv(s_, static_cast<char*>(buffer), | |
315 static_cast<int>(length), 0); | |
316 if ((received == 0) && (length != 0)) { | |
317 // Note: on graceful shutdown, recv can return 0. In this case, we | |
318 // pretend it is blocking, and then signal close, so that simplifying | |
319 // assumptions can be made about Recv. | |
320 LOG(LS_WARNING) << "EOF from socket; deferring close event"; | |
321 // Must turn this back on so that the select() loop will notice the close | |
322 // event. | |
323 enabled_events_ |= DE_READ; | |
324 SetError(EWOULDBLOCK); | |
325 return SOCKET_ERROR; | |
326 } | |
327 UpdateLastError(); | |
328 int error = GetError(); | |
329 bool success = (received >= 0) || IsBlockingError(error); | |
330 if (udp_ || success) { | |
331 enabled_events_ |= DE_READ; | |
332 } | |
333 if (!success) { | |
334 LOG_F(LS_VERBOSE) << "Error = " << error; | |
335 } | |
336 return received; | |
337 } | |
338 | |
339 int RecvFrom(void* buffer, size_t length, SocketAddress* out_addr) override { | |
340 sockaddr_storage addr_storage; | |
341 socklen_t addr_len = sizeof(addr_storage); | |
342 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); | |
343 int received = ::recvfrom(s_, static_cast<char*>(buffer), | |
344 static_cast<int>(length), 0, addr, &addr_len); | |
345 UpdateLastError(); | |
346 if ((received >= 0) && (out_addr != NULL)) | |
347 SocketAddressFromSockAddrStorage(addr_storage, out_addr); | |
348 int error = GetError(); | |
349 bool success = (received >= 0) || IsBlockingError(error); | |
350 if (udp_ || success) { | |
351 enabled_events_ |= DE_READ; | |
352 } | |
353 if (!success) { | |
354 LOG_F(LS_VERBOSE) << "Error = " << error; | |
355 } | |
356 return received; | |
357 } | |
358 | |
359 int Listen(int backlog) override { | |
360 int err = ::listen(s_, backlog); | |
361 UpdateLastError(); | |
362 if (err == 0) { | |
363 state_ = CS_CONNECTING; | |
364 enabled_events_ |= DE_ACCEPT; | |
365 #if !defined(NDEBUG) | |
366 dbg_addr_ = "Listening @ "; | |
367 dbg_addr_.append(GetLocalAddress().ToString()); | |
368 #endif | |
369 } | |
370 return err; | |
371 } | |
372 | |
373 AsyncSocket* Accept(SocketAddress* out_addr) override { | |
374 sockaddr_storage addr_storage; | |
375 socklen_t addr_len = sizeof(addr_storage); | |
376 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage); | |
377 SOCKET s = ::accept(s_, addr, &addr_len); | |
378 UpdateLastError(); | |
379 if (s == INVALID_SOCKET) | |
380 return NULL; | |
381 enabled_events_ |= DE_ACCEPT; | |
382 if (out_addr != NULL) | |
383 SocketAddressFromSockAddrStorage(addr_storage, out_addr); | |
384 return ss_->WrapSocket(s); | |
385 } | |
386 | |
387 int Close() override { | |
388 if (s_ == INVALID_SOCKET) | |
389 return 0; | |
390 int err = ::closesocket(s_); | |
391 UpdateLastError(); | |
392 s_ = INVALID_SOCKET; | |
393 state_ = CS_CLOSED; | |
394 enabled_events_ = 0; | |
395 if (resolver_) { | |
396 resolver_->Destroy(false); | |
397 resolver_ = NULL; | |
398 } | |
399 return err; | |
400 } | |
401 | |
402 int EstimateMTU(uint16_t* mtu) override { | |
403 SocketAddress addr = GetRemoteAddress(); | |
404 if (addr.IsAnyIP()) { | |
405 SetError(ENOTCONN); | |
406 return -1; | 544 return -1; |
407 } | 545 } |
408 | 546 return 0; |
409 #if defined(WEBRTC_WIN) | 547 } |
410 // Gets the interface MTU (TTL=1) for the interface used to reach |addr|. | |
411 WinPing ping; | |
412 if (!ping.IsValid()) { | |
413 SetError(EINVAL); // can't think of a better error ID | |
414 return -1; | |
415 } | |
416 int header_size = ICMP_HEADER_SIZE; | |
417 if (addr.family() == AF_INET6) { | |
418 header_size += IPV6_HEADER_SIZE; | |
419 } else if (addr.family() == AF_INET) { | |
420 header_size += IP_HEADER_SIZE; | |
421 } | |
422 | |
423 for (int level = 0; PACKET_MAXIMUMS[level + 1] > 0; ++level) { | |
424 int32_t size = PACKET_MAXIMUMS[level] - header_size; | |
425 WinPing::PingResult result = ping.Ping(addr.ipaddr(), size, | |
426 ICMP_PING_TIMEOUT_MILLIS, | |
427 1, false); | |
428 if (result == WinPing::PING_FAIL) { | |
429 SetError(EINVAL); // can't think of a better error ID | |
430 return -1; | |
431 } else if (result != WinPing::PING_TOO_LARGE) { | |
432 *mtu = PACKET_MAXIMUMS[level]; | |
433 return 0; | |
434 } | |
435 } | |
436 | |
437 ASSERT(false); | |
438 return -1; | |
439 #elif defined(WEBRTC_MAC) | |
440 // No simple way to do this on Mac OS X. | |
441 // SIOCGIFMTU would work if we knew which interface would be used, but | |
442 // figuring that out is pretty complicated. For now we'll return an error | |
443 // and let the caller pick a default MTU. | |
444 SetError(EINVAL); | |
445 return -1; | |
446 #elif defined(WEBRTC_LINUX) | |
447 // Gets the path MTU. | |
448 int value; | |
449 socklen_t vlen = sizeof(value); | |
450 int err = getsockopt(s_, IPPROTO_IP, IP_MTU, &value, &vlen); | |
451 if (err < 0) { | |
452 UpdateLastError(); | |
453 return err; | |
454 } | |
455 | |
456 ASSERT((0 <= value) && (value <= 65536)); | |
457 *mtu = value; | |
458 return 0; | |
459 #elif defined(__native_client__) | |
460 // Most socket operations, including this, will fail in NaCl's sandbox. | |
461 error_ = EACCES; | |
462 return -1; | |
463 #endif | |
464 } | |
465 | |
466 SocketServer* socketserver() { return ss_; } | |
467 | |
468 protected: | |
469 void OnResolveResult(AsyncResolverInterface* resolver) { | |
470 if (resolver != resolver_) { | |
471 return; | |
472 } | |
473 | |
474 int error = resolver_->GetError(); | |
475 if (error == 0) { | |
476 error = DoConnect(resolver_->address()); | |
477 } else { | |
478 Close(); | |
479 } | |
480 | |
481 if (error) { | |
482 SetError(error); | |
483 SignalCloseEvent(this, error); | |
484 } | |
485 } | |
486 | |
487 void UpdateLastError() { | |
488 SetError(LAST_SYSTEM_ERROR); | |
489 } | |
490 | |
491 void MaybeRemapSendError() { | |
492 #if defined(WEBRTC_MAC) | |
493 // https://developer.apple.com/library/mac/documentation/Darwin/ | |
494 // Reference/ManPages/man2/sendto.2.html | |
495 // ENOBUFS - The output queue for a network interface is full. | |
496 // This generally indicates that the interface has stopped sending, | |
497 // but may be caused by transient congestion. | |
498 if (GetError() == ENOBUFS) { | |
499 SetError(EWOULDBLOCK); | |
500 } | |
501 #endif | |
502 } | |
503 | |
504 static int TranslateOption(Option opt, int* slevel, int* sopt) { | |
505 switch (opt) { | |
506 case OPT_DONTFRAGMENT: | |
507 #if defined(WEBRTC_WIN) | |
508 *slevel = IPPROTO_IP; | |
509 *sopt = IP_DONTFRAGMENT; | |
510 break; | |
511 #elif defined(WEBRTC_MAC) || defined(BSD) || defined(__native_client__) | |
512 LOG(LS_WARNING) << "Socket::OPT_DONTFRAGMENT not supported."; | |
513 return -1; | |
514 #elif defined(WEBRTC_POSIX) | |
515 *slevel = IPPROTO_IP; | |
516 *sopt = IP_MTU_DISCOVER; | |
517 break; | |
518 #endif | |
519 case OPT_RCVBUF: | |
520 *slevel = SOL_SOCKET; | |
521 *sopt = SO_RCVBUF; | |
522 break; | |
523 case OPT_SNDBUF: | |
524 *slevel = SOL_SOCKET; | |
525 *sopt = SO_SNDBUF; | |
526 break; | |
527 case OPT_NODELAY: | |
528 *slevel = IPPROTO_TCP; | |
529 *sopt = TCP_NODELAY; | |
530 break; | |
531 case OPT_DSCP: | |
532 LOG(LS_WARNING) << "Socket::OPT_DSCP not supported."; | |
533 return -1; | |
534 case OPT_RTP_SENDTIME_EXTN_ID: | |
535 return -1; // No logging is necessary as this not a OS socket option. | |
536 default: | |
537 ASSERT(false); | |
538 return -1; | |
539 } | |
540 return 0; | |
541 } | |
542 | |
543 PhysicalSocketServer* ss_; | |
544 SOCKET s_; | |
545 uint8_t enabled_events_; | |
546 bool udp_; | |
547 int error_; | |
548 // Protects |error_| that is accessed from different threads. | |
549 mutable CriticalSection crit_; | |
550 ConnState state_; | |
551 AsyncResolver* resolver_; | |
552 | |
553 #if !defined(NDEBUG) | |
554 std::string dbg_addr_; | |
555 #endif | |
556 }; | |
557 | 548 |
558 #if defined(WEBRTC_POSIX) | 549 #if defined(WEBRTC_POSIX) |
559 class EventDispatcher : public Dispatcher { | 550 class EventDispatcher : public Dispatcher { |
560 public: | 551 public: |
561 EventDispatcher(PhysicalSocketServer* ss) : ss_(ss), fSignaled_(false) { | 552 EventDispatcher(PhysicalSocketServer* ss) : ss_(ss), fSignaled_(false) { |
562 if (pipe(afd_) < 0) | 553 if (pipe(afd_) < 0) |
563 LOG(LERROR) << "pipe failed"; | 554 LOG(LERROR) << "pipe failed"; |
564 ss_->Add(this); | 555 ss_->Add(this); |
565 } | 556 } |
566 | 557 |
(...skipping 217 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
784 } | 775 } |
785 | 776 |
786 private: | 777 private: |
787 typedef std::map<int, void (*)(int)> HandlerMap; | 778 typedef std::map<int, void (*)(int)> HandlerMap; |
788 | 779 |
789 HandlerMap handlers_; | 780 HandlerMap handlers_; |
790 // Our owner. | 781 // Our owner. |
791 PhysicalSocketServer *owner_; | 782 PhysicalSocketServer *owner_; |
792 }; | 783 }; |
793 | 784 |
794 class SocketDispatcher : public Dispatcher, public PhysicalSocket { | 785 SocketDispatcher::SocketDispatcher(PhysicalSocketServer *ss) |
795 public: | 786 : PhysicalSocket(ss) { |
796 explicit SocketDispatcher(PhysicalSocketServer *ss) : PhysicalSocket(ss) { | 787 } |
797 } | |
798 SocketDispatcher(SOCKET s, PhysicalSocketServer *ss) : PhysicalSocket(ss, s) { | |
799 } | |
800 | 788 |
801 ~SocketDispatcher() override { | 789 SocketDispatcher::SocketDispatcher(SOCKET s, PhysicalSocketServer *ss) |
802 Close(); | 790 : PhysicalSocket(ss, s) { |
803 } | 791 } |
804 | 792 |
805 bool Initialize() { | 793 SocketDispatcher::~SocketDispatcher() { |
806 ss_->Add(this); | 794 Close(); |
807 fcntl(s_, F_SETFL, fcntl(s_, F_GETFL, 0) | O_NONBLOCK); | 795 } |
| 796 |
| 797 bool SocketDispatcher::Initialize() { |
| 798 ss_->Add(this); |
| 799 fcntl(s_, F_SETFL, fcntl(s_, F_GETFL, 0) | O_NONBLOCK); |
| 800 return true; |
| 801 } |
| 802 |
| 803 bool SocketDispatcher::Create(int type) { |
| 804 return Create(AF_INET, type); |
| 805 } |
| 806 |
| 807 bool SocketDispatcher::Create(int family, int type) { |
| 808 // Change the socket to be non-blocking. |
| 809 if (!PhysicalSocket::Create(family, type)) |
| 810 return false; |
| 811 |
| 812 return Initialize(); |
| 813 } |
| 814 |
| 815 int SocketDispatcher::GetDescriptor() { |
| 816 return s_; |
| 817 } |
| 818 |
| 819 bool SocketDispatcher::IsDescriptorClosed() { |
| 820 // We don't have a reliable way of distinguishing end-of-stream |
| 821 // from readability. So test on each readable call. Is this |
| 822 // inefficient? Probably. |
| 823 char ch; |
| 824 ssize_t res = ::recv(s_, &ch, 1, MSG_PEEK); |
| 825 if (res > 0) { |
| 826 // Data available, so not closed. |
| 827 return false; |
| 828 } else if (res == 0) { |
| 829 // EOF, so closed. |
808 return true; | 830 return true; |
809 } | 831 } else { // error |
810 | 832 switch (errno) { |
811 virtual bool Create(int type) { | 833 // Returned if we've already closed s_. |
812 return Create(AF_INET, type); | 834 case EBADF: |
813 } | 835 // Returned during ungraceful peer shutdown. |
814 | 836 case ECONNRESET: |
815 bool Create(int family, int type) override { | 837 return true; |
816 // Change the socket to be non-blocking. | 838 default: |
817 if (!PhysicalSocket::Create(family, type)) | 839 // Assume that all other errors are just blocking errors, meaning the |
818 return false; | 840 // connection is still good but we just can't read from it right now. |
819 | 841 // This should only happen when connecting (and at most once), because |
820 return Initialize(); | 842 // in all other cases this function is only called if the file |
821 } | 843 // descriptor is already known to be in the readable state. However, |
822 | 844 // it's not necessary a problem if we spuriously interpret a |
823 int GetDescriptor() override { return s_; } | 845 // "connection lost"-type error as a blocking error, because typically |
824 | 846 // the next recv() will get EOF, so we'll still eventually notice that |
825 bool IsDescriptorClosed() override { | 847 // the socket is closed. |
826 // We don't have a reliable way of distinguishing end-of-stream | 848 LOG_ERR(LS_WARNING) << "Assuming benign blocking error"; |
827 // from readability. So test on each readable call. Is this | 849 return false; |
828 // inefficient? Probably. | |
829 char ch; | |
830 ssize_t res = ::recv(s_, &ch, 1, MSG_PEEK); | |
831 if (res > 0) { | |
832 // Data available, so not closed. | |
833 return false; | |
834 } else if (res == 0) { | |
835 // EOF, so closed. | |
836 return true; | |
837 } else { // error | |
838 switch (errno) { | |
839 // Returned if we've already closed s_. | |
840 case EBADF: | |
841 // Returned during ungraceful peer shutdown. | |
842 case ECONNRESET: | |
843 return true; | |
844 default: | |
845 // Assume that all other errors are just blocking errors, meaning the | |
846 // connection is still good but we just can't read from it right now. | |
847 // This should only happen when connecting (and at most once), because | |
848 // in all other cases this function is only called if the file | |
849 // descriptor is already known to be in the readable state. However, | |
850 // it's not necessary a problem if we spuriously interpret a | |
851 // "connection lost"-type error as a blocking error, because typically | |
852 // the next recv() will get EOF, so we'll still eventually notice that | |
853 // the socket is closed. | |
854 LOG_ERR(LS_WARNING) << "Assuming benign blocking error"; | |
855 return false; | |
856 } | |
857 } | 850 } |
858 } | 851 } |
| 852 } |
859 | 853 |
860 uint32_t GetRequestedEvents() override { return enabled_events_; } | 854 uint32_t SocketDispatcher::GetRequestedEvents() { |
| 855 return enabled_events_; |
| 856 } |
861 | 857 |
862 void OnPreEvent(uint32_t ff) override { | 858 void SocketDispatcher::OnPreEvent(uint32_t ff) { |
863 if ((ff & DE_CONNECT) != 0) | 859 if ((ff & DE_CONNECT) != 0) |
864 state_ = CS_CONNECTED; | 860 state_ = CS_CONNECTED; |
865 if ((ff & DE_CLOSE) != 0) | 861 if ((ff & DE_CLOSE) != 0) |
866 state_ = CS_CLOSED; | 862 state_ = CS_CLOSED; |
| 863 } |
| 864 |
| 865 void SocketDispatcher::OnEvent(uint32_t ff, int err) { |
| 866 // Make sure we deliver connect/accept first. Otherwise, consumers may see |
| 867 // something like a READ followed by a CONNECT, which would be odd. |
| 868 if ((ff & DE_CONNECT) != 0) { |
| 869 enabled_events_ &= ~DE_CONNECT; |
| 870 SignalConnectEvent(this); |
867 } | 871 } |
| 872 if ((ff & DE_ACCEPT) != 0) { |
| 873 enabled_events_ &= ~DE_ACCEPT; |
| 874 SignalReadEvent(this); |
| 875 } |
| 876 if ((ff & DE_READ) != 0) { |
| 877 enabled_events_ &= ~DE_READ; |
| 878 SignalReadEvent(this); |
| 879 } |
| 880 if ((ff & DE_WRITE) != 0) { |
| 881 enabled_events_ &= ~DE_WRITE; |
| 882 SignalWriteEvent(this); |
| 883 } |
| 884 if ((ff & DE_CLOSE) != 0) { |
| 885 // The socket is now dead to us, so stop checking it. |
| 886 enabled_events_ = 0; |
| 887 SignalCloseEvent(this, err); |
| 888 } |
| 889 } |
868 | 890 |
869 void OnEvent(uint32_t ff, int err) override { | 891 int SocketDispatcher::Close() { |
870 // Make sure we deliver connect/accept first. Otherwise, consumers may see | 892 if (s_ == INVALID_SOCKET) |
871 // something like a READ followed by a CONNECT, which would be odd. | 893 return 0; |
872 if ((ff & DE_CONNECT) != 0) { | |
873 enabled_events_ &= ~DE_CONNECT; | |
874 SignalConnectEvent(this); | |
875 } | |
876 if ((ff & DE_ACCEPT) != 0) { | |
877 enabled_events_ &= ~DE_ACCEPT; | |
878 SignalReadEvent(this); | |
879 } | |
880 if ((ff & DE_READ) != 0) { | |
881 enabled_events_ &= ~DE_READ; | |
882 SignalReadEvent(this); | |
883 } | |
884 if ((ff & DE_WRITE) != 0) { | |
885 enabled_events_ &= ~DE_WRITE; | |
886 SignalWriteEvent(this); | |
887 } | |
888 if ((ff & DE_CLOSE) != 0) { | |
889 // The socket is now dead to us, so stop checking it. | |
890 enabled_events_ = 0; | |
891 SignalCloseEvent(this, err); | |
892 } | |
893 } | |
894 | 894 |
895 int Close() override { | 895 ss_->Remove(this); |
896 if (s_ == INVALID_SOCKET) | 896 return PhysicalSocket::Close(); |
897 return 0; | 897 } |
898 | |
899 ss_->Remove(this); | |
900 return PhysicalSocket::Close(); | |
901 } | |
902 }; | |
903 | 898 |
904 class FileDispatcher: public Dispatcher, public AsyncFile { | 899 class FileDispatcher: public Dispatcher, public AsyncFile { |
905 public: | 900 public: |
906 FileDispatcher(int fd, PhysicalSocketServer *ss) : ss_(ss), fd_(fd) { | 901 FileDispatcher(int fd, PhysicalSocketServer *ss) : ss_(ss), fd_(fd) { |
907 set_readable(true); | 902 set_readable(true); |
908 | 903 |
909 ss_->Add(this); | 904 ss_->Add(this); |
910 | 905 |
911 fcntl(fd_, F_SETFL, fcntl(fd_, F_GETFL, 0) | O_NONBLOCK); | 906 fcntl(fd_, F_SETFL, fcntl(fd_, F_GETFL, 0) | O_NONBLOCK); |
912 } | 907 } |
(...skipping 95 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1008 return INVALID_SOCKET; | 1003 return INVALID_SOCKET; |
1009 } | 1004 } |
1010 | 1005 |
1011 virtual bool CheckSignalClose() { return false; } | 1006 virtual bool CheckSignalClose() { return false; } |
1012 | 1007 |
1013 private: | 1008 private: |
1014 PhysicalSocketServer* ss_; | 1009 PhysicalSocketServer* ss_; |
1015 WSAEVENT hev_; | 1010 WSAEVENT hev_; |
1016 }; | 1011 }; |
1017 | 1012 |
1018 class SocketDispatcher : public Dispatcher, public PhysicalSocket { | 1013 SocketDispatcher::SocketDispatcher(PhysicalSocketServer* ss) |
1019 public: | 1014 : PhysicalSocket(ss), |
1020 static int next_id_; | 1015 id_(0), |
1021 int id_; | 1016 signal_close_(false) { |
1022 bool signal_close_; | 1017 } |
1023 int signal_err_; | |
1024 | 1018 |
1025 SocketDispatcher(PhysicalSocketServer* ss) | 1019 SocketDispatcher::SocketDispatcher(SOCKET s, PhysicalSocketServer* ss) |
1026 : PhysicalSocket(ss), | 1020 : PhysicalSocket(ss, s), |
1027 id_(0), | 1021 id_(0), |
1028 signal_close_(false) { | 1022 signal_close_(false) { |
| 1023 } |
| 1024 |
| 1025 SocketDispatcher::~SocketDispatcher() { |
| 1026 Close(); |
| 1027 } |
| 1028 |
| 1029 bool SocketDispatcher::Initialize() { |
| 1030 ASSERT(s_ != INVALID_SOCKET); |
| 1031 // Must be a non-blocking |
| 1032 u_long argp = 1; |
| 1033 ioctlsocket(s_, FIONBIO, &argp); |
| 1034 ss_->Add(this); |
| 1035 return true; |
| 1036 } |
| 1037 |
| 1038 bool SocketDispatcher::Create(int type) { |
| 1039 return Create(AF_INET, type); |
| 1040 } |
| 1041 |
| 1042 bool SocketDispatcher::Create(int family, int type) { |
| 1043 // Create socket |
| 1044 if (!PhysicalSocket::Create(family, type)) |
| 1045 return false; |
| 1046 |
| 1047 if (!Initialize()) |
| 1048 return false; |
| 1049 |
| 1050 do { id_ = ++next_id_; } while (id_ == 0); |
| 1051 return true; |
| 1052 } |
| 1053 |
| 1054 int SocketDispatcher::Close() { |
| 1055 if (s_ == INVALID_SOCKET) |
| 1056 return 0; |
| 1057 |
| 1058 id_ = 0; |
| 1059 signal_close_ = false; |
| 1060 ss_->Remove(this); |
| 1061 return PhysicalSocket::Close(); |
| 1062 } |
| 1063 |
| 1064 uint32_t SocketDispatcher::GetRequestedEvents() { |
| 1065 return enabled_events_; |
| 1066 } |
| 1067 |
| 1068 void SocketDispatcher::OnPreEvent(uint32_t ff) { |
| 1069 if ((ff & DE_CONNECT) != 0) |
| 1070 state_ = CS_CONNECTED; |
| 1071 // We set CS_CLOSED from CheckSignalClose. |
| 1072 } |
| 1073 |
| 1074 void SocketDispatcher::OnEvent(uint32_t ff, int err) { |
| 1075 int cache_id = id_; |
| 1076 // Make sure we deliver connect/accept first. Otherwise, consumers may see |
| 1077 // something like a READ followed by a CONNECT, which would be odd. |
| 1078 if (((ff & DE_CONNECT) != 0) && (id_ == cache_id)) { |
| 1079 if (ff != DE_CONNECT) |
| 1080 LOG(LS_VERBOSE) << "Signalled with DE_CONNECT: " << ff; |
| 1081 enabled_events_ &= ~DE_CONNECT; |
| 1082 #if !defined(NDEBUG) |
| 1083 dbg_addr_ = "Connected @ "; |
| 1084 dbg_addr_.append(GetRemoteAddress().ToString()); |
| 1085 #endif |
| 1086 SignalConnectEvent(this); |
1029 } | 1087 } |
| 1088 if (((ff & DE_ACCEPT) != 0) && (id_ == cache_id)) { |
| 1089 enabled_events_ &= ~DE_ACCEPT; |
| 1090 SignalReadEvent(this); |
| 1091 } |
| 1092 if ((ff & DE_READ) != 0) { |
| 1093 enabled_events_ &= ~DE_READ; |
| 1094 SignalReadEvent(this); |
| 1095 } |
| 1096 if (((ff & DE_WRITE) != 0) && (id_ == cache_id)) { |
| 1097 enabled_events_ &= ~DE_WRITE; |
| 1098 SignalWriteEvent(this); |
| 1099 } |
| 1100 if (((ff & DE_CLOSE) != 0) && (id_ == cache_id)) { |
| 1101 signal_close_ = true; |
| 1102 signal_err_ = err; |
| 1103 } |
| 1104 } |
1030 | 1105 |
1031 SocketDispatcher(SOCKET s, PhysicalSocketServer* ss) | 1106 WSAEVENT SocketDispatcher::GetWSAEvent() { |
1032 : PhysicalSocket(ss, s), | 1107 return WSA_INVALID_EVENT; |
1033 id_(0), | 1108 } |
1034 signal_close_(false) { | |
1035 } | |
1036 | 1109 |
1037 virtual ~SocketDispatcher() { | 1110 SOCKET SocketDispatcher::GetSocket() { |
1038 Close(); | 1111 return s_; |
1039 } | 1112 } |
1040 | 1113 |
1041 bool Initialize() { | 1114 bool SocketDispatcher::CheckSignalClose() { |
1042 ASSERT(s_ != INVALID_SOCKET); | 1115 if (!signal_close_) |
1043 // Must be a non-blocking | 1116 return false; |
1044 u_long argp = 1; | |
1045 ioctlsocket(s_, FIONBIO, &argp); | |
1046 ss_->Add(this); | |
1047 return true; | |
1048 } | |
1049 | 1117 |
1050 virtual bool Create(int type) { | 1118 char ch; |
1051 return Create(AF_INET, type); | 1119 if (recv(s_, &ch, 1, MSG_PEEK) > 0) |
1052 } | 1120 return false; |
1053 | 1121 |
1054 virtual bool Create(int family, int type) { | 1122 state_ = CS_CLOSED; |
1055 // Create socket | 1123 signal_close_ = false; |
1056 if (!PhysicalSocket::Create(family, type)) | 1124 SignalCloseEvent(this, signal_err_); |
1057 return false; | 1125 return true; |
1058 | 1126 } |
1059 if (!Initialize()) | |
1060 return false; | |
1061 | |
1062 do { id_ = ++next_id_; } while (id_ == 0); | |
1063 return true; | |
1064 } | |
1065 | |
1066 virtual int Close() { | |
1067 if (s_ == INVALID_SOCKET) | |
1068 return 0; | |
1069 | |
1070 id_ = 0; | |
1071 signal_close_ = false; | |
1072 ss_->Remove(this); | |
1073 return PhysicalSocket::Close(); | |
1074 } | |
1075 | |
1076 virtual uint32_t GetRequestedEvents() { return enabled_events_; } | |
1077 | |
1078 virtual void OnPreEvent(uint32_t ff) { | |
1079 if ((ff & DE_CONNECT) != 0) | |
1080 state_ = CS_CONNECTED; | |
1081 // We set CS_CLOSED from CheckSignalClose. | |
1082 } | |
1083 | |
1084 virtual void OnEvent(uint32_t ff, int err) { | |
1085 int cache_id = id_; | |
1086 // Make sure we deliver connect/accept first. Otherwise, consumers may see | |
1087 // something like a READ followed by a CONNECT, which would be odd. | |
1088 if (((ff & DE_CONNECT) != 0) && (id_ == cache_id)) { | |
1089 if (ff != DE_CONNECT) | |
1090 LOG(LS_VERBOSE) << "Signalled with DE_CONNECT: " << ff; | |
1091 enabled_events_ &= ~DE_CONNECT; | |
1092 #if !defined(NDEBUG) | |
1093 dbg_addr_ = "Connected @ "; | |
1094 dbg_addr_.append(GetRemoteAddress().ToString()); | |
1095 #endif | |
1096 SignalConnectEvent(this); | |
1097 } | |
1098 if (((ff & DE_ACCEPT) != 0) && (id_ == cache_id)) { | |
1099 enabled_events_ &= ~DE_ACCEPT; | |
1100 SignalReadEvent(this); | |
1101 } | |
1102 if ((ff & DE_READ) != 0) { | |
1103 enabled_events_ &= ~DE_READ; | |
1104 SignalReadEvent(this); | |
1105 } | |
1106 if (((ff & DE_WRITE) != 0) && (id_ == cache_id)) { | |
1107 enabled_events_ &= ~DE_WRITE; | |
1108 SignalWriteEvent(this); | |
1109 } | |
1110 if (((ff & DE_CLOSE) != 0) && (id_ == cache_id)) { | |
1111 signal_close_ = true; | |
1112 signal_err_ = err; | |
1113 } | |
1114 } | |
1115 | |
1116 virtual WSAEVENT GetWSAEvent() { | |
1117 return WSA_INVALID_EVENT; | |
1118 } | |
1119 | |
1120 virtual SOCKET GetSocket() { | |
1121 return s_; | |
1122 } | |
1123 | |
1124 virtual bool CheckSignalClose() { | |
1125 if (!signal_close_) | |
1126 return false; | |
1127 | |
1128 char ch; | |
1129 if (recv(s_, &ch, 1, MSG_PEEK) > 0) | |
1130 return false; | |
1131 | |
1132 state_ = CS_CLOSED; | |
1133 signal_close_ = false; | |
1134 SignalCloseEvent(this, signal_err_); | |
1135 return true; | |
1136 } | |
1137 }; | |
1138 | 1127 |
1139 int SocketDispatcher::next_id_ = 0; | 1128 int SocketDispatcher::next_id_ = 0; |
1140 | 1129 |
1141 #endif // WEBRTC_WIN | 1130 #endif // WEBRTC_WIN |
1142 | 1131 |
1143 // Sets the value of a boolean value to false when signaled. | 1132 // Sets the value of a boolean value to false when signaled. |
1144 class Signaler : public EventDispatcher { | 1133 class Signaler : public EventDispatcher { |
1145 public: | 1134 public: |
1146 Signaler(PhysicalSocketServer* ss, bool* pf) | 1135 Signaler(PhysicalSocketServer* ss, bool* pf) |
1147 : EventDispatcher(ss), pf_(pf) { | 1136 : EventDispatcher(ss), pf_(pf) { |
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1183 Socket* PhysicalSocketServer::CreateSocket(int type) { | 1172 Socket* PhysicalSocketServer::CreateSocket(int type) { |
1184 return CreateSocket(AF_INET, type); | 1173 return CreateSocket(AF_INET, type); |
1185 } | 1174 } |
1186 | 1175 |
1187 Socket* PhysicalSocketServer::CreateSocket(int family, int type) { | 1176 Socket* PhysicalSocketServer::CreateSocket(int family, int type) { |
1188 PhysicalSocket* socket = new PhysicalSocket(this); | 1177 PhysicalSocket* socket = new PhysicalSocket(this); |
1189 if (socket->Create(family, type)) { | 1178 if (socket->Create(family, type)) { |
1190 return socket; | 1179 return socket; |
1191 } else { | 1180 } else { |
1192 delete socket; | 1181 delete socket; |
1193 return 0; | 1182 return nullptr; |
1194 } | 1183 } |
1195 } | 1184 } |
1196 | 1185 |
1197 AsyncSocket* PhysicalSocketServer::CreateAsyncSocket(int type) { | 1186 AsyncSocket* PhysicalSocketServer::CreateAsyncSocket(int type) { |
1198 return CreateAsyncSocket(AF_INET, type); | 1187 return CreateAsyncSocket(AF_INET, type); |
1199 } | 1188 } |
1200 | 1189 |
1201 AsyncSocket* PhysicalSocketServer::CreateAsyncSocket(int family, int type) { | 1190 AsyncSocket* PhysicalSocketServer::CreateAsyncSocket(int family, int type) { |
1202 SocketDispatcher* dispatcher = new SocketDispatcher(this); | 1191 SocketDispatcher* dispatcher = new SocketDispatcher(this); |
1203 if (dispatcher->Create(family, type)) { | 1192 if (dispatcher->Create(family, type)) { |
1204 return dispatcher; | 1193 return dispatcher; |
1205 } else { | 1194 } else { |
1206 delete dispatcher; | 1195 delete dispatcher; |
1207 return 0; | 1196 return nullptr; |
1208 } | 1197 } |
1209 } | 1198 } |
1210 | 1199 |
1211 AsyncSocket* PhysicalSocketServer::WrapSocket(SOCKET s) { | 1200 AsyncSocket* PhysicalSocketServer::WrapSocket(SOCKET s) { |
1212 SocketDispatcher* dispatcher = new SocketDispatcher(s, this); | 1201 SocketDispatcher* dispatcher = new SocketDispatcher(s, this); |
1213 if (dispatcher->Initialize()) { | 1202 if (dispatcher->Initialize()) { |
1214 return dispatcher; | 1203 return dispatcher; |
1215 } else { | 1204 } else { |
1216 delete dispatcher; | 1205 delete dispatcher; |
1217 return 0; | 1206 return nullptr; |
1218 } | 1207 } |
1219 } | 1208 } |
1220 | 1209 |
1221 void PhysicalSocketServer::Add(Dispatcher *pdispatcher) { | 1210 void PhysicalSocketServer::Add(Dispatcher *pdispatcher) { |
1222 CritScope cs(&crit_); | 1211 CritScope cs(&crit_); |
1223 // Prevent duplicates. This can cause dead dispatchers to stick around. | 1212 // Prevent duplicates. This can cause dead dispatchers to stick around. |
1224 DispatcherList::iterator pos = std::find(dispatchers_.begin(), | 1213 DispatcherList::iterator pos = std::find(dispatchers_.begin(), |
1225 dispatchers_.end(), | 1214 dispatchers_.end(), |
1226 pdispatcher); | 1215 pdispatcher); |
1227 if (pos != dispatchers_.end()) | 1216 if (pos != dispatchers_.end()) |
(...skipping 108 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1336 } else { | 1325 } else { |
1337 // We have signaled descriptors | 1326 // We have signaled descriptors |
1338 CritScope cr(&crit_); | 1327 CritScope cr(&crit_); |
1339 for (size_t i = 0; i < dispatchers_.size(); ++i) { | 1328 for (size_t i = 0; i < dispatchers_.size(); ++i) { |
1340 Dispatcher *pdispatcher = dispatchers_[i]; | 1329 Dispatcher *pdispatcher = dispatchers_[i]; |
1341 int fd = pdispatcher->GetDescriptor(); | 1330 int fd = pdispatcher->GetDescriptor(); |
1342 uint32_t ff = 0; | 1331 uint32_t ff = 0; |
1343 int errcode = 0; | 1332 int errcode = 0; |
1344 | 1333 |
1345 // Reap any error code, which can be signaled through reads or writes. | 1334 // Reap any error code, which can be signaled through reads or writes. |
1346 // TODO: Should we set errcode if getsockopt fails? | 1335 // TODO(pthatcher): Should we set errcode if getsockopt fails? |
1347 if (FD_ISSET(fd, &fdsRead) || FD_ISSET(fd, &fdsWrite)) { | 1336 if (FD_ISSET(fd, &fdsRead) || FD_ISSET(fd, &fdsWrite)) { |
1348 socklen_t len = sizeof(errcode); | 1337 socklen_t len = sizeof(errcode); |
1349 ::getsockopt(fd, SOL_SOCKET, SO_ERROR, &errcode, &len); | 1338 ::getsockopt(fd, SOL_SOCKET, SO_ERROR, &errcode, &len); |
1350 } | 1339 } |
1351 | 1340 |
1352 // Check readable descriptors. If we're waiting on an accept, signal | 1341 // Check readable descriptors. If we're waiting on an accept, signal |
1353 // that. Otherwise we're waiting for data, check to see if we're | 1342 // that. Otherwise we're waiting for data, check to see if we're |
1354 // readable or really closed. | 1343 // readable or really closed. |
1355 // TODO: Only peek at TCP descriptors. | 1344 // TODO(pthatcher): Only peek at TCP descriptors. |
1356 if (FD_ISSET(fd, &fdsRead)) { | 1345 if (FD_ISSET(fd, &fdsRead)) { |
1357 FD_CLR(fd, &fdsRead); | 1346 FD_CLR(fd, &fdsRead); |
1358 if (pdispatcher->GetRequestedEvents() & DE_ACCEPT) { | 1347 if (pdispatcher->GetRequestedEvents() & DE_ACCEPT) { |
1359 ff |= DE_ACCEPT; | 1348 ff |= DE_ACCEPT; |
1360 } else if (errcode || pdispatcher->IsDescriptorClosed()) { | 1349 } else if (errcode || pdispatcher->IsDescriptorClosed()) { |
1361 ff |= DE_CLOSE; | 1350 ff |= DE_CLOSE; |
1362 } else { | 1351 } else { |
1363 ff |= DE_READ; | 1352 ff |= DE_READ; |
1364 } | 1353 } |
1365 } | 1354 } |
(...skipping 153 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1519 | 1508 |
1520 // Wait for one of the events to signal | 1509 // Wait for one of the events to signal |
1521 DWORD dw = WSAWaitForMultipleEvents(static_cast<DWORD>(events.size()), | 1510 DWORD dw = WSAWaitForMultipleEvents(static_cast<DWORD>(events.size()), |
1522 &events[0], | 1511 &events[0], |
1523 false, | 1512 false, |
1524 cmsNext, | 1513 cmsNext, |
1525 false); | 1514 false); |
1526 | 1515 |
1527 if (dw == WSA_WAIT_FAILED) { | 1516 if (dw == WSA_WAIT_FAILED) { |
1528 // Failed? | 1517 // Failed? |
1529 // TODO: need a better strategy than this! | 1518 // TODO(pthatcher): need a better strategy than this! |
1530 WSAGetLastError(); | 1519 WSAGetLastError(); |
1531 ASSERT(false); | 1520 ASSERT(false); |
1532 return false; | 1521 return false; |
1533 } else if (dw == WSA_WAIT_TIMEOUT) { | 1522 } else if (dw == WSA_WAIT_TIMEOUT) { |
1534 // Timeout? | 1523 // Timeout? |
1535 return true; | 1524 return true; |
1536 } else { | 1525 } else { |
1537 // Figure out which one it is and call it | 1526 // Figure out which one it is and call it |
1538 CritScope cr(&crit_); | 1527 CritScope cr(&crit_); |
1539 int index = dw - WSA_WAIT_EVENT_0; | 1528 int index = dw - WSA_WAIT_EVENT_0; |
(...skipping 88 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1628 break; | 1617 break; |
1629 } | 1618 } |
1630 } | 1619 } |
1631 | 1620 |
1632 // Done | 1621 // Done |
1633 return true; | 1622 return true; |
1634 } | 1623 } |
1635 #endif // WEBRTC_WIN | 1624 #endif // WEBRTC_WIN |
1636 | 1625 |
1637 } // namespace rtc | 1626 } // namespace rtc |
OLD | NEW |