OLD | NEW |
---|---|
1 /* | 1 /* |
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. | 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. |
3 * | 3 * |
4 * Use of this source code is governed by a BSD-style license | 4 * Use of this source code is governed by a BSD-style license |
5 * that can be found in the LICENSE file in the root of the source | 5 * that can be found in the LICENSE file in the root of the source |
6 * tree. An additional intellectual property rights grant can be found | 6 * tree. An additional intellectual property rights grant can be found |
7 * in the file PATENTS. All contributing project authors may | 7 * in the file PATENTS. All contributing project authors may |
8 * be found in the AUTHORS file in the root of the source tree. | 8 * be found in the AUTHORS file in the root of the source tree. |
9 */ | 9 */ |
10 | 10 |
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
58 MSG_ID_CONNECT, | 58 MSG_ID_CONNECT, |
59 MSG_ID_DISCONNECT, | 59 MSG_ID_DISCONNECT, |
60 }; | 60 }; |
61 | 61 |
62 // Packets are passed between sockets as messages. We copy the data just like | 62 // Packets are passed between sockets as messages. We copy the data just like |
63 // the kernel does. | 63 // the kernel does. |
64 class Packet : public MessageData { | 64 class Packet : public MessageData { |
65 public: | 65 public: |
66 Packet(const char* data, size_t size, const SocketAddress& from) | 66 Packet(const char* data, size_t size, const SocketAddress& from) |
67 : size_(size), consumed_(0), from_(from) { | 67 : size_(size), consumed_(0), from_(from) { |
68 ASSERT(NULL != data); | 68 RTC_DCHECK(NULL != data); |
69 data_ = new char[size_]; | 69 data_ = new char[size_]; |
70 memcpy(data_, data, size_); | 70 memcpy(data_, data, size_); |
71 } | 71 } |
72 | 72 |
73 ~Packet() override { | 73 ~Packet() override { |
74 delete[] data_; | 74 delete[] data_; |
75 } | 75 } |
76 | 76 |
77 const char* data() const { return data_ + consumed_; } | 77 const char* data() const { return data_ + consumed_; } |
78 size_t size() const { return size_ - consumed_; } | 78 size_t size() const { return size_ - consumed_; } |
79 const SocketAddress& from() const { return from_; } | 79 const SocketAddress& from() const { return from_; } |
80 | 80 |
81 // Remove the first size bytes from the data. | 81 // Remove the first size bytes from the data. |
82 void Consume(size_t size) { | 82 void Consume(size_t size) { |
83 ASSERT(size + consumed_ < size_); | 83 RTC_DCHECK(size + consumed_ < size_); |
84 consumed_ += size; | 84 consumed_ += size; |
85 } | 85 } |
86 | 86 |
87 private: | 87 private: |
88 char* data_; | 88 char* data_; |
89 size_t size_, consumed_; | 89 size_t size_, consumed_; |
90 SocketAddress from_; | 90 SocketAddress from_; |
91 }; | 91 }; |
92 | 92 |
93 struct MessageAddress : public MessageData { | 93 struct MessageAddress : public MessageData { |
94 explicit MessageAddress(const SocketAddress& a) : addr(a) { } | 94 explicit MessageAddress(const SocketAddress& a) : addr(a) { } |
95 SocketAddress addr; | 95 SocketAddress addr; |
96 }; | 96 }; |
97 | 97 |
98 VirtualSocket::VirtualSocket(VirtualSocketServer* server, | 98 VirtualSocket::VirtualSocket(VirtualSocketServer* server, |
99 int family, | 99 int family, |
100 int type, | 100 int type, |
101 bool async) | 101 bool async) |
102 : server_(server), | 102 : server_(server), |
103 type_(type), | 103 type_(type), |
104 async_(async), | 104 async_(async), |
105 state_(CS_CLOSED), | 105 state_(CS_CLOSED), |
106 error_(0), | 106 error_(0), |
107 listen_queue_(NULL), | 107 listen_queue_(NULL), |
108 write_enabled_(false), | |
109 network_size_(0), | 108 network_size_(0), |
110 recv_buffer_size_(0), | 109 recv_buffer_size_(0), |
111 bound_(false), | 110 bound_(false), |
112 was_any_(false) { | 111 was_any_(false) { |
113 ASSERT((type_ == SOCK_DGRAM) || (type_ == SOCK_STREAM)); | 112 RTC_DCHECK((type_ == SOCK_DGRAM) || (type_ == SOCK_STREAM)); |
114 ASSERT(async_ || (type_ != SOCK_STREAM)); // We only support async streams | 113 RTC_DCHECK(async_ || |
114 (type_ != SOCK_STREAM)); // We only support async streams | |
115 server->SignalReadyToSend.connect(this, | |
116 &VirtualSocket::OnSocketServerReadyToSend); | |
115 } | 117 } |
116 | 118 |
117 VirtualSocket::~VirtualSocket() { | 119 VirtualSocket::~VirtualSocket() { |
118 Close(); | 120 Close(); |
119 | 121 |
120 for (RecvBuffer::iterator it = recv_buffer_.begin(); it != recv_buffer_.end(); | 122 for (RecvBuffer::iterator it = recv_buffer_.begin(); it != recv_buffer_.end(); |
121 ++it) { | 123 ++it) { |
122 delete *it; | 124 delete *it; |
123 } | 125 } |
124 } | 126 } |
(...skipping 77 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
202 // Remove mapping for both directions. | 204 // Remove mapping for both directions. |
203 server_->RemoveConnection(remote_addr_, local_addr_); | 205 server_->RemoveConnection(remote_addr_, local_addr_); |
204 server_->RemoveConnection(local_addr_, remote_addr_); | 206 server_->RemoveConnection(local_addr_, remote_addr_); |
205 } | 207 } |
206 // Cancel potential connects | 208 // Cancel potential connects |
207 MessageList msgs; | 209 MessageList msgs; |
208 if (server_->msg_queue_) { | 210 if (server_->msg_queue_) { |
209 server_->msg_queue_->Clear(this, MSG_ID_CONNECT, &msgs); | 211 server_->msg_queue_->Clear(this, MSG_ID_CONNECT, &msgs); |
210 } | 212 } |
211 for (MessageList::iterator it = msgs.begin(); it != msgs.end(); ++it) { | 213 for (MessageList::iterator it = msgs.begin(); it != msgs.end(); ++it) { |
212 ASSERT(NULL != it->pdata); | 214 RTC_DCHECK(NULL != it->pdata); |
213 MessageAddress* data = static_cast<MessageAddress*>(it->pdata); | 215 MessageAddress* data = static_cast<MessageAddress*>(it->pdata); |
214 | 216 |
215 // Lookup remote side. | 217 // Lookup remote side. |
216 VirtualSocket* socket = | 218 VirtualSocket* socket = |
217 server_->LookupConnection(local_addr_, data->addr); | 219 server_->LookupConnection(local_addr_, data->addr); |
218 if (socket) { | 220 if (socket) { |
219 // Server socket, remote side is a socket retreived by | 221 // Server socket, remote side is a socket retreived by |
220 // accept. Accepted sockets are not bound so we will not | 222 // accept. Accepted sockets are not bound so we will not |
221 // find it by looking in the bindings table. | 223 // find it by looking in the bindings table. |
222 server_->Disconnect(socket); | 224 server_->Disconnect(socket); |
(...skipping 77 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
300 } else { | 302 } else { |
301 recv_buffer_.pop_front(); | 303 recv_buffer_.pop_front(); |
302 delete packet; | 304 delete packet; |
303 } | 305 } |
304 | 306 |
305 if (SOCK_STREAM == type_) { | 307 if (SOCK_STREAM == type_) { |
306 bool was_full = (recv_buffer_size_ == server_->recv_buffer_capacity_); | 308 bool was_full = (recv_buffer_size_ == server_->recv_buffer_capacity_); |
307 recv_buffer_size_ -= data_read; | 309 recv_buffer_size_ -= data_read; |
308 if (was_full) { | 310 if (was_full) { |
309 VirtualSocket* sender = server_->LookupBinding(remote_addr_); | 311 VirtualSocket* sender = server_->LookupBinding(remote_addr_); |
310 ASSERT(NULL != sender); | 312 RTC_DCHECK(NULL != sender); |
311 server_->SendTcp(sender); | 313 server_->SendTcp(sender); |
312 } | 314 } |
313 } | 315 } |
314 | 316 |
315 return static_cast<int>(data_read); | 317 return static_cast<int>(data_read); |
316 } | 318 } |
317 | 319 |
318 int VirtualSocket::Listen(int backlog) { | 320 int VirtualSocket::Listen(int backlog) { |
319 ASSERT(SOCK_STREAM == type_); | 321 RTC_DCHECK(SOCK_STREAM == type_); |
320 ASSERT(CS_CLOSED == state_); | 322 RTC_DCHECK(CS_CLOSED == state_); |
321 if (local_addr_.IsNil()) { | 323 if (local_addr_.IsNil()) { |
322 error_ = EINVAL; | 324 error_ = EINVAL; |
323 return -1; | 325 return -1; |
324 } | 326 } |
325 ASSERT(NULL == listen_queue_); | 327 RTC_DCHECK(NULL == listen_queue_); |
326 listen_queue_ = new ListenQueue; | 328 listen_queue_ = new ListenQueue; |
327 state_ = CS_CONNECTING; | 329 state_ = CS_CONNECTING; |
328 return 0; | 330 return 0; |
329 } | 331 } |
330 | 332 |
331 VirtualSocket* VirtualSocket::Accept(SocketAddress* paddr) { | 333 VirtualSocket* VirtualSocket::Accept(SocketAddress* paddr) { |
332 if (NULL == listen_queue_) { | 334 if (NULL == listen_queue_) { |
333 error_ = EINVAL; | 335 error_ = EINVAL; |
334 return NULL; | 336 return NULL; |
335 } | 337 } |
(...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
385 | 387 |
386 int VirtualSocket::EstimateMTU(uint16_t* mtu) { | 388 int VirtualSocket::EstimateMTU(uint16_t* mtu) { |
387 if (CS_CONNECTED != state_) | 389 if (CS_CONNECTED != state_) |
388 return ENOTCONN; | 390 return ENOTCONN; |
389 else | 391 else |
390 return 65536; | 392 return 65536; |
391 } | 393 } |
392 | 394 |
393 void VirtualSocket::OnMessage(Message* pmsg) { | 395 void VirtualSocket::OnMessage(Message* pmsg) { |
394 if (pmsg->message_id == MSG_ID_PACKET) { | 396 if (pmsg->message_id == MSG_ID_PACKET) { |
395 // ASSERT(!local_addr_.IsAnyIP()); | 397 // RTC_DCHECK(!local_addr_.IsAnyIP()); |
skvlad
2016/08/26 22:39:04
Do you happen to know why this is commented out? S
Taylor Brandstetter
2016/08/26 23:01:03
It appears this line has been commented out from t
| |
396 ASSERT(NULL != pmsg->pdata); | 398 RTC_DCHECK(NULL != pmsg->pdata); |
397 Packet* packet = static_cast<Packet*>(pmsg->pdata); | 399 Packet* packet = static_cast<Packet*>(pmsg->pdata); |
398 | 400 |
399 recv_buffer_.push_back(packet); | 401 recv_buffer_.push_back(packet); |
400 | 402 |
401 if (async_) { | 403 if (async_) { |
402 SignalReadEvent(this); | 404 SignalReadEvent(this); |
403 } | 405 } |
404 } else if (pmsg->message_id == MSG_ID_CONNECT) { | 406 } else if (pmsg->message_id == MSG_ID_CONNECT) { |
405 ASSERT(NULL != pmsg->pdata); | 407 RTC_DCHECK(NULL != pmsg->pdata); |
406 MessageAddress* data = static_cast<MessageAddress*>(pmsg->pdata); | 408 MessageAddress* data = static_cast<MessageAddress*>(pmsg->pdata); |
407 if (listen_queue_ != NULL) { | 409 if (listen_queue_ != NULL) { |
408 listen_queue_->push_back(data->addr); | 410 listen_queue_->push_back(data->addr); |
409 if (async_) { | 411 if (async_) { |
410 SignalReadEvent(this); | 412 SignalReadEvent(this); |
411 } | 413 } |
412 } else if ((SOCK_STREAM == type_) && (CS_CONNECTING == state_)) { | 414 } else if ((SOCK_STREAM == type_) && (CS_CONNECTING == state_)) { |
413 CompleteConnect(data->addr, true); | 415 CompleteConnect(data->addr, true); |
414 } else { | 416 } else { |
415 LOG(LS_VERBOSE) << "Socket at " << local_addr_ << " is not listening"; | 417 LOG(LS_VERBOSE) << "Socket at " << local_addr_ << " is not listening"; |
416 server_->Disconnect(server_->LookupBinding(data->addr)); | 418 server_->Disconnect(server_->LookupBinding(data->addr)); |
417 } | 419 } |
418 delete data; | 420 delete data; |
419 } else if (pmsg->message_id == MSG_ID_DISCONNECT) { | 421 } else if (pmsg->message_id == MSG_ID_DISCONNECT) { |
420 ASSERT(SOCK_STREAM == type_); | 422 RTC_DCHECK(SOCK_STREAM == type_); |
421 if (CS_CLOSED != state_) { | 423 if (CS_CLOSED != state_) { |
422 int error = (CS_CONNECTING == state_) ? ECONNREFUSED : 0; | 424 int error = (CS_CONNECTING == state_) ? ECONNREFUSED : 0; |
423 state_ = CS_CLOSED; | 425 state_ = CS_CLOSED; |
424 remote_addr_.Clear(); | 426 remote_addr_.Clear(); |
425 if (async_) { | 427 if (async_) { |
426 SignalCloseEvent(this, error); | 428 SignalCloseEvent(this, error); |
427 } | 429 } |
428 } | 430 } |
429 } else if (pmsg->message_id == MSG_ID_ADDRESS_BOUND) { | 431 } else if (pmsg->message_id == MSG_ID_ADDRESS_BOUND) { |
430 SignalAddressReady(this, GetLocalAddress()); | 432 SignalAddressReady(this, GetLocalAddress()); |
431 } else { | 433 } else { |
432 ASSERT(false); | 434 RTC_DCHECK(false); |
433 } | 435 } |
434 } | 436 } |
435 | 437 |
436 int VirtualSocket::InitiateConnect(const SocketAddress& addr, bool use_delay) { | 438 int VirtualSocket::InitiateConnect(const SocketAddress& addr, bool use_delay) { |
437 if (!remote_addr_.IsNil()) { | 439 if (!remote_addr_.IsNil()) { |
438 error_ = (CS_CONNECTED == state_) ? EISCONN : EINPROGRESS; | 440 error_ = (CS_CONNECTED == state_) ? EISCONN : EINPROGRESS; |
439 return -1; | 441 return -1; |
440 } | 442 } |
441 if (local_addr_.IsNil()) { | 443 if (local_addr_.IsNil()) { |
442 // If there's no local address set, grab a random one in the correct AF. | 444 // If there's no local address set, grab a random one in the correct AF. |
(...skipping 15 matching lines...) Expand all Loading... | |
458 if (result != 0) { | 460 if (result != 0) { |
459 error_ = EHOSTUNREACH; | 461 error_ = EHOSTUNREACH; |
460 return -1; | 462 return -1; |
461 } | 463 } |
462 state_ = CS_CONNECTING; | 464 state_ = CS_CONNECTING; |
463 } | 465 } |
464 return 0; | 466 return 0; |
465 } | 467 } |
466 | 468 |
467 void VirtualSocket::CompleteConnect(const SocketAddress& addr, bool notify) { | 469 void VirtualSocket::CompleteConnect(const SocketAddress& addr, bool notify) { |
468 ASSERT(CS_CONNECTING == state_); | 470 RTC_DCHECK(CS_CONNECTING == state_); |
469 remote_addr_ = addr; | 471 remote_addr_ = addr; |
470 state_ = CS_CONNECTED; | 472 state_ = CS_CONNECTED; |
471 server_->AddConnection(remote_addr_, local_addr_, this); | 473 server_->AddConnection(remote_addr_, local_addr_, this); |
472 if (async_ && notify) { | 474 if (async_ && notify) { |
473 SignalConnectEvent(this); | 475 SignalConnectEvent(this); |
474 } | 476 } |
475 } | 477 } |
476 | 478 |
477 int VirtualSocket::SendUdp(const void* pv, | 479 int VirtualSocket::SendUdp(const void* pv, |
478 size_t cb, | 480 size_t cb, |
479 const SocketAddress& addr) { | 481 const SocketAddress& addr) { |
480 // If we have not been assigned a local port, then get one. | 482 // If we have not been assigned a local port, then get one. |
481 if (local_addr_.IsNil()) { | 483 if (local_addr_.IsNil()) { |
482 local_addr_ = EmptySocketAddressWithFamily(addr.ipaddr().family()); | 484 local_addr_ = EmptySocketAddressWithFamily(addr.ipaddr().family()); |
483 int result = server_->Bind(this, &local_addr_); | 485 int result = server_->Bind(this, &local_addr_); |
484 if (result != 0) { | 486 if (result != 0) { |
485 local_addr_.Clear(); | 487 local_addr_.Clear(); |
486 error_ = EADDRINUSE; | 488 error_ = EADDRINUSE; |
487 return result; | 489 return result; |
488 } | 490 } |
489 } | 491 } |
490 | 492 |
491 // Send the data in a message to the appropriate socket. | 493 // Send the data in a message to the appropriate socket. |
492 return server_->SendUdp(this, static_cast<const char*>(pv), cb, addr); | 494 return server_->SendUdp(this, static_cast<const char*>(pv), cb, addr); |
493 } | 495 } |
494 | 496 |
495 int VirtualSocket::SendTcp(const void* pv, size_t cb) { | 497 int VirtualSocket::SendTcp(const void* pv, size_t cb) { |
496 size_t capacity = server_->send_buffer_capacity_ - send_buffer_.size(); | 498 size_t capacity = server_->send_buffer_capacity_ - send_buffer_.size(); |
497 if (0 == capacity) { | 499 if (0 == capacity) { |
498 write_enabled_ = true; | 500 ready_to_send_ = false; |
499 error_ = EWOULDBLOCK; | 501 error_ = EWOULDBLOCK; |
500 return -1; | 502 return -1; |
501 } | 503 } |
502 size_t consumed = std::min(cb, capacity); | 504 size_t consumed = std::min(cb, capacity); |
503 const char* cpv = static_cast<const char*>(pv); | 505 const char* cpv = static_cast<const char*>(pv); |
504 send_buffer_.insert(send_buffer_.end(), cpv, cpv + consumed); | 506 send_buffer_.insert(send_buffer_.end(), cpv, cpv + consumed); |
505 server_->SendTcp(this); | 507 server_->SendTcp(this); |
506 return static_cast<int>(consumed); | 508 return static_cast<int>(consumed); |
507 } | 509 } |
508 | 510 |
511 void VirtualSocket::OnSocketServerReadyToSend() { | |
512 if (ready_to_send_) { | |
513 // This socket didn't encounter EWOULDBLOCK, so there's nothing to do. | |
514 return; | |
515 } | |
516 if (type_ == SOCK_DGRAM) { | |
517 ready_to_send_ = true; | |
518 SignalWriteEvent(this); | |
519 } else { | |
520 RTC_DCHECK(type_ == SOCK_STREAM); | |
521 // This will attempt to empty the full send buffer, and will fire | |
522 // SignalWriteEvent if successful. | |
523 server_->SendTcp(this); | |
524 } | |
525 } | |
526 | |
509 VirtualSocketServer::VirtualSocketServer(SocketServer* ss) | 527 VirtualSocketServer::VirtualSocketServer(SocketServer* ss) |
510 : server_(ss), | 528 : server_(ss), |
511 server_owned_(false), | 529 server_owned_(false), |
512 msg_queue_(NULL), | 530 msg_queue_(NULL), |
513 stop_on_idle_(false), | 531 stop_on_idle_(false), |
514 network_delay_(0), | 532 network_delay_(0), |
515 next_ipv4_(kInitialNextIPv4), | 533 next_ipv4_(kInitialNextIPv4), |
516 next_ipv6_(kInitialNextIPv6), | 534 next_ipv6_(kInitialNextIPv6), |
517 next_port_(kFirstEphemeralPort), | 535 next_port_(kFirstEphemeralPort), |
518 bindings_(new AddressMap()), | 536 bindings_(new AddressMap()), |
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
560 uint16_t VirtualSocketServer::GetNextPort() { | 578 uint16_t VirtualSocketServer::GetNextPort() { |
561 uint16_t port = next_port_; | 579 uint16_t port = next_port_; |
562 if (next_port_ < kLastEphemeralPort) { | 580 if (next_port_ < kLastEphemeralPort) { |
563 ++next_port_; | 581 ++next_port_; |
564 } else { | 582 } else { |
565 next_port_ = kFirstEphemeralPort; | 583 next_port_ = kFirstEphemeralPort; |
566 } | 584 } |
567 return port; | 585 return port; |
568 } | 586 } |
569 | 587 |
588 void VirtualSocketServer::SetSendingBlocked(bool blocked) { | |
589 if (blocked == sending_blocked_) { | |
590 // Unchanged; nothing to do. | |
591 return; | |
592 } | |
593 sending_blocked_ = blocked; | |
594 if (!sending_blocked_) { | |
595 // Sending was blocked, but is now unblocked. This signal gives sockets a | |
596 // chance to fire SignalWriteEvent, and for TCP, send buffered data. | |
597 SignalReadyToSend(); | |
598 } | |
599 } | |
600 | |
570 Socket* VirtualSocketServer::CreateSocket(int type) { | 601 Socket* VirtualSocketServer::CreateSocket(int type) { |
571 return CreateSocket(AF_INET, type); | 602 return CreateSocket(AF_INET, type); |
572 } | 603 } |
573 | 604 |
574 Socket* VirtualSocketServer::CreateSocket(int family, int type) { | 605 Socket* VirtualSocketServer::CreateSocket(int family, int type) { |
575 return CreateSocketInternal(family, type); | 606 return CreateSocketInternal(family, type); |
576 } | 607 } |
577 | 608 |
578 AsyncSocket* VirtualSocketServer::CreateAsyncSocket(int type) { | 609 AsyncSocket* VirtualSocketServer::CreateAsyncSocket(int type) { |
579 return CreateAsyncSocket(AF_INET, type); | 610 return CreateAsyncSocket(AF_INET, type); |
(...skipping 11 matching lines...) Expand all Loading... | |
591 | 622 |
592 void VirtualSocketServer::SetMessageQueue(MessageQueue* msg_queue) { | 623 void VirtualSocketServer::SetMessageQueue(MessageQueue* msg_queue) { |
593 msg_queue_ = msg_queue; | 624 msg_queue_ = msg_queue; |
594 if (msg_queue_) { | 625 if (msg_queue_) { |
595 msg_queue_->SignalQueueDestroyed.connect(this, | 626 msg_queue_->SignalQueueDestroyed.connect(this, |
596 &VirtualSocketServer::OnMessageQueueDestroyed); | 627 &VirtualSocketServer::OnMessageQueueDestroyed); |
597 } | 628 } |
598 } | 629 } |
599 | 630 |
600 bool VirtualSocketServer::Wait(int cmsWait, bool process_io) { | 631 bool VirtualSocketServer::Wait(int cmsWait, bool process_io) { |
601 ASSERT(msg_queue_ == Thread::Current()); | 632 RTC_DCHECK(msg_queue_ == Thread::Current()); |
602 if (stop_on_idle_ && Thread::Current()->empty()) { | 633 if (stop_on_idle_ && Thread::Current()->empty()) { |
603 return false; | 634 return false; |
604 } | 635 } |
605 return socketserver()->Wait(cmsWait, process_io); | 636 return socketserver()->Wait(cmsWait, process_io); |
606 } | 637 } |
607 | 638 |
608 void VirtualSocketServer::WakeUp() { | 639 void VirtualSocketServer::WakeUp() { |
609 socketserver()->WakeUp(); | 640 socketserver()->WakeUp(); |
610 } | 641 } |
611 | 642 |
612 bool VirtualSocketServer::ProcessMessagesUntilIdle() { | 643 bool VirtualSocketServer::ProcessMessagesUntilIdle() { |
613 ASSERT(msg_queue_ == Thread::Current()); | 644 RTC_DCHECK(msg_queue_ == Thread::Current()); |
614 stop_on_idle_ = true; | 645 stop_on_idle_ = true; |
615 while (!msg_queue_->empty()) { | 646 while (!msg_queue_->empty()) { |
616 Message msg; | 647 Message msg; |
617 if (msg_queue_->Get(&msg, Thread::kForever)) { | 648 if (msg_queue_->Get(&msg, Thread::kForever)) { |
618 msg_queue_->Dispatch(&msg); | 649 msg_queue_->Dispatch(&msg); |
619 } | 650 } |
620 } | 651 } |
621 stop_on_idle_ = false; | 652 stop_on_idle_ = false; |
622 return !msg_queue_->IsQuitting(); | 653 return !msg_queue_->IsQuitting(); |
623 } | 654 } |
(...skipping 13 matching lines...) Expand all Loading... | |
637 socket->SignalCloseEvent(socket, 0); | 668 socket->SignalCloseEvent(socket, 0); |
638 | 669 |
639 // Trigger the remote connection's close event. | 670 // Trigger the remote connection's close event. |
640 socket->Close(); | 671 socket->Close(); |
641 | 672 |
642 return true; | 673 return true; |
643 } | 674 } |
644 | 675 |
645 int VirtualSocketServer::Bind(VirtualSocket* socket, | 676 int VirtualSocketServer::Bind(VirtualSocket* socket, |
646 const SocketAddress& addr) { | 677 const SocketAddress& addr) { |
647 ASSERT(NULL != socket); | 678 RTC_DCHECK(NULL != socket); |
648 // Address must be completely specified at this point | 679 // Address must be completely specified at this point |
649 ASSERT(!IPIsUnspec(addr.ipaddr())); | 680 RTC_DCHECK(!IPIsUnspec(addr.ipaddr())); |
650 ASSERT(addr.port() != 0); | 681 RTC_DCHECK(addr.port() != 0); |
651 | 682 |
652 // Normalize the address (turns v6-mapped addresses into v4-addresses). | 683 // Normalize the address (turns v6-mapped addresses into v4-addresses). |
653 SocketAddress normalized(addr.ipaddr().Normalized(), addr.port()); | 684 SocketAddress normalized(addr.ipaddr().Normalized(), addr.port()); |
654 | 685 |
655 AddressMap::value_type entry(normalized, socket); | 686 AddressMap::value_type entry(normalized, socket); |
656 return bindings_->insert(entry).second ? 0 : -1; | 687 return bindings_->insert(entry).second ? 0 : -1; |
657 } | 688 } |
658 | 689 |
659 int VirtualSocketServer::Bind(VirtualSocket* socket, SocketAddress* addr) { | 690 int VirtualSocketServer::Bind(VirtualSocket* socket, SocketAddress* addr) { |
660 ASSERT(NULL != socket); | 691 RTC_DCHECK(NULL != socket); |
661 | 692 |
662 if (!IPIsUnspec(addr->ipaddr())) { | 693 if (!IPIsUnspec(addr->ipaddr())) { |
663 addr->SetIP(addr->ipaddr().Normalized()); | 694 addr->SetIP(addr->ipaddr().Normalized()); |
664 } else { | 695 } else { |
665 ASSERT(false); | 696 RTC_DCHECK(false); |
666 } | 697 } |
667 | 698 |
668 if (addr->port() == 0) { | 699 if (addr->port() == 0) { |
669 for (int i = 0; i < kEphemeralPortCount; ++i) { | 700 for (int i = 0; i < kEphemeralPortCount; ++i) { |
670 addr->SetPort(GetNextPort()); | 701 addr->SetPort(GetNextPort()); |
671 if (bindings_->find(*addr) == bindings_->end()) { | 702 if (bindings_->find(*addr) == bindings_->end()) { |
672 break; | 703 break; |
673 } | 704 } |
674 } | 705 } |
675 } | 706 } |
(...skipping 20 matching lines...) Expand all Loading... | |
696 return LookupBinding(sock_addr); | 727 return LookupBinding(sock_addr); |
697 } | 728 } |
698 | 729 |
699 return nullptr; | 730 return nullptr; |
700 } | 731 } |
701 | 732 |
702 int VirtualSocketServer::Unbind(const SocketAddress& addr, | 733 int VirtualSocketServer::Unbind(const SocketAddress& addr, |
703 VirtualSocket* socket) { | 734 VirtualSocket* socket) { |
704 SocketAddress normalized(addr.ipaddr().Normalized(), | 735 SocketAddress normalized(addr.ipaddr().Normalized(), |
705 addr.port()); | 736 addr.port()); |
706 ASSERT((*bindings_)[normalized] == socket); | 737 RTC_DCHECK((*bindings_)[normalized] == socket); |
707 bindings_->erase(bindings_->find(normalized)); | 738 bindings_->erase(bindings_->find(normalized)); |
708 return 0; | 739 return 0; |
709 } | 740 } |
710 | 741 |
711 void VirtualSocketServer::AddConnection(const SocketAddress& local, | 742 void VirtualSocketServer::AddConnection(const SocketAddress& local, |
712 const SocketAddress& remote, | 743 const SocketAddress& remote, |
713 VirtualSocket* remote_socket) { | 744 VirtualSocket* remote_socket) { |
714 // Add this socket pair to our routing table. This will allow | 745 // Add this socket pair to our routing table. This will allow |
715 // multiple clients to connect to the same server address. | 746 // multiple clients to connect to the same server address. |
716 SocketAddress local_normalized(local.ipaddr().Normalized(), | 747 SocketAddress local_normalized(local.ipaddr().Normalized(), |
(...skipping 60 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
777 // Remove the mapping. | 808 // Remove the mapping. |
778 msg_queue_->PostDelayed(RTC_FROM_HERE, delay, socket, MSG_ID_DISCONNECT); | 809 msg_queue_->PostDelayed(RTC_FROM_HERE, delay, socket, MSG_ID_DISCONNECT); |
779 return true; | 810 return true; |
780 } | 811 } |
781 return false; | 812 return false; |
782 } | 813 } |
783 | 814 |
784 int VirtualSocketServer::SendUdp(VirtualSocket* socket, | 815 int VirtualSocketServer::SendUdp(VirtualSocket* socket, |
785 const char* data, size_t data_size, | 816 const char* data, size_t data_size, |
786 const SocketAddress& remote_addr) { | 817 const SocketAddress& remote_addr) { |
818 if (sending_blocked_) { | |
819 CritScope cs(&socket->crit_); | |
820 socket->ready_to_send_ = false; | |
821 socket->error_ = EWOULDBLOCK; | |
822 return -1; | |
823 } | |
824 | |
787 // See if we want to drop this packet. | 825 // See if we want to drop this packet. |
788 if (Random() < drop_prob_) { | 826 if (Random() < drop_prob_) { |
789 LOG(LS_VERBOSE) << "Dropping packet: bad luck"; | 827 LOG(LS_VERBOSE) << "Dropping packet: bad luck"; |
790 return static_cast<int>(data_size); | 828 return static_cast<int>(data_size); |
791 } | 829 } |
792 | 830 |
793 VirtualSocket* recipient = LookupBinding(remote_addr); | 831 VirtualSocket* recipient = LookupBinding(remote_addr); |
794 if (!recipient) { | 832 if (!recipient) { |
795 // Make a fake recipient for address family checking. | 833 // Make a fake recipient for address family checking. |
796 std::unique_ptr<VirtualSocket> dummy_socket( | 834 std::unique_ptr<VirtualSocket> dummy_socket( |
797 CreateSocketInternal(AF_INET, SOCK_DGRAM)); | 835 CreateSocketInternal(AF_INET, SOCK_DGRAM)); |
798 dummy_socket->SetLocalAddress(remote_addr); | 836 dummy_socket->SetLocalAddress(remote_addr); |
799 if (!CanInteractWith(socket, dummy_socket.get())) { | 837 if (!CanInteractWith(socket, dummy_socket.get())) { |
800 LOG(LS_VERBOSE) << "Incompatible address families: " | 838 LOG(LS_VERBOSE) << "Incompatible address families: " |
801 << socket->GetLocalAddress() << " and " << remote_addr; | 839 << socket->GetLocalAddress() << " and " << remote_addr; |
802 return -1; | 840 return -1; |
803 } | 841 } |
804 LOG(LS_VERBOSE) << "No one listening at " << remote_addr; | 842 LOG(LS_VERBOSE) << "No one listening at " << remote_addr; |
805 return static_cast<int>(data_size); | 843 return static_cast<int>(data_size); |
806 } | 844 } |
807 | 845 |
808 if (!CanInteractWith(socket, recipient)) { | 846 if (!CanInteractWith(socket, recipient)) { |
809 LOG(LS_VERBOSE) << "Incompatible address families: " | 847 LOG(LS_VERBOSE) << "Incompatible address families: " |
810 << socket->GetLocalAddress() << " and " << remote_addr; | 848 << socket->GetLocalAddress() << " and " << remote_addr; |
811 return -1; | 849 return -1; |
812 } | 850 } |
813 | 851 |
814 CritScope cs(&socket->crit_); | 852 { |
853 CritScope cs(&socket->crit_); | |
815 | 854 |
816 int64_t cur_time = TimeMillis(); | 855 int64_t cur_time = TimeMillis(); |
817 PurgeNetworkPackets(socket, cur_time); | 856 PurgeNetworkPackets(socket, cur_time); |
818 | 857 |
819 // Determine whether we have enough bandwidth to accept this packet. To do | 858 // Determine whether we have enough bandwidth to accept this packet. To do |
820 // this, we need to update the send queue. Once we know it's current size, | 859 // this, we need to update the send queue. Once we know it's current size, |
821 // we know whether we can fit this packet. | 860 // we know whether we can fit this packet. |
822 // | 861 // |
823 // NOTE: There are better algorithms for maintaining such a queue (such as | 862 // NOTE: There are better algorithms for maintaining such a queue (such as |
824 // "Derivative Random Drop"); however, this algorithm is a more accurate | 863 // "Derivative Random Drop"); however, this algorithm is a more accurate |
825 // simulation of what a normal network would do. | 864 // simulation of what a normal network would do. |
826 | 865 |
827 size_t packet_size = data_size + UDP_HEADER_SIZE; | 866 size_t packet_size = data_size + UDP_HEADER_SIZE; |
828 if (socket->network_size_ + packet_size > network_capacity_) { | 867 if (socket->network_size_ + packet_size > network_capacity_) { |
829 LOG(LS_VERBOSE) << "Dropping packet: network capacity exceeded"; | 868 LOG(LS_VERBOSE) << "Dropping packet: network capacity exceeded"; |
869 return static_cast<int>(data_size); | |
870 } | |
871 | |
872 AddPacketToNetwork(socket, recipient, cur_time, data, data_size, | |
873 UDP_HEADER_SIZE, false); | |
874 | |
830 return static_cast<int>(data_size); | 875 return static_cast<int>(data_size); |
831 } | 876 } |
832 | |
833 AddPacketToNetwork(socket, recipient, cur_time, data, data_size, | |
834 UDP_HEADER_SIZE, false); | |
835 | |
836 return static_cast<int>(data_size); | |
837 } | 877 } |
838 | 878 |
839 void VirtualSocketServer::SendTcp(VirtualSocket* socket) { | 879 void VirtualSocketServer::SendTcp(VirtualSocket* socket) { |
880 if (sending_blocked_) { | |
881 // Eventually the socket's buffer will fill and VirtualSocket::SendTcp will | |
882 // set EWOULDBLOCK. | |
883 return; | |
884 } | |
885 | |
840 // TCP can't send more data than will fill up the receiver's buffer. | 886 // TCP can't send more data than will fill up the receiver's buffer. |
841 // We track the data that is in the buffer plus data in flight using the | 887 // We track the data that is in the buffer plus data in flight using the |
842 // recipient's recv_buffer_size_. Anything beyond that must be stored in the | 888 // recipient's recv_buffer_size_. Anything beyond that must be stored in the |
843 // sender's buffer. We will trigger the buffered data to be sent when data | 889 // sender's buffer. We will trigger the buffered data to be sent when data |
844 // is read from the recv_buffer. | 890 // is read from the recv_buffer. |
845 | 891 |
846 // Lookup the local/remote pair in the connections table. | 892 // Lookup the local/remote pair in the connections table. |
847 VirtualSocket* recipient = LookupConnection(socket->local_addr_, | 893 VirtualSocket* recipient = LookupConnection(socket->local_addr_, |
848 socket->remote_addr_); | 894 socket->remote_addr_); |
849 if (!recipient) { | 895 if (!recipient) { |
(...skipping 22 matching lines...) Expand all Loading... | |
872 // Avoid undefined access beyond the last element of the vector. | 918 // Avoid undefined access beyond the last element of the vector. |
873 // This only happens when new_buffer_size is 0. | 919 // This only happens when new_buffer_size is 0. |
874 if (data_size < socket->send_buffer_.size()) { | 920 if (data_size < socket->send_buffer_.size()) { |
875 // memmove is required for potentially overlapping source/destination. | 921 // memmove is required for potentially overlapping source/destination. |
876 memmove(&socket->send_buffer_[0], &socket->send_buffer_[data_size], | 922 memmove(&socket->send_buffer_[0], &socket->send_buffer_[data_size], |
877 new_buffer_size); | 923 new_buffer_size); |
878 } | 924 } |
879 socket->send_buffer_.resize(new_buffer_size); | 925 socket->send_buffer_.resize(new_buffer_size); |
880 } | 926 } |
881 | 927 |
882 if (socket->write_enabled_ | 928 if (!socket->ready_to_send_ && |
883 && (socket->send_buffer_.size() < send_buffer_capacity_)) { | 929 (socket->send_buffer_.size() < send_buffer_capacity_)) { |
884 socket->write_enabled_ = false; | 930 socket->ready_to_send_ = true; |
885 socket->SignalWriteEvent(socket); | 931 socket->SignalWriteEvent(socket); |
886 } | 932 } |
887 } | 933 } |
888 | 934 |
889 void VirtualSocketServer::AddPacketToNetwork(VirtualSocket* sender, | 935 void VirtualSocketServer::AddPacketToNetwork(VirtualSocket* sender, |
890 VirtualSocket* recipient, | 936 VirtualSocket* recipient, |
891 int64_t cur_time, | 937 int64_t cur_time, |
892 const char* data, | 938 const char* data, |
893 size_t data_size, | 939 size_t data_size, |
894 size_t header_size, | 940 size_t header_size, |
(...skipping 29 matching lines...) Expand all Loading... | |
924 ts = std::max(ts, network_delay_); | 970 ts = std::max(ts, network_delay_); |
925 } | 971 } |
926 msg_queue_->PostAt(RTC_FROM_HERE, ts, recipient, MSG_ID_PACKET, p); | 972 msg_queue_->PostAt(RTC_FROM_HERE, ts, recipient, MSG_ID_PACKET, p); |
927 network_delay_ = std::max(ts, network_delay_); | 973 network_delay_ = std::max(ts, network_delay_); |
928 } | 974 } |
929 | 975 |
930 void VirtualSocketServer::PurgeNetworkPackets(VirtualSocket* socket, | 976 void VirtualSocketServer::PurgeNetworkPackets(VirtualSocket* socket, |
931 int64_t cur_time) { | 977 int64_t cur_time) { |
932 while (!socket->network_.empty() && | 978 while (!socket->network_.empty() && |
933 (socket->network_.front().done_time <= cur_time)) { | 979 (socket->network_.front().done_time <= cur_time)) { |
934 ASSERT(socket->network_size_ >= socket->network_.front().size); | 980 RTC_DCHECK(socket->network_size_ >= socket->network_.front().size); |
935 socket->network_size_ -= socket->network_.front().size; | 981 socket->network_size_ -= socket->network_.front().size; |
936 socket->network_.pop_front(); | 982 socket->network_.pop_front(); |
937 } | 983 } |
938 } | 984 } |
939 | 985 |
940 uint32_t VirtualSocketServer::SendDelay(uint32_t size) { | 986 uint32_t VirtualSocketServer::SendDelay(uint32_t size) { |
941 if (bandwidth_ == 0) | 987 if (bandwidth_ == 0) |
942 return 0; | 988 return 0; |
943 else | 989 else |
944 return 1000 * size / bandwidth_; | 990 return 1000 * size / bandwidth_; |
(...skipping 84 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1029 } | 1075 } |
1030 bool operator()(double v1, const VirtualSocketServer::Point& p2) { | 1076 bool operator()(double v1, const VirtualSocketServer::Point& p2) { |
1031 return v1 < p2.first; | 1077 return v1 < p2.first; |
1032 } | 1078 } |
1033 bool operator()(const VirtualSocketServer::Point& p1, double v2) { | 1079 bool operator()(const VirtualSocketServer::Point& p1, double v2) { |
1034 return p1.first < v2; | 1080 return p1.first < v2; |
1035 } | 1081 } |
1036 }; | 1082 }; |
1037 | 1083 |
1038 VirtualSocketServer::Function* VirtualSocketServer::Accumulate(Function* f) { | 1084 VirtualSocketServer::Function* VirtualSocketServer::Accumulate(Function* f) { |
1039 ASSERT(f->size() >= 1); | 1085 RTC_DCHECK(f->size() >= 1); |
1040 double v = 0; | 1086 double v = 0; |
1041 for (Function::size_type i = 0; i < f->size() - 1; ++i) { | 1087 for (Function::size_type i = 0; i < f->size() - 1; ++i) { |
1042 double dx = (*f)[i + 1].first - (*f)[i].first; | 1088 double dx = (*f)[i + 1].first - (*f)[i].first; |
1043 double avgy = ((*f)[i + 1].second + (*f)[i].second) / 2; | 1089 double avgy = ((*f)[i + 1].second + (*f)[i].second) / 2; |
1044 (*f)[i].second = v; | 1090 (*f)[i].second = v; |
1045 v = v + dx * avgy; | 1091 v = v + dx * avgy; |
1046 } | 1092 } |
1047 (*f)[f->size()-1].second = v; | 1093 (*f)[f->size()-1].second = v; |
1048 return f; | 1094 return f; |
1049 } | 1095 } |
(...skipping 21 matching lines...) Expand all Loading... | |
1071 delete f; | 1117 delete f; |
1072 return g; | 1118 return g; |
1073 } | 1119 } |
1074 | 1120 |
1075 double VirtualSocketServer::Evaluate(Function* f, double x) { | 1121 double VirtualSocketServer::Evaluate(Function* f, double x) { |
1076 Function::iterator iter = | 1122 Function::iterator iter = |
1077 std::lower_bound(f->begin(), f->end(), x, FunctionDomainCmp()); | 1123 std::lower_bound(f->begin(), f->end(), x, FunctionDomainCmp()); |
1078 if (iter == f->begin()) { | 1124 if (iter == f->begin()) { |
1079 return (*f)[0].second; | 1125 return (*f)[0].second; |
1080 } else if (iter == f->end()) { | 1126 } else if (iter == f->end()) { |
1081 ASSERT(f->size() >= 1); | 1127 RTC_DCHECK(f->size() >= 1); |
1082 return (*f)[f->size() - 1].second; | 1128 return (*f)[f->size() - 1].second; |
1083 } else if (iter->first == x) { | 1129 } else if (iter->first == x) { |
1084 return iter->second; | 1130 return iter->second; |
1085 } else { | 1131 } else { |
1086 double x1 = (iter - 1)->first; | 1132 double x1 = (iter - 1)->first; |
1087 double y1 = (iter - 1)->second; | 1133 double y1 = (iter - 1)->second; |
1088 double x2 = iter->first; | 1134 double x2 = iter->first; |
1089 double y2 = iter->second; | 1135 double y2 = iter->second; |
1090 return y1 + (y2 - y1) * (x - x1) / (x2 - x1); | 1136 return y1 + (y2 - y1) * (x - x1) / (x2 - x1); |
1091 } | 1137 } |
(...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1144 void VirtualSocketServer::SetDefaultRoute(const IPAddress& from_addr) { | 1190 void VirtualSocketServer::SetDefaultRoute(const IPAddress& from_addr) { |
1145 RTC_DCHECK(!IPIsAny(from_addr)); | 1191 RTC_DCHECK(!IPIsAny(from_addr)); |
1146 if (from_addr.family() == AF_INET) { | 1192 if (from_addr.family() == AF_INET) { |
1147 default_route_v4_ = from_addr; | 1193 default_route_v4_ = from_addr; |
1148 } else if (from_addr.family() == AF_INET6) { | 1194 } else if (from_addr.family() == AF_INET6) { |
1149 default_route_v6_ = from_addr; | 1195 default_route_v6_ = from_addr; |
1150 } | 1196 } |
1151 } | 1197 } |
1152 | 1198 |
1153 } // namespace rtc | 1199 } // namespace rtc |
OLD | NEW |