OLD | NEW |
1 /* | 1 /* |
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. | 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. |
3 * | 3 * |
4 * Use of this source code is governed by a BSD-style license | 4 * Use of this source code is governed by a BSD-style license |
5 * that can be found in the LICENSE file in the root of the source | 5 * that can be found in the LICENSE file in the root of the source |
6 * tree. An additional intellectual property rights grant can be found | 6 * tree. An additional intellectual property rights grant can be found |
7 * in the file PATENTS. All contributing project authors may | 7 * in the file PATENTS. All contributing project authors may |
8 * be found in the AUTHORS file in the root of the source tree. | 8 * be found in the AUTHORS file in the root of the source tree. |
9 */ | 9 */ |
10 | 10 |
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
58 MSG_ID_CONNECT, | 58 MSG_ID_CONNECT, |
59 MSG_ID_DISCONNECT, | 59 MSG_ID_DISCONNECT, |
60 }; | 60 }; |
61 | 61 |
62 // Packets are passed between sockets as messages. We copy the data just like | 62 // Packets are passed between sockets as messages. We copy the data just like |
63 // the kernel does. | 63 // the kernel does. |
64 class Packet : public MessageData { | 64 class Packet : public MessageData { |
65 public: | 65 public: |
66 Packet(const char* data, size_t size, const SocketAddress& from) | 66 Packet(const char* data, size_t size, const SocketAddress& from) |
67 : size_(size), consumed_(0), from_(from) { | 67 : size_(size), consumed_(0), from_(from) { |
68 ASSERT(NULL != data); | 68 RTC_DCHECK(NULL != data); |
69 data_ = new char[size_]; | 69 data_ = new char[size_]; |
70 memcpy(data_, data, size_); | 70 memcpy(data_, data, size_); |
71 } | 71 } |
72 | 72 |
73 ~Packet() override { | 73 ~Packet() override { |
74 delete[] data_; | 74 delete[] data_; |
75 } | 75 } |
76 | 76 |
77 const char* data() const { return data_ + consumed_; } | 77 const char* data() const { return data_ + consumed_; } |
78 size_t size() const { return size_ - consumed_; } | 78 size_t size() const { return size_ - consumed_; } |
79 const SocketAddress& from() const { return from_; } | 79 const SocketAddress& from() const { return from_; } |
80 | 80 |
81 // Remove the first size bytes from the data. | 81 // Remove the first size bytes from the data. |
82 void Consume(size_t size) { | 82 void Consume(size_t size) { |
83 ASSERT(size + consumed_ < size_); | 83 RTC_DCHECK(size + consumed_ < size_); |
84 consumed_ += size; | 84 consumed_ += size; |
85 } | 85 } |
86 | 86 |
87 private: | 87 private: |
88 char* data_; | 88 char* data_; |
89 size_t size_, consumed_; | 89 size_t size_, consumed_; |
90 SocketAddress from_; | 90 SocketAddress from_; |
91 }; | 91 }; |
92 | 92 |
93 struct MessageAddress : public MessageData { | 93 struct MessageAddress : public MessageData { |
94 explicit MessageAddress(const SocketAddress& a) : addr(a) { } | 94 explicit MessageAddress(const SocketAddress& a) : addr(a) { } |
95 SocketAddress addr; | 95 SocketAddress addr; |
96 }; | 96 }; |
97 | 97 |
98 VirtualSocket::VirtualSocket(VirtualSocketServer* server, | 98 VirtualSocket::VirtualSocket(VirtualSocketServer* server, |
99 int family, | 99 int family, |
100 int type, | 100 int type, |
101 bool async) | 101 bool async) |
102 : server_(server), | 102 : server_(server), |
103 type_(type), | 103 type_(type), |
104 async_(async), | 104 async_(async), |
105 state_(CS_CLOSED), | 105 state_(CS_CLOSED), |
106 error_(0), | 106 error_(0), |
107 listen_queue_(NULL), | 107 listen_queue_(NULL), |
108 write_enabled_(false), | |
109 network_size_(0), | 108 network_size_(0), |
110 recv_buffer_size_(0), | 109 recv_buffer_size_(0), |
111 bound_(false), | 110 bound_(false), |
112 was_any_(false) { | 111 was_any_(false) { |
113 ASSERT((type_ == SOCK_DGRAM) || (type_ == SOCK_STREAM)); | 112 RTC_DCHECK((type_ == SOCK_DGRAM) || (type_ == SOCK_STREAM)); |
114 ASSERT(async_ || (type_ != SOCK_STREAM)); // We only support async streams | 113 RTC_DCHECK(async_ || |
| 114 (type_ != SOCK_STREAM)); // We only support async streams |
| 115 server->SignalReadyToSend.connect(this, |
| 116 &VirtualSocket::OnSocketServerReadyToSend); |
115 } | 117 } |
116 | 118 |
117 VirtualSocket::~VirtualSocket() { | 119 VirtualSocket::~VirtualSocket() { |
118 Close(); | 120 Close(); |
119 | 121 |
120 for (RecvBuffer::iterator it = recv_buffer_.begin(); it != recv_buffer_.end(); | 122 for (RecvBuffer::iterator it = recv_buffer_.begin(); it != recv_buffer_.end(); |
121 ++it) { | 123 ++it) { |
122 delete *it; | 124 delete *it; |
123 } | 125 } |
124 } | 126 } |
(...skipping 77 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
202 // Remove mapping for both directions. | 204 // Remove mapping for both directions. |
203 server_->RemoveConnection(remote_addr_, local_addr_); | 205 server_->RemoveConnection(remote_addr_, local_addr_); |
204 server_->RemoveConnection(local_addr_, remote_addr_); | 206 server_->RemoveConnection(local_addr_, remote_addr_); |
205 } | 207 } |
206 // Cancel potential connects | 208 // Cancel potential connects |
207 MessageList msgs; | 209 MessageList msgs; |
208 if (server_->msg_queue_) { | 210 if (server_->msg_queue_) { |
209 server_->msg_queue_->Clear(this, MSG_ID_CONNECT, &msgs); | 211 server_->msg_queue_->Clear(this, MSG_ID_CONNECT, &msgs); |
210 } | 212 } |
211 for (MessageList::iterator it = msgs.begin(); it != msgs.end(); ++it) { | 213 for (MessageList::iterator it = msgs.begin(); it != msgs.end(); ++it) { |
212 ASSERT(NULL != it->pdata); | 214 RTC_DCHECK(NULL != it->pdata); |
213 MessageAddress* data = static_cast<MessageAddress*>(it->pdata); | 215 MessageAddress* data = static_cast<MessageAddress*>(it->pdata); |
214 | 216 |
215 // Lookup remote side. | 217 // Lookup remote side. |
216 VirtualSocket* socket = | 218 VirtualSocket* socket = |
217 server_->LookupConnection(local_addr_, data->addr); | 219 server_->LookupConnection(local_addr_, data->addr); |
218 if (socket) { | 220 if (socket) { |
219 // Server socket, remote side is a socket retreived by | 221 // Server socket, remote side is a socket retreived by |
220 // accept. Accepted sockets are not bound so we will not | 222 // accept. Accepted sockets are not bound so we will not |
221 // find it by looking in the bindings table. | 223 // find it by looking in the bindings table. |
222 server_->Disconnect(socket); | 224 server_->Disconnect(socket); |
(...skipping 77 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
300 } else { | 302 } else { |
301 recv_buffer_.pop_front(); | 303 recv_buffer_.pop_front(); |
302 delete packet; | 304 delete packet; |
303 } | 305 } |
304 | 306 |
305 if (SOCK_STREAM == type_) { | 307 if (SOCK_STREAM == type_) { |
306 bool was_full = (recv_buffer_size_ == server_->recv_buffer_capacity_); | 308 bool was_full = (recv_buffer_size_ == server_->recv_buffer_capacity_); |
307 recv_buffer_size_ -= data_read; | 309 recv_buffer_size_ -= data_read; |
308 if (was_full) { | 310 if (was_full) { |
309 VirtualSocket* sender = server_->LookupBinding(remote_addr_); | 311 VirtualSocket* sender = server_->LookupBinding(remote_addr_); |
310 ASSERT(NULL != sender); | 312 RTC_DCHECK(NULL != sender); |
311 server_->SendTcp(sender); | 313 server_->SendTcp(sender); |
312 } | 314 } |
313 } | 315 } |
314 | 316 |
315 return static_cast<int>(data_read); | 317 return static_cast<int>(data_read); |
316 } | 318 } |
317 | 319 |
318 int VirtualSocket::Listen(int backlog) { | 320 int VirtualSocket::Listen(int backlog) { |
319 ASSERT(SOCK_STREAM == type_); | 321 RTC_DCHECK(SOCK_STREAM == type_); |
320 ASSERT(CS_CLOSED == state_); | 322 RTC_DCHECK(CS_CLOSED == state_); |
321 if (local_addr_.IsNil()) { | 323 if (local_addr_.IsNil()) { |
322 error_ = EINVAL; | 324 error_ = EINVAL; |
323 return -1; | 325 return -1; |
324 } | 326 } |
325 ASSERT(NULL == listen_queue_); | 327 RTC_DCHECK(NULL == listen_queue_); |
326 listen_queue_ = new ListenQueue; | 328 listen_queue_ = new ListenQueue; |
327 state_ = CS_CONNECTING; | 329 state_ = CS_CONNECTING; |
328 return 0; | 330 return 0; |
329 } | 331 } |
330 | 332 |
331 VirtualSocket* VirtualSocket::Accept(SocketAddress* paddr) { | 333 VirtualSocket* VirtualSocket::Accept(SocketAddress* paddr) { |
332 if (NULL == listen_queue_) { | 334 if (NULL == listen_queue_) { |
333 error_ = EINVAL; | 335 error_ = EINVAL; |
334 return NULL; | 336 return NULL; |
335 } | 337 } |
(...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
385 | 387 |
386 int VirtualSocket::EstimateMTU(uint16_t* mtu) { | 388 int VirtualSocket::EstimateMTU(uint16_t* mtu) { |
387 if (CS_CONNECTED != state_) | 389 if (CS_CONNECTED != state_) |
388 return ENOTCONN; | 390 return ENOTCONN; |
389 else | 391 else |
390 return 65536; | 392 return 65536; |
391 } | 393 } |
392 | 394 |
393 void VirtualSocket::OnMessage(Message* pmsg) { | 395 void VirtualSocket::OnMessage(Message* pmsg) { |
394 if (pmsg->message_id == MSG_ID_PACKET) { | 396 if (pmsg->message_id == MSG_ID_PACKET) { |
395 // ASSERT(!local_addr_.IsAnyIP()); | 397 RTC_DCHECK(NULL != pmsg->pdata); |
396 ASSERT(NULL != pmsg->pdata); | |
397 Packet* packet = static_cast<Packet*>(pmsg->pdata); | 398 Packet* packet = static_cast<Packet*>(pmsg->pdata); |
398 | 399 |
399 recv_buffer_.push_back(packet); | 400 recv_buffer_.push_back(packet); |
400 | 401 |
401 if (async_) { | 402 if (async_) { |
402 SignalReadEvent(this); | 403 SignalReadEvent(this); |
403 } | 404 } |
404 } else if (pmsg->message_id == MSG_ID_CONNECT) { | 405 } else if (pmsg->message_id == MSG_ID_CONNECT) { |
405 ASSERT(NULL != pmsg->pdata); | 406 RTC_DCHECK(NULL != pmsg->pdata); |
406 MessageAddress* data = static_cast<MessageAddress*>(pmsg->pdata); | 407 MessageAddress* data = static_cast<MessageAddress*>(pmsg->pdata); |
407 if (listen_queue_ != NULL) { | 408 if (listen_queue_ != NULL) { |
408 listen_queue_->push_back(data->addr); | 409 listen_queue_->push_back(data->addr); |
409 if (async_) { | 410 if (async_) { |
410 SignalReadEvent(this); | 411 SignalReadEvent(this); |
411 } | 412 } |
412 } else if ((SOCK_STREAM == type_) && (CS_CONNECTING == state_)) { | 413 } else if ((SOCK_STREAM == type_) && (CS_CONNECTING == state_)) { |
413 CompleteConnect(data->addr, true); | 414 CompleteConnect(data->addr, true); |
414 } else { | 415 } else { |
415 LOG(LS_VERBOSE) << "Socket at " << local_addr_ << " is not listening"; | 416 LOG(LS_VERBOSE) << "Socket at " << local_addr_ << " is not listening"; |
416 server_->Disconnect(server_->LookupBinding(data->addr)); | 417 server_->Disconnect(server_->LookupBinding(data->addr)); |
417 } | 418 } |
418 delete data; | 419 delete data; |
419 } else if (pmsg->message_id == MSG_ID_DISCONNECT) { | 420 } else if (pmsg->message_id == MSG_ID_DISCONNECT) { |
420 ASSERT(SOCK_STREAM == type_); | 421 RTC_DCHECK(SOCK_STREAM == type_); |
421 if (CS_CLOSED != state_) { | 422 if (CS_CLOSED != state_) { |
422 int error = (CS_CONNECTING == state_) ? ECONNREFUSED : 0; | 423 int error = (CS_CONNECTING == state_) ? ECONNREFUSED : 0; |
423 state_ = CS_CLOSED; | 424 state_ = CS_CLOSED; |
424 remote_addr_.Clear(); | 425 remote_addr_.Clear(); |
425 if (async_) { | 426 if (async_) { |
426 SignalCloseEvent(this, error); | 427 SignalCloseEvent(this, error); |
427 } | 428 } |
428 } | 429 } |
429 } else if (pmsg->message_id == MSG_ID_ADDRESS_BOUND) { | 430 } else if (pmsg->message_id == MSG_ID_ADDRESS_BOUND) { |
430 SignalAddressReady(this, GetLocalAddress()); | 431 SignalAddressReady(this, GetLocalAddress()); |
431 } else { | 432 } else { |
432 ASSERT(false); | 433 RTC_DCHECK(false); |
433 } | 434 } |
434 } | 435 } |
435 | 436 |
436 int VirtualSocket::InitiateConnect(const SocketAddress& addr, bool use_delay) { | 437 int VirtualSocket::InitiateConnect(const SocketAddress& addr, bool use_delay) { |
437 if (!remote_addr_.IsNil()) { | 438 if (!remote_addr_.IsNil()) { |
438 error_ = (CS_CONNECTED == state_) ? EISCONN : EINPROGRESS; | 439 error_ = (CS_CONNECTED == state_) ? EISCONN : EINPROGRESS; |
439 return -1; | 440 return -1; |
440 } | 441 } |
441 if (local_addr_.IsNil()) { | 442 if (local_addr_.IsNil()) { |
442 // If there's no local address set, grab a random one in the correct AF. | 443 // If there's no local address set, grab a random one in the correct AF. |
(...skipping 15 matching lines...) Expand all Loading... |
458 if (result != 0) { | 459 if (result != 0) { |
459 error_ = EHOSTUNREACH; | 460 error_ = EHOSTUNREACH; |
460 return -1; | 461 return -1; |
461 } | 462 } |
462 state_ = CS_CONNECTING; | 463 state_ = CS_CONNECTING; |
463 } | 464 } |
464 return 0; | 465 return 0; |
465 } | 466 } |
466 | 467 |
467 void VirtualSocket::CompleteConnect(const SocketAddress& addr, bool notify) { | 468 void VirtualSocket::CompleteConnect(const SocketAddress& addr, bool notify) { |
468 ASSERT(CS_CONNECTING == state_); | 469 RTC_DCHECK(CS_CONNECTING == state_); |
469 remote_addr_ = addr; | 470 remote_addr_ = addr; |
470 state_ = CS_CONNECTED; | 471 state_ = CS_CONNECTED; |
471 server_->AddConnection(remote_addr_, local_addr_, this); | 472 server_->AddConnection(remote_addr_, local_addr_, this); |
472 if (async_ && notify) { | 473 if (async_ && notify) { |
473 SignalConnectEvent(this); | 474 SignalConnectEvent(this); |
474 } | 475 } |
475 } | 476 } |
476 | 477 |
477 int VirtualSocket::SendUdp(const void* pv, | 478 int VirtualSocket::SendUdp(const void* pv, |
478 size_t cb, | 479 size_t cb, |
479 const SocketAddress& addr) { | 480 const SocketAddress& addr) { |
480 // If we have not been assigned a local port, then get one. | 481 // If we have not been assigned a local port, then get one. |
481 if (local_addr_.IsNil()) { | 482 if (local_addr_.IsNil()) { |
482 local_addr_ = EmptySocketAddressWithFamily(addr.ipaddr().family()); | 483 local_addr_ = EmptySocketAddressWithFamily(addr.ipaddr().family()); |
483 int result = server_->Bind(this, &local_addr_); | 484 int result = server_->Bind(this, &local_addr_); |
484 if (result != 0) { | 485 if (result != 0) { |
485 local_addr_.Clear(); | 486 local_addr_.Clear(); |
486 error_ = EADDRINUSE; | 487 error_ = EADDRINUSE; |
487 return result; | 488 return result; |
488 } | 489 } |
489 } | 490 } |
490 | 491 |
491 // Send the data in a message to the appropriate socket. | 492 // Send the data in a message to the appropriate socket. |
492 return server_->SendUdp(this, static_cast<const char*>(pv), cb, addr); | 493 return server_->SendUdp(this, static_cast<const char*>(pv), cb, addr); |
493 } | 494 } |
494 | 495 |
495 int VirtualSocket::SendTcp(const void* pv, size_t cb) { | 496 int VirtualSocket::SendTcp(const void* pv, size_t cb) { |
496 size_t capacity = server_->send_buffer_capacity_ - send_buffer_.size(); | 497 size_t capacity = server_->send_buffer_capacity_ - send_buffer_.size(); |
497 if (0 == capacity) { | 498 if (0 == capacity) { |
498 write_enabled_ = true; | 499 ready_to_send_ = false; |
499 error_ = EWOULDBLOCK; | 500 error_ = EWOULDBLOCK; |
500 return -1; | 501 return -1; |
501 } | 502 } |
502 size_t consumed = std::min(cb, capacity); | 503 size_t consumed = std::min(cb, capacity); |
503 const char* cpv = static_cast<const char*>(pv); | 504 const char* cpv = static_cast<const char*>(pv); |
504 send_buffer_.insert(send_buffer_.end(), cpv, cpv + consumed); | 505 send_buffer_.insert(send_buffer_.end(), cpv, cpv + consumed); |
505 server_->SendTcp(this); | 506 server_->SendTcp(this); |
506 return static_cast<int>(consumed); | 507 return static_cast<int>(consumed); |
507 } | 508 } |
508 | 509 |
| 510 void VirtualSocket::OnSocketServerReadyToSend() { |
| 511 if (ready_to_send_) { |
| 512 // This socket didn't encounter EWOULDBLOCK, so there's nothing to do. |
| 513 return; |
| 514 } |
| 515 if (type_ == SOCK_DGRAM) { |
| 516 ready_to_send_ = true; |
| 517 SignalWriteEvent(this); |
| 518 } else { |
| 519 RTC_DCHECK(type_ == SOCK_STREAM); |
| 520 // This will attempt to empty the full send buffer, and will fire |
| 521 // SignalWriteEvent if successful. |
| 522 server_->SendTcp(this); |
| 523 } |
| 524 } |
| 525 |
509 VirtualSocketServer::VirtualSocketServer(SocketServer* ss) | 526 VirtualSocketServer::VirtualSocketServer(SocketServer* ss) |
510 : server_(ss), | 527 : server_(ss), |
511 server_owned_(false), | 528 server_owned_(false), |
512 msg_queue_(NULL), | 529 msg_queue_(NULL), |
513 stop_on_idle_(false), | 530 stop_on_idle_(false), |
514 network_delay_(0), | 531 network_delay_(0), |
515 next_ipv4_(kInitialNextIPv4), | 532 next_ipv4_(kInitialNextIPv4), |
516 next_ipv6_(kInitialNextIPv6), | 533 next_ipv6_(kInitialNextIPv6), |
517 next_port_(kFirstEphemeralPort), | 534 next_port_(kFirstEphemeralPort), |
518 bindings_(new AddressMap()), | 535 bindings_(new AddressMap()), |
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
560 uint16_t VirtualSocketServer::GetNextPort() { | 577 uint16_t VirtualSocketServer::GetNextPort() { |
561 uint16_t port = next_port_; | 578 uint16_t port = next_port_; |
562 if (next_port_ < kLastEphemeralPort) { | 579 if (next_port_ < kLastEphemeralPort) { |
563 ++next_port_; | 580 ++next_port_; |
564 } else { | 581 } else { |
565 next_port_ = kFirstEphemeralPort; | 582 next_port_ = kFirstEphemeralPort; |
566 } | 583 } |
567 return port; | 584 return port; |
568 } | 585 } |
569 | 586 |
| 587 void VirtualSocketServer::SetSendingBlocked(bool blocked) { |
| 588 if (blocked == sending_blocked_) { |
| 589 // Unchanged; nothing to do. |
| 590 return; |
| 591 } |
| 592 sending_blocked_ = blocked; |
| 593 if (!sending_blocked_) { |
| 594 // Sending was blocked, but is now unblocked. This signal gives sockets a |
| 595 // chance to fire SignalWriteEvent, and for TCP, send buffered data. |
| 596 SignalReadyToSend(); |
| 597 } |
| 598 } |
| 599 |
570 Socket* VirtualSocketServer::CreateSocket(int type) { | 600 Socket* VirtualSocketServer::CreateSocket(int type) { |
571 return CreateSocket(AF_INET, type); | 601 return CreateSocket(AF_INET, type); |
572 } | 602 } |
573 | 603 |
574 Socket* VirtualSocketServer::CreateSocket(int family, int type) { | 604 Socket* VirtualSocketServer::CreateSocket(int family, int type) { |
575 return CreateSocketInternal(family, type); | 605 return CreateSocketInternal(family, type); |
576 } | 606 } |
577 | 607 |
578 AsyncSocket* VirtualSocketServer::CreateAsyncSocket(int type) { | 608 AsyncSocket* VirtualSocketServer::CreateAsyncSocket(int type) { |
579 return CreateAsyncSocket(AF_INET, type); | 609 return CreateAsyncSocket(AF_INET, type); |
(...skipping 11 matching lines...) Expand all Loading... |
591 | 621 |
592 void VirtualSocketServer::SetMessageQueue(MessageQueue* msg_queue) { | 622 void VirtualSocketServer::SetMessageQueue(MessageQueue* msg_queue) { |
593 msg_queue_ = msg_queue; | 623 msg_queue_ = msg_queue; |
594 if (msg_queue_) { | 624 if (msg_queue_) { |
595 msg_queue_->SignalQueueDestroyed.connect(this, | 625 msg_queue_->SignalQueueDestroyed.connect(this, |
596 &VirtualSocketServer::OnMessageQueueDestroyed); | 626 &VirtualSocketServer::OnMessageQueueDestroyed); |
597 } | 627 } |
598 } | 628 } |
599 | 629 |
600 bool VirtualSocketServer::Wait(int cmsWait, bool process_io) { | 630 bool VirtualSocketServer::Wait(int cmsWait, bool process_io) { |
601 ASSERT(msg_queue_ == Thread::Current()); | 631 RTC_DCHECK(msg_queue_ == Thread::Current()); |
602 if (stop_on_idle_ && Thread::Current()->empty()) { | 632 if (stop_on_idle_ && Thread::Current()->empty()) { |
603 return false; | 633 return false; |
604 } | 634 } |
605 return socketserver()->Wait(cmsWait, process_io); | 635 return socketserver()->Wait(cmsWait, process_io); |
606 } | 636 } |
607 | 637 |
608 void VirtualSocketServer::WakeUp() { | 638 void VirtualSocketServer::WakeUp() { |
609 socketserver()->WakeUp(); | 639 socketserver()->WakeUp(); |
610 } | 640 } |
611 | 641 |
612 bool VirtualSocketServer::ProcessMessagesUntilIdle() { | 642 bool VirtualSocketServer::ProcessMessagesUntilIdle() { |
613 ASSERT(msg_queue_ == Thread::Current()); | 643 RTC_DCHECK(msg_queue_ == Thread::Current()); |
614 stop_on_idle_ = true; | 644 stop_on_idle_ = true; |
615 while (!msg_queue_->empty()) { | 645 while (!msg_queue_->empty()) { |
616 Message msg; | 646 Message msg; |
617 if (msg_queue_->Get(&msg, Thread::kForever)) { | 647 if (msg_queue_->Get(&msg, Thread::kForever)) { |
618 msg_queue_->Dispatch(&msg); | 648 msg_queue_->Dispatch(&msg); |
619 } | 649 } |
620 } | 650 } |
621 stop_on_idle_ = false; | 651 stop_on_idle_ = false; |
622 return !msg_queue_->IsQuitting(); | 652 return !msg_queue_->IsQuitting(); |
623 } | 653 } |
(...skipping 13 matching lines...) Expand all Loading... |
637 socket->SignalCloseEvent(socket, 0); | 667 socket->SignalCloseEvent(socket, 0); |
638 | 668 |
639 // Trigger the remote connection's close event. | 669 // Trigger the remote connection's close event. |
640 socket->Close(); | 670 socket->Close(); |
641 | 671 |
642 return true; | 672 return true; |
643 } | 673 } |
644 | 674 |
645 int VirtualSocketServer::Bind(VirtualSocket* socket, | 675 int VirtualSocketServer::Bind(VirtualSocket* socket, |
646 const SocketAddress& addr) { | 676 const SocketAddress& addr) { |
647 ASSERT(NULL != socket); | 677 RTC_DCHECK(NULL != socket); |
648 // Address must be completely specified at this point | 678 // Address must be completely specified at this point |
649 ASSERT(!IPIsUnspec(addr.ipaddr())); | 679 RTC_DCHECK(!IPIsUnspec(addr.ipaddr())); |
650 ASSERT(addr.port() != 0); | 680 RTC_DCHECK(addr.port() != 0); |
651 | 681 |
652 // Normalize the address (turns v6-mapped addresses into v4-addresses). | 682 // Normalize the address (turns v6-mapped addresses into v4-addresses). |
653 SocketAddress normalized(addr.ipaddr().Normalized(), addr.port()); | 683 SocketAddress normalized(addr.ipaddr().Normalized(), addr.port()); |
654 | 684 |
655 AddressMap::value_type entry(normalized, socket); | 685 AddressMap::value_type entry(normalized, socket); |
656 return bindings_->insert(entry).second ? 0 : -1; | 686 return bindings_->insert(entry).second ? 0 : -1; |
657 } | 687 } |
658 | 688 |
659 int VirtualSocketServer::Bind(VirtualSocket* socket, SocketAddress* addr) { | 689 int VirtualSocketServer::Bind(VirtualSocket* socket, SocketAddress* addr) { |
660 ASSERT(NULL != socket); | 690 RTC_DCHECK(NULL != socket); |
661 | 691 |
662 if (!IPIsUnspec(addr->ipaddr())) { | 692 if (!IPIsUnspec(addr->ipaddr())) { |
663 addr->SetIP(addr->ipaddr().Normalized()); | 693 addr->SetIP(addr->ipaddr().Normalized()); |
664 } else { | 694 } else { |
665 ASSERT(false); | 695 RTC_DCHECK(false); |
666 } | 696 } |
667 | 697 |
668 if (addr->port() == 0) { | 698 if (addr->port() == 0) { |
669 for (int i = 0; i < kEphemeralPortCount; ++i) { | 699 for (int i = 0; i < kEphemeralPortCount; ++i) { |
670 addr->SetPort(GetNextPort()); | 700 addr->SetPort(GetNextPort()); |
671 if (bindings_->find(*addr) == bindings_->end()) { | 701 if (bindings_->find(*addr) == bindings_->end()) { |
672 break; | 702 break; |
673 } | 703 } |
674 } | 704 } |
675 } | 705 } |
(...skipping 20 matching lines...) Expand all Loading... |
696 return LookupBinding(sock_addr); | 726 return LookupBinding(sock_addr); |
697 } | 727 } |
698 | 728 |
699 return nullptr; | 729 return nullptr; |
700 } | 730 } |
701 | 731 |
702 int VirtualSocketServer::Unbind(const SocketAddress& addr, | 732 int VirtualSocketServer::Unbind(const SocketAddress& addr, |
703 VirtualSocket* socket) { | 733 VirtualSocket* socket) { |
704 SocketAddress normalized(addr.ipaddr().Normalized(), | 734 SocketAddress normalized(addr.ipaddr().Normalized(), |
705 addr.port()); | 735 addr.port()); |
706 ASSERT((*bindings_)[normalized] == socket); | 736 RTC_DCHECK((*bindings_)[normalized] == socket); |
707 bindings_->erase(bindings_->find(normalized)); | 737 bindings_->erase(bindings_->find(normalized)); |
708 return 0; | 738 return 0; |
709 } | 739 } |
710 | 740 |
711 void VirtualSocketServer::AddConnection(const SocketAddress& local, | 741 void VirtualSocketServer::AddConnection(const SocketAddress& local, |
712 const SocketAddress& remote, | 742 const SocketAddress& remote, |
713 VirtualSocket* remote_socket) { | 743 VirtualSocket* remote_socket) { |
714 // Add this socket pair to our routing table. This will allow | 744 // Add this socket pair to our routing table. This will allow |
715 // multiple clients to connect to the same server address. | 745 // multiple clients to connect to the same server address. |
716 SocketAddress local_normalized(local.ipaddr().Normalized(), | 746 SocketAddress local_normalized(local.ipaddr().Normalized(), |
(...skipping 60 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
777 // Remove the mapping. | 807 // Remove the mapping. |
778 msg_queue_->PostDelayed(RTC_FROM_HERE, delay, socket, MSG_ID_DISCONNECT); | 808 msg_queue_->PostDelayed(RTC_FROM_HERE, delay, socket, MSG_ID_DISCONNECT); |
779 return true; | 809 return true; |
780 } | 810 } |
781 return false; | 811 return false; |
782 } | 812 } |
783 | 813 |
784 int VirtualSocketServer::SendUdp(VirtualSocket* socket, | 814 int VirtualSocketServer::SendUdp(VirtualSocket* socket, |
785 const char* data, size_t data_size, | 815 const char* data, size_t data_size, |
786 const SocketAddress& remote_addr) { | 816 const SocketAddress& remote_addr) { |
| 817 if (sending_blocked_) { |
| 818 CritScope cs(&socket->crit_); |
| 819 socket->ready_to_send_ = false; |
| 820 socket->error_ = EWOULDBLOCK; |
| 821 return -1; |
| 822 } |
| 823 |
787 // See if we want to drop this packet. | 824 // See if we want to drop this packet. |
788 if (Random() < drop_prob_) { | 825 if (Random() < drop_prob_) { |
789 LOG(LS_VERBOSE) << "Dropping packet: bad luck"; | 826 LOG(LS_VERBOSE) << "Dropping packet: bad luck"; |
790 return static_cast<int>(data_size); | 827 return static_cast<int>(data_size); |
791 } | 828 } |
792 | 829 |
793 VirtualSocket* recipient = LookupBinding(remote_addr); | 830 VirtualSocket* recipient = LookupBinding(remote_addr); |
794 if (!recipient) { | 831 if (!recipient) { |
795 // Make a fake recipient for address family checking. | 832 // Make a fake recipient for address family checking. |
796 std::unique_ptr<VirtualSocket> dummy_socket( | 833 std::unique_ptr<VirtualSocket> dummy_socket( |
797 CreateSocketInternal(AF_INET, SOCK_DGRAM)); | 834 CreateSocketInternal(AF_INET, SOCK_DGRAM)); |
798 dummy_socket->SetLocalAddress(remote_addr); | 835 dummy_socket->SetLocalAddress(remote_addr); |
799 if (!CanInteractWith(socket, dummy_socket.get())) { | 836 if (!CanInteractWith(socket, dummy_socket.get())) { |
800 LOG(LS_VERBOSE) << "Incompatible address families: " | 837 LOG(LS_VERBOSE) << "Incompatible address families: " |
801 << socket->GetLocalAddress() << " and " << remote_addr; | 838 << socket->GetLocalAddress() << " and " << remote_addr; |
802 return -1; | 839 return -1; |
803 } | 840 } |
804 LOG(LS_VERBOSE) << "No one listening at " << remote_addr; | 841 LOG(LS_VERBOSE) << "No one listening at " << remote_addr; |
805 return static_cast<int>(data_size); | 842 return static_cast<int>(data_size); |
806 } | 843 } |
807 | 844 |
808 if (!CanInteractWith(socket, recipient)) { | 845 if (!CanInteractWith(socket, recipient)) { |
809 LOG(LS_VERBOSE) << "Incompatible address families: " | 846 LOG(LS_VERBOSE) << "Incompatible address families: " |
810 << socket->GetLocalAddress() << " and " << remote_addr; | 847 << socket->GetLocalAddress() << " and " << remote_addr; |
811 return -1; | 848 return -1; |
812 } | 849 } |
813 | 850 |
814 CritScope cs(&socket->crit_); | 851 { |
| 852 CritScope cs(&socket->crit_); |
815 | 853 |
816 int64_t cur_time = TimeMillis(); | 854 int64_t cur_time = TimeMillis(); |
817 PurgeNetworkPackets(socket, cur_time); | 855 PurgeNetworkPackets(socket, cur_time); |
818 | 856 |
819 // Determine whether we have enough bandwidth to accept this packet. To do | 857 // Determine whether we have enough bandwidth to accept this packet. To do |
820 // this, we need to update the send queue. Once we know it's current size, | 858 // this, we need to update the send queue. Once we know it's current size, |
821 // we know whether we can fit this packet. | 859 // we know whether we can fit this packet. |
822 // | 860 // |
823 // NOTE: There are better algorithms for maintaining such a queue (such as | 861 // NOTE: There are better algorithms for maintaining such a queue (such as |
824 // "Derivative Random Drop"); however, this algorithm is a more accurate | 862 // "Derivative Random Drop"); however, this algorithm is a more accurate |
825 // simulation of what a normal network would do. | 863 // simulation of what a normal network would do. |
826 | 864 |
827 size_t packet_size = data_size + UDP_HEADER_SIZE; | 865 size_t packet_size = data_size + UDP_HEADER_SIZE; |
828 if (socket->network_size_ + packet_size > network_capacity_) { | 866 if (socket->network_size_ + packet_size > network_capacity_) { |
829 LOG(LS_VERBOSE) << "Dropping packet: network capacity exceeded"; | 867 LOG(LS_VERBOSE) << "Dropping packet: network capacity exceeded"; |
| 868 return static_cast<int>(data_size); |
| 869 } |
| 870 |
| 871 AddPacketToNetwork(socket, recipient, cur_time, data, data_size, |
| 872 UDP_HEADER_SIZE, false); |
| 873 |
830 return static_cast<int>(data_size); | 874 return static_cast<int>(data_size); |
831 } | 875 } |
832 | |
833 AddPacketToNetwork(socket, recipient, cur_time, data, data_size, | |
834 UDP_HEADER_SIZE, false); | |
835 | |
836 return static_cast<int>(data_size); | |
837 } | 876 } |
838 | 877 |
839 void VirtualSocketServer::SendTcp(VirtualSocket* socket) { | 878 void VirtualSocketServer::SendTcp(VirtualSocket* socket) { |
| 879 if (sending_blocked_) { |
| 880 // Eventually the socket's buffer will fill and VirtualSocket::SendTcp will |
| 881 // set EWOULDBLOCK. |
| 882 return; |
| 883 } |
| 884 |
840 // TCP can't send more data than will fill up the receiver's buffer. | 885 // TCP can't send more data than will fill up the receiver's buffer. |
841 // We track the data that is in the buffer plus data in flight using the | 886 // We track the data that is in the buffer plus data in flight using the |
842 // recipient's recv_buffer_size_. Anything beyond that must be stored in the | 887 // recipient's recv_buffer_size_. Anything beyond that must be stored in the |
843 // sender's buffer. We will trigger the buffered data to be sent when data | 888 // sender's buffer. We will trigger the buffered data to be sent when data |
844 // is read from the recv_buffer. | 889 // is read from the recv_buffer. |
845 | 890 |
846 // Lookup the local/remote pair in the connections table. | 891 // Lookup the local/remote pair in the connections table. |
847 VirtualSocket* recipient = LookupConnection(socket->local_addr_, | 892 VirtualSocket* recipient = LookupConnection(socket->local_addr_, |
848 socket->remote_addr_); | 893 socket->remote_addr_); |
849 if (!recipient) { | 894 if (!recipient) { |
(...skipping 22 matching lines...) Expand all Loading... |
872 // Avoid undefined access beyond the last element of the vector. | 917 // Avoid undefined access beyond the last element of the vector. |
873 // This only happens when new_buffer_size is 0. | 918 // This only happens when new_buffer_size is 0. |
874 if (data_size < socket->send_buffer_.size()) { | 919 if (data_size < socket->send_buffer_.size()) { |
875 // memmove is required for potentially overlapping source/destination. | 920 // memmove is required for potentially overlapping source/destination. |
876 memmove(&socket->send_buffer_[0], &socket->send_buffer_[data_size], | 921 memmove(&socket->send_buffer_[0], &socket->send_buffer_[data_size], |
877 new_buffer_size); | 922 new_buffer_size); |
878 } | 923 } |
879 socket->send_buffer_.resize(new_buffer_size); | 924 socket->send_buffer_.resize(new_buffer_size); |
880 } | 925 } |
881 | 926 |
882 if (socket->write_enabled_ | 927 if (!socket->ready_to_send_ && |
883 && (socket->send_buffer_.size() < send_buffer_capacity_)) { | 928 (socket->send_buffer_.size() < send_buffer_capacity_)) { |
884 socket->write_enabled_ = false; | 929 socket->ready_to_send_ = true; |
885 socket->SignalWriteEvent(socket); | 930 socket->SignalWriteEvent(socket); |
886 } | 931 } |
887 } | 932 } |
888 | 933 |
889 void VirtualSocketServer::AddPacketToNetwork(VirtualSocket* sender, | 934 void VirtualSocketServer::AddPacketToNetwork(VirtualSocket* sender, |
890 VirtualSocket* recipient, | 935 VirtualSocket* recipient, |
891 int64_t cur_time, | 936 int64_t cur_time, |
892 const char* data, | 937 const char* data, |
893 size_t data_size, | 938 size_t data_size, |
894 size_t header_size, | 939 size_t header_size, |
(...skipping 29 matching lines...) Expand all Loading... |
924 ts = std::max(ts, network_delay_); | 969 ts = std::max(ts, network_delay_); |
925 } | 970 } |
926 msg_queue_->PostAt(RTC_FROM_HERE, ts, recipient, MSG_ID_PACKET, p); | 971 msg_queue_->PostAt(RTC_FROM_HERE, ts, recipient, MSG_ID_PACKET, p); |
927 network_delay_ = std::max(ts, network_delay_); | 972 network_delay_ = std::max(ts, network_delay_); |
928 } | 973 } |
929 | 974 |
930 void VirtualSocketServer::PurgeNetworkPackets(VirtualSocket* socket, | 975 void VirtualSocketServer::PurgeNetworkPackets(VirtualSocket* socket, |
931 int64_t cur_time) { | 976 int64_t cur_time) { |
932 while (!socket->network_.empty() && | 977 while (!socket->network_.empty() && |
933 (socket->network_.front().done_time <= cur_time)) { | 978 (socket->network_.front().done_time <= cur_time)) { |
934 ASSERT(socket->network_size_ >= socket->network_.front().size); | 979 RTC_DCHECK(socket->network_size_ >= socket->network_.front().size); |
935 socket->network_size_ -= socket->network_.front().size; | 980 socket->network_size_ -= socket->network_.front().size; |
936 socket->network_.pop_front(); | 981 socket->network_.pop_front(); |
937 } | 982 } |
938 } | 983 } |
939 | 984 |
940 uint32_t VirtualSocketServer::SendDelay(uint32_t size) { | 985 uint32_t VirtualSocketServer::SendDelay(uint32_t size) { |
941 if (bandwidth_ == 0) | 986 if (bandwidth_ == 0) |
942 return 0; | 987 return 0; |
943 else | 988 else |
944 return 1000 * size / bandwidth_; | 989 return 1000 * size / bandwidth_; |
(...skipping 84 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1029 } | 1074 } |
1030 bool operator()(double v1, const VirtualSocketServer::Point& p2) { | 1075 bool operator()(double v1, const VirtualSocketServer::Point& p2) { |
1031 return v1 < p2.first; | 1076 return v1 < p2.first; |
1032 } | 1077 } |
1033 bool operator()(const VirtualSocketServer::Point& p1, double v2) { | 1078 bool operator()(const VirtualSocketServer::Point& p1, double v2) { |
1034 return p1.first < v2; | 1079 return p1.first < v2; |
1035 } | 1080 } |
1036 }; | 1081 }; |
1037 | 1082 |
1038 VirtualSocketServer::Function* VirtualSocketServer::Accumulate(Function* f) { | 1083 VirtualSocketServer::Function* VirtualSocketServer::Accumulate(Function* f) { |
1039 ASSERT(f->size() >= 1); | 1084 RTC_DCHECK(f->size() >= 1); |
1040 double v = 0; | 1085 double v = 0; |
1041 for (Function::size_type i = 0; i < f->size() - 1; ++i) { | 1086 for (Function::size_type i = 0; i < f->size() - 1; ++i) { |
1042 double dx = (*f)[i + 1].first - (*f)[i].first; | 1087 double dx = (*f)[i + 1].first - (*f)[i].first; |
1043 double avgy = ((*f)[i + 1].second + (*f)[i].second) / 2; | 1088 double avgy = ((*f)[i + 1].second + (*f)[i].second) / 2; |
1044 (*f)[i].second = v; | 1089 (*f)[i].second = v; |
1045 v = v + dx * avgy; | 1090 v = v + dx * avgy; |
1046 } | 1091 } |
1047 (*f)[f->size()-1].second = v; | 1092 (*f)[f->size()-1].second = v; |
1048 return f; | 1093 return f; |
1049 } | 1094 } |
(...skipping 21 matching lines...) Expand all Loading... |
1071 delete f; | 1116 delete f; |
1072 return g; | 1117 return g; |
1073 } | 1118 } |
1074 | 1119 |
1075 double VirtualSocketServer::Evaluate(Function* f, double x) { | 1120 double VirtualSocketServer::Evaluate(Function* f, double x) { |
1076 Function::iterator iter = | 1121 Function::iterator iter = |
1077 std::lower_bound(f->begin(), f->end(), x, FunctionDomainCmp()); | 1122 std::lower_bound(f->begin(), f->end(), x, FunctionDomainCmp()); |
1078 if (iter == f->begin()) { | 1123 if (iter == f->begin()) { |
1079 return (*f)[0].second; | 1124 return (*f)[0].second; |
1080 } else if (iter == f->end()) { | 1125 } else if (iter == f->end()) { |
1081 ASSERT(f->size() >= 1); | 1126 RTC_DCHECK(f->size() >= 1); |
1082 return (*f)[f->size() - 1].second; | 1127 return (*f)[f->size() - 1].second; |
1083 } else if (iter->first == x) { | 1128 } else if (iter->first == x) { |
1084 return iter->second; | 1129 return iter->second; |
1085 } else { | 1130 } else { |
1086 double x1 = (iter - 1)->first; | 1131 double x1 = (iter - 1)->first; |
1087 double y1 = (iter - 1)->second; | 1132 double y1 = (iter - 1)->second; |
1088 double x2 = iter->first; | 1133 double x2 = iter->first; |
1089 double y2 = iter->second; | 1134 double y2 = iter->second; |
1090 return y1 + (y2 - y1) * (x - x1) / (x2 - x1); | 1135 return y1 + (y2 - y1) * (x - x1) / (x2 - x1); |
1091 } | 1136 } |
(...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1144 void VirtualSocketServer::SetDefaultRoute(const IPAddress& from_addr) { | 1189 void VirtualSocketServer::SetDefaultRoute(const IPAddress& from_addr) { |
1145 RTC_DCHECK(!IPIsAny(from_addr)); | 1190 RTC_DCHECK(!IPIsAny(from_addr)); |
1146 if (from_addr.family() == AF_INET) { | 1191 if (from_addr.family() == AF_INET) { |
1147 default_route_v4_ = from_addr; | 1192 default_route_v4_ = from_addr; |
1148 } else if (from_addr.family() == AF_INET6) { | 1193 } else if (from_addr.family() == AF_INET6) { |
1149 default_route_v6_ = from_addr; | 1194 default_route_v6_ = from_addr; |
1150 } | 1195 } |
1151 } | 1196 } |
1152 | 1197 |
1153 } // namespace rtc | 1198 } // namespace rtc |
OLD | NEW |