Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(100)

Side by Side Diff: webrtc/base/virtualsocketserver.cc

Issue 2284903002: Adding ability to simulate EWOULDBLOCK/SignalReadyToSend. (Closed)
Patch Set: Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 /* 1 /*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 * 3 *
4 * Use of this source code is governed by a BSD-style license 4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source 5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found 6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may 7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree. 8 * be found in the AUTHORS file in the root of the source tree.
9 */ 9 */
10 10
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after
58 MSG_ID_CONNECT, 58 MSG_ID_CONNECT,
59 MSG_ID_DISCONNECT, 59 MSG_ID_DISCONNECT,
60 }; 60 };
61 61
62 // Packets are passed between sockets as messages. We copy the data just like 62 // Packets are passed between sockets as messages. We copy the data just like
63 // the kernel does. 63 // the kernel does.
64 class Packet : public MessageData { 64 class Packet : public MessageData {
65 public: 65 public:
66 Packet(const char* data, size_t size, const SocketAddress& from) 66 Packet(const char* data, size_t size, const SocketAddress& from)
67 : size_(size), consumed_(0), from_(from) { 67 : size_(size), consumed_(0), from_(from) {
68 ASSERT(NULL != data); 68 RTC_DCHECK(NULL != data);
69 data_ = new char[size_]; 69 data_ = new char[size_];
70 memcpy(data_, data, size_); 70 memcpy(data_, data, size_);
71 } 71 }
72 72
73 ~Packet() override { 73 ~Packet() override {
74 delete[] data_; 74 delete[] data_;
75 } 75 }
76 76
77 const char* data() const { return data_ + consumed_; } 77 const char* data() const { return data_ + consumed_; }
78 size_t size() const { return size_ - consumed_; } 78 size_t size() const { return size_ - consumed_; }
79 const SocketAddress& from() const { return from_; } 79 const SocketAddress& from() const { return from_; }
80 80
81 // Remove the first size bytes from the data. 81 // Remove the first size bytes from the data.
82 void Consume(size_t size) { 82 void Consume(size_t size) {
83 ASSERT(size + consumed_ < size_); 83 RTC_DCHECK(size + consumed_ < size_);
84 consumed_ += size; 84 consumed_ += size;
85 } 85 }
86 86
87 private: 87 private:
88 char* data_; 88 char* data_;
89 size_t size_, consumed_; 89 size_t size_, consumed_;
90 SocketAddress from_; 90 SocketAddress from_;
91 }; 91 };
92 92
93 struct MessageAddress : public MessageData { 93 struct MessageAddress : public MessageData {
94 explicit MessageAddress(const SocketAddress& a) : addr(a) { } 94 explicit MessageAddress(const SocketAddress& a) : addr(a) { }
95 SocketAddress addr; 95 SocketAddress addr;
96 }; 96 };
97 97
98 VirtualSocket::VirtualSocket(VirtualSocketServer* server, 98 VirtualSocket::VirtualSocket(VirtualSocketServer* server,
99 int family, 99 int family,
100 int type, 100 int type,
101 bool async) 101 bool async)
102 : server_(server), 102 : server_(server),
103 type_(type), 103 type_(type),
104 async_(async), 104 async_(async),
105 state_(CS_CLOSED), 105 state_(CS_CLOSED),
106 error_(0), 106 error_(0),
107 listen_queue_(NULL), 107 listen_queue_(NULL),
108 write_enabled_(false),
109 network_size_(0), 108 network_size_(0),
110 recv_buffer_size_(0), 109 recv_buffer_size_(0),
111 bound_(false), 110 bound_(false),
112 was_any_(false) { 111 was_any_(false) {
113 ASSERT((type_ == SOCK_DGRAM) || (type_ == SOCK_STREAM)); 112 RTC_DCHECK((type_ == SOCK_DGRAM) || (type_ == SOCK_STREAM));
114 ASSERT(async_ || (type_ != SOCK_STREAM)); // We only support async streams 113 RTC_DCHECK(async_ ||
114 (type_ != SOCK_STREAM)); // We only support async streams
115 server->SignalReadyToSend.connect(this,
116 &VirtualSocket::OnSocketServerReadyToSend);
115 } 117 }
116 118
117 VirtualSocket::~VirtualSocket() { 119 VirtualSocket::~VirtualSocket() {
118 Close(); 120 Close();
119 121
120 for (RecvBuffer::iterator it = recv_buffer_.begin(); it != recv_buffer_.end(); 122 for (RecvBuffer::iterator it = recv_buffer_.begin(); it != recv_buffer_.end();
121 ++it) { 123 ++it) {
122 delete *it; 124 delete *it;
123 } 125 }
124 } 126 }
(...skipping 77 matching lines...) Expand 10 before | Expand all | Expand 10 after
202 // Remove mapping for both directions. 204 // Remove mapping for both directions.
203 server_->RemoveConnection(remote_addr_, local_addr_); 205 server_->RemoveConnection(remote_addr_, local_addr_);
204 server_->RemoveConnection(local_addr_, remote_addr_); 206 server_->RemoveConnection(local_addr_, remote_addr_);
205 } 207 }
206 // Cancel potential connects 208 // Cancel potential connects
207 MessageList msgs; 209 MessageList msgs;
208 if (server_->msg_queue_) { 210 if (server_->msg_queue_) {
209 server_->msg_queue_->Clear(this, MSG_ID_CONNECT, &msgs); 211 server_->msg_queue_->Clear(this, MSG_ID_CONNECT, &msgs);
210 } 212 }
211 for (MessageList::iterator it = msgs.begin(); it != msgs.end(); ++it) { 213 for (MessageList::iterator it = msgs.begin(); it != msgs.end(); ++it) {
212 ASSERT(NULL != it->pdata); 214 RTC_DCHECK(NULL != it->pdata);
213 MessageAddress* data = static_cast<MessageAddress*>(it->pdata); 215 MessageAddress* data = static_cast<MessageAddress*>(it->pdata);
214 216
215 // Lookup remote side. 217 // Lookup remote side.
216 VirtualSocket* socket = 218 VirtualSocket* socket =
217 server_->LookupConnection(local_addr_, data->addr); 219 server_->LookupConnection(local_addr_, data->addr);
218 if (socket) { 220 if (socket) {
219 // Server socket, remote side is a socket retreived by 221 // Server socket, remote side is a socket retreived by
220 // accept. Accepted sockets are not bound so we will not 222 // accept. Accepted sockets are not bound so we will not
221 // find it by looking in the bindings table. 223 // find it by looking in the bindings table.
222 server_->Disconnect(socket); 224 server_->Disconnect(socket);
(...skipping 77 matching lines...) Expand 10 before | Expand all | Expand 10 after
300 } else { 302 } else {
301 recv_buffer_.pop_front(); 303 recv_buffer_.pop_front();
302 delete packet; 304 delete packet;
303 } 305 }
304 306
305 if (SOCK_STREAM == type_) { 307 if (SOCK_STREAM == type_) {
306 bool was_full = (recv_buffer_size_ == server_->recv_buffer_capacity_); 308 bool was_full = (recv_buffer_size_ == server_->recv_buffer_capacity_);
307 recv_buffer_size_ -= data_read; 309 recv_buffer_size_ -= data_read;
308 if (was_full) { 310 if (was_full) {
309 VirtualSocket* sender = server_->LookupBinding(remote_addr_); 311 VirtualSocket* sender = server_->LookupBinding(remote_addr_);
310 ASSERT(NULL != sender); 312 RTC_DCHECK(NULL != sender);
311 server_->SendTcp(sender); 313 server_->SendTcp(sender);
312 } 314 }
313 } 315 }
314 316
315 return static_cast<int>(data_read); 317 return static_cast<int>(data_read);
316 } 318 }
317 319
318 int VirtualSocket::Listen(int backlog) { 320 int VirtualSocket::Listen(int backlog) {
319 ASSERT(SOCK_STREAM == type_); 321 RTC_DCHECK(SOCK_STREAM == type_);
320 ASSERT(CS_CLOSED == state_); 322 RTC_DCHECK(CS_CLOSED == state_);
321 if (local_addr_.IsNil()) { 323 if (local_addr_.IsNil()) {
322 error_ = EINVAL; 324 error_ = EINVAL;
323 return -1; 325 return -1;
324 } 326 }
325 ASSERT(NULL == listen_queue_); 327 RTC_DCHECK(NULL == listen_queue_);
326 listen_queue_ = new ListenQueue; 328 listen_queue_ = new ListenQueue;
327 state_ = CS_CONNECTING; 329 state_ = CS_CONNECTING;
328 return 0; 330 return 0;
329 } 331 }
330 332
331 VirtualSocket* VirtualSocket::Accept(SocketAddress* paddr) { 333 VirtualSocket* VirtualSocket::Accept(SocketAddress* paddr) {
332 if (NULL == listen_queue_) { 334 if (NULL == listen_queue_) {
333 error_ = EINVAL; 335 error_ = EINVAL;
334 return NULL; 336 return NULL;
335 } 337 }
(...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after
385 387
386 int VirtualSocket::EstimateMTU(uint16_t* mtu) { 388 int VirtualSocket::EstimateMTU(uint16_t* mtu) {
387 if (CS_CONNECTED != state_) 389 if (CS_CONNECTED != state_)
388 return ENOTCONN; 390 return ENOTCONN;
389 else 391 else
390 return 65536; 392 return 65536;
391 } 393 }
392 394
393 void VirtualSocket::OnMessage(Message* pmsg) { 395 void VirtualSocket::OnMessage(Message* pmsg) {
394 if (pmsg->message_id == MSG_ID_PACKET) { 396 if (pmsg->message_id == MSG_ID_PACKET) {
395 // ASSERT(!local_addr_.IsAnyIP()); 397 // RTC_DCHECK(!local_addr_.IsAnyIP());
skvlad 2016/08/26 22:39:04 Do you happen to know why this is commented out? S
Taylor Brandstetter 2016/08/26 23:01:03 It appears this line has been commented out from t
396 ASSERT(NULL != pmsg->pdata); 398 RTC_DCHECK(NULL != pmsg->pdata);
397 Packet* packet = static_cast<Packet*>(pmsg->pdata); 399 Packet* packet = static_cast<Packet*>(pmsg->pdata);
398 400
399 recv_buffer_.push_back(packet); 401 recv_buffer_.push_back(packet);
400 402
401 if (async_) { 403 if (async_) {
402 SignalReadEvent(this); 404 SignalReadEvent(this);
403 } 405 }
404 } else if (pmsg->message_id == MSG_ID_CONNECT) { 406 } else if (pmsg->message_id == MSG_ID_CONNECT) {
405 ASSERT(NULL != pmsg->pdata); 407 RTC_DCHECK(NULL != pmsg->pdata);
406 MessageAddress* data = static_cast<MessageAddress*>(pmsg->pdata); 408 MessageAddress* data = static_cast<MessageAddress*>(pmsg->pdata);
407 if (listen_queue_ != NULL) { 409 if (listen_queue_ != NULL) {
408 listen_queue_->push_back(data->addr); 410 listen_queue_->push_back(data->addr);
409 if (async_) { 411 if (async_) {
410 SignalReadEvent(this); 412 SignalReadEvent(this);
411 } 413 }
412 } else if ((SOCK_STREAM == type_) && (CS_CONNECTING == state_)) { 414 } else if ((SOCK_STREAM == type_) && (CS_CONNECTING == state_)) {
413 CompleteConnect(data->addr, true); 415 CompleteConnect(data->addr, true);
414 } else { 416 } else {
415 LOG(LS_VERBOSE) << "Socket at " << local_addr_ << " is not listening"; 417 LOG(LS_VERBOSE) << "Socket at " << local_addr_ << " is not listening";
416 server_->Disconnect(server_->LookupBinding(data->addr)); 418 server_->Disconnect(server_->LookupBinding(data->addr));
417 } 419 }
418 delete data; 420 delete data;
419 } else if (pmsg->message_id == MSG_ID_DISCONNECT) { 421 } else if (pmsg->message_id == MSG_ID_DISCONNECT) {
420 ASSERT(SOCK_STREAM == type_); 422 RTC_DCHECK(SOCK_STREAM == type_);
421 if (CS_CLOSED != state_) { 423 if (CS_CLOSED != state_) {
422 int error = (CS_CONNECTING == state_) ? ECONNREFUSED : 0; 424 int error = (CS_CONNECTING == state_) ? ECONNREFUSED : 0;
423 state_ = CS_CLOSED; 425 state_ = CS_CLOSED;
424 remote_addr_.Clear(); 426 remote_addr_.Clear();
425 if (async_) { 427 if (async_) {
426 SignalCloseEvent(this, error); 428 SignalCloseEvent(this, error);
427 } 429 }
428 } 430 }
429 } else if (pmsg->message_id == MSG_ID_ADDRESS_BOUND) { 431 } else if (pmsg->message_id == MSG_ID_ADDRESS_BOUND) {
430 SignalAddressReady(this, GetLocalAddress()); 432 SignalAddressReady(this, GetLocalAddress());
431 } else { 433 } else {
432 ASSERT(false); 434 RTC_DCHECK(false);
433 } 435 }
434 } 436 }
435 437
436 int VirtualSocket::InitiateConnect(const SocketAddress& addr, bool use_delay) { 438 int VirtualSocket::InitiateConnect(const SocketAddress& addr, bool use_delay) {
437 if (!remote_addr_.IsNil()) { 439 if (!remote_addr_.IsNil()) {
438 error_ = (CS_CONNECTED == state_) ? EISCONN : EINPROGRESS; 440 error_ = (CS_CONNECTED == state_) ? EISCONN : EINPROGRESS;
439 return -1; 441 return -1;
440 } 442 }
441 if (local_addr_.IsNil()) { 443 if (local_addr_.IsNil()) {
442 // If there's no local address set, grab a random one in the correct AF. 444 // If there's no local address set, grab a random one in the correct AF.
(...skipping 15 matching lines...) Expand all
458 if (result != 0) { 460 if (result != 0) {
459 error_ = EHOSTUNREACH; 461 error_ = EHOSTUNREACH;
460 return -1; 462 return -1;
461 } 463 }
462 state_ = CS_CONNECTING; 464 state_ = CS_CONNECTING;
463 } 465 }
464 return 0; 466 return 0;
465 } 467 }
466 468
467 void VirtualSocket::CompleteConnect(const SocketAddress& addr, bool notify) { 469 void VirtualSocket::CompleteConnect(const SocketAddress& addr, bool notify) {
468 ASSERT(CS_CONNECTING == state_); 470 RTC_DCHECK(CS_CONNECTING == state_);
469 remote_addr_ = addr; 471 remote_addr_ = addr;
470 state_ = CS_CONNECTED; 472 state_ = CS_CONNECTED;
471 server_->AddConnection(remote_addr_, local_addr_, this); 473 server_->AddConnection(remote_addr_, local_addr_, this);
472 if (async_ && notify) { 474 if (async_ && notify) {
473 SignalConnectEvent(this); 475 SignalConnectEvent(this);
474 } 476 }
475 } 477 }
476 478
477 int VirtualSocket::SendUdp(const void* pv, 479 int VirtualSocket::SendUdp(const void* pv,
478 size_t cb, 480 size_t cb,
479 const SocketAddress& addr) { 481 const SocketAddress& addr) {
480 // If we have not been assigned a local port, then get one. 482 // If we have not been assigned a local port, then get one.
481 if (local_addr_.IsNil()) { 483 if (local_addr_.IsNil()) {
482 local_addr_ = EmptySocketAddressWithFamily(addr.ipaddr().family()); 484 local_addr_ = EmptySocketAddressWithFamily(addr.ipaddr().family());
483 int result = server_->Bind(this, &local_addr_); 485 int result = server_->Bind(this, &local_addr_);
484 if (result != 0) { 486 if (result != 0) {
485 local_addr_.Clear(); 487 local_addr_.Clear();
486 error_ = EADDRINUSE; 488 error_ = EADDRINUSE;
487 return result; 489 return result;
488 } 490 }
489 } 491 }
490 492
491 // Send the data in a message to the appropriate socket. 493 // Send the data in a message to the appropriate socket.
492 return server_->SendUdp(this, static_cast<const char*>(pv), cb, addr); 494 return server_->SendUdp(this, static_cast<const char*>(pv), cb, addr);
493 } 495 }
494 496
495 int VirtualSocket::SendTcp(const void* pv, size_t cb) { 497 int VirtualSocket::SendTcp(const void* pv, size_t cb) {
496 size_t capacity = server_->send_buffer_capacity_ - send_buffer_.size(); 498 size_t capacity = server_->send_buffer_capacity_ - send_buffer_.size();
497 if (0 == capacity) { 499 if (0 == capacity) {
498 write_enabled_ = true; 500 ready_to_send_ = false;
499 error_ = EWOULDBLOCK; 501 error_ = EWOULDBLOCK;
500 return -1; 502 return -1;
501 } 503 }
502 size_t consumed = std::min(cb, capacity); 504 size_t consumed = std::min(cb, capacity);
503 const char* cpv = static_cast<const char*>(pv); 505 const char* cpv = static_cast<const char*>(pv);
504 send_buffer_.insert(send_buffer_.end(), cpv, cpv + consumed); 506 send_buffer_.insert(send_buffer_.end(), cpv, cpv + consumed);
505 server_->SendTcp(this); 507 server_->SendTcp(this);
506 return static_cast<int>(consumed); 508 return static_cast<int>(consumed);
507 } 509 }
508 510
511 void VirtualSocket::OnSocketServerReadyToSend() {
512 if (ready_to_send_) {
513 // This socket didn't encounter EWOULDBLOCK, so there's nothing to do.
514 return;
515 }
516 if (type_ == SOCK_DGRAM) {
517 ready_to_send_ = true;
518 SignalWriteEvent(this);
519 } else {
520 RTC_DCHECK(type_ == SOCK_STREAM);
521 // This will attempt to empty the full send buffer, and will fire
522 // SignalWriteEvent if successful.
523 server_->SendTcp(this);
524 }
525 }
526
509 VirtualSocketServer::VirtualSocketServer(SocketServer* ss) 527 VirtualSocketServer::VirtualSocketServer(SocketServer* ss)
510 : server_(ss), 528 : server_(ss),
511 server_owned_(false), 529 server_owned_(false),
512 msg_queue_(NULL), 530 msg_queue_(NULL),
513 stop_on_idle_(false), 531 stop_on_idle_(false),
514 network_delay_(0), 532 network_delay_(0),
515 next_ipv4_(kInitialNextIPv4), 533 next_ipv4_(kInitialNextIPv4),
516 next_ipv6_(kInitialNextIPv6), 534 next_ipv6_(kInitialNextIPv6),
517 next_port_(kFirstEphemeralPort), 535 next_port_(kFirstEphemeralPort),
518 bindings_(new AddressMap()), 536 bindings_(new AddressMap()),
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after
560 uint16_t VirtualSocketServer::GetNextPort() { 578 uint16_t VirtualSocketServer::GetNextPort() {
561 uint16_t port = next_port_; 579 uint16_t port = next_port_;
562 if (next_port_ < kLastEphemeralPort) { 580 if (next_port_ < kLastEphemeralPort) {
563 ++next_port_; 581 ++next_port_;
564 } else { 582 } else {
565 next_port_ = kFirstEphemeralPort; 583 next_port_ = kFirstEphemeralPort;
566 } 584 }
567 return port; 585 return port;
568 } 586 }
569 587
588 void VirtualSocketServer::SetSendingBlocked(bool blocked) {
589 if (blocked == sending_blocked_) {
590 // Unchanged; nothing to do.
591 return;
592 }
593 sending_blocked_ = blocked;
594 if (!sending_blocked_) {
595 // Sending was blocked, but is now unblocked. This signal gives sockets a
596 // chance to fire SignalWriteEvent, and for TCP, send buffered data.
597 SignalReadyToSend();
598 }
599 }
600
570 Socket* VirtualSocketServer::CreateSocket(int type) { 601 Socket* VirtualSocketServer::CreateSocket(int type) {
571 return CreateSocket(AF_INET, type); 602 return CreateSocket(AF_INET, type);
572 } 603 }
573 604
574 Socket* VirtualSocketServer::CreateSocket(int family, int type) { 605 Socket* VirtualSocketServer::CreateSocket(int family, int type) {
575 return CreateSocketInternal(family, type); 606 return CreateSocketInternal(family, type);
576 } 607 }
577 608
578 AsyncSocket* VirtualSocketServer::CreateAsyncSocket(int type) { 609 AsyncSocket* VirtualSocketServer::CreateAsyncSocket(int type) {
579 return CreateAsyncSocket(AF_INET, type); 610 return CreateAsyncSocket(AF_INET, type);
(...skipping 11 matching lines...) Expand all
591 622
592 void VirtualSocketServer::SetMessageQueue(MessageQueue* msg_queue) { 623 void VirtualSocketServer::SetMessageQueue(MessageQueue* msg_queue) {
593 msg_queue_ = msg_queue; 624 msg_queue_ = msg_queue;
594 if (msg_queue_) { 625 if (msg_queue_) {
595 msg_queue_->SignalQueueDestroyed.connect(this, 626 msg_queue_->SignalQueueDestroyed.connect(this,
596 &VirtualSocketServer::OnMessageQueueDestroyed); 627 &VirtualSocketServer::OnMessageQueueDestroyed);
597 } 628 }
598 } 629 }
599 630
600 bool VirtualSocketServer::Wait(int cmsWait, bool process_io) { 631 bool VirtualSocketServer::Wait(int cmsWait, bool process_io) {
601 ASSERT(msg_queue_ == Thread::Current()); 632 RTC_DCHECK(msg_queue_ == Thread::Current());
602 if (stop_on_idle_ && Thread::Current()->empty()) { 633 if (stop_on_idle_ && Thread::Current()->empty()) {
603 return false; 634 return false;
604 } 635 }
605 return socketserver()->Wait(cmsWait, process_io); 636 return socketserver()->Wait(cmsWait, process_io);
606 } 637 }
607 638
608 void VirtualSocketServer::WakeUp() { 639 void VirtualSocketServer::WakeUp() {
609 socketserver()->WakeUp(); 640 socketserver()->WakeUp();
610 } 641 }
611 642
612 bool VirtualSocketServer::ProcessMessagesUntilIdle() { 643 bool VirtualSocketServer::ProcessMessagesUntilIdle() {
613 ASSERT(msg_queue_ == Thread::Current()); 644 RTC_DCHECK(msg_queue_ == Thread::Current());
614 stop_on_idle_ = true; 645 stop_on_idle_ = true;
615 while (!msg_queue_->empty()) { 646 while (!msg_queue_->empty()) {
616 Message msg; 647 Message msg;
617 if (msg_queue_->Get(&msg, Thread::kForever)) { 648 if (msg_queue_->Get(&msg, Thread::kForever)) {
618 msg_queue_->Dispatch(&msg); 649 msg_queue_->Dispatch(&msg);
619 } 650 }
620 } 651 }
621 stop_on_idle_ = false; 652 stop_on_idle_ = false;
622 return !msg_queue_->IsQuitting(); 653 return !msg_queue_->IsQuitting();
623 } 654 }
(...skipping 13 matching lines...) Expand all
637 socket->SignalCloseEvent(socket, 0); 668 socket->SignalCloseEvent(socket, 0);
638 669
639 // Trigger the remote connection's close event. 670 // Trigger the remote connection's close event.
640 socket->Close(); 671 socket->Close();
641 672
642 return true; 673 return true;
643 } 674 }
644 675
645 int VirtualSocketServer::Bind(VirtualSocket* socket, 676 int VirtualSocketServer::Bind(VirtualSocket* socket,
646 const SocketAddress& addr) { 677 const SocketAddress& addr) {
647 ASSERT(NULL != socket); 678 RTC_DCHECK(NULL != socket);
648 // Address must be completely specified at this point 679 // Address must be completely specified at this point
649 ASSERT(!IPIsUnspec(addr.ipaddr())); 680 RTC_DCHECK(!IPIsUnspec(addr.ipaddr()));
650 ASSERT(addr.port() != 0); 681 RTC_DCHECK(addr.port() != 0);
651 682
652 // Normalize the address (turns v6-mapped addresses into v4-addresses). 683 // Normalize the address (turns v6-mapped addresses into v4-addresses).
653 SocketAddress normalized(addr.ipaddr().Normalized(), addr.port()); 684 SocketAddress normalized(addr.ipaddr().Normalized(), addr.port());
654 685
655 AddressMap::value_type entry(normalized, socket); 686 AddressMap::value_type entry(normalized, socket);
656 return bindings_->insert(entry).second ? 0 : -1; 687 return bindings_->insert(entry).second ? 0 : -1;
657 } 688 }
658 689
659 int VirtualSocketServer::Bind(VirtualSocket* socket, SocketAddress* addr) { 690 int VirtualSocketServer::Bind(VirtualSocket* socket, SocketAddress* addr) {
660 ASSERT(NULL != socket); 691 RTC_DCHECK(NULL != socket);
661 692
662 if (!IPIsUnspec(addr->ipaddr())) { 693 if (!IPIsUnspec(addr->ipaddr())) {
663 addr->SetIP(addr->ipaddr().Normalized()); 694 addr->SetIP(addr->ipaddr().Normalized());
664 } else { 695 } else {
665 ASSERT(false); 696 RTC_DCHECK(false);
666 } 697 }
667 698
668 if (addr->port() == 0) { 699 if (addr->port() == 0) {
669 for (int i = 0; i < kEphemeralPortCount; ++i) { 700 for (int i = 0; i < kEphemeralPortCount; ++i) {
670 addr->SetPort(GetNextPort()); 701 addr->SetPort(GetNextPort());
671 if (bindings_->find(*addr) == bindings_->end()) { 702 if (bindings_->find(*addr) == bindings_->end()) {
672 break; 703 break;
673 } 704 }
674 } 705 }
675 } 706 }
(...skipping 20 matching lines...) Expand all
696 return LookupBinding(sock_addr); 727 return LookupBinding(sock_addr);
697 } 728 }
698 729
699 return nullptr; 730 return nullptr;
700 } 731 }
701 732
702 int VirtualSocketServer::Unbind(const SocketAddress& addr, 733 int VirtualSocketServer::Unbind(const SocketAddress& addr,
703 VirtualSocket* socket) { 734 VirtualSocket* socket) {
704 SocketAddress normalized(addr.ipaddr().Normalized(), 735 SocketAddress normalized(addr.ipaddr().Normalized(),
705 addr.port()); 736 addr.port());
706 ASSERT((*bindings_)[normalized] == socket); 737 RTC_DCHECK((*bindings_)[normalized] == socket);
707 bindings_->erase(bindings_->find(normalized)); 738 bindings_->erase(bindings_->find(normalized));
708 return 0; 739 return 0;
709 } 740 }
710 741
711 void VirtualSocketServer::AddConnection(const SocketAddress& local, 742 void VirtualSocketServer::AddConnection(const SocketAddress& local,
712 const SocketAddress& remote, 743 const SocketAddress& remote,
713 VirtualSocket* remote_socket) { 744 VirtualSocket* remote_socket) {
714 // Add this socket pair to our routing table. This will allow 745 // Add this socket pair to our routing table. This will allow
715 // multiple clients to connect to the same server address. 746 // multiple clients to connect to the same server address.
716 SocketAddress local_normalized(local.ipaddr().Normalized(), 747 SocketAddress local_normalized(local.ipaddr().Normalized(),
(...skipping 60 matching lines...) Expand 10 before | Expand all | Expand 10 after
777 // Remove the mapping. 808 // Remove the mapping.
778 msg_queue_->PostDelayed(RTC_FROM_HERE, delay, socket, MSG_ID_DISCONNECT); 809 msg_queue_->PostDelayed(RTC_FROM_HERE, delay, socket, MSG_ID_DISCONNECT);
779 return true; 810 return true;
780 } 811 }
781 return false; 812 return false;
782 } 813 }
783 814
784 int VirtualSocketServer::SendUdp(VirtualSocket* socket, 815 int VirtualSocketServer::SendUdp(VirtualSocket* socket,
785 const char* data, size_t data_size, 816 const char* data, size_t data_size,
786 const SocketAddress& remote_addr) { 817 const SocketAddress& remote_addr) {
818 if (sending_blocked_) {
819 CritScope cs(&socket->crit_);
820 socket->ready_to_send_ = false;
821 socket->error_ = EWOULDBLOCK;
822 return -1;
823 }
824
787 // See if we want to drop this packet. 825 // See if we want to drop this packet.
788 if (Random() < drop_prob_) { 826 if (Random() < drop_prob_) {
789 LOG(LS_VERBOSE) << "Dropping packet: bad luck"; 827 LOG(LS_VERBOSE) << "Dropping packet: bad luck";
790 return static_cast<int>(data_size); 828 return static_cast<int>(data_size);
791 } 829 }
792 830
793 VirtualSocket* recipient = LookupBinding(remote_addr); 831 VirtualSocket* recipient = LookupBinding(remote_addr);
794 if (!recipient) { 832 if (!recipient) {
795 // Make a fake recipient for address family checking. 833 // Make a fake recipient for address family checking.
796 std::unique_ptr<VirtualSocket> dummy_socket( 834 std::unique_ptr<VirtualSocket> dummy_socket(
797 CreateSocketInternal(AF_INET, SOCK_DGRAM)); 835 CreateSocketInternal(AF_INET, SOCK_DGRAM));
798 dummy_socket->SetLocalAddress(remote_addr); 836 dummy_socket->SetLocalAddress(remote_addr);
799 if (!CanInteractWith(socket, dummy_socket.get())) { 837 if (!CanInteractWith(socket, dummy_socket.get())) {
800 LOG(LS_VERBOSE) << "Incompatible address families: " 838 LOG(LS_VERBOSE) << "Incompatible address families: "
801 << socket->GetLocalAddress() << " and " << remote_addr; 839 << socket->GetLocalAddress() << " and " << remote_addr;
802 return -1; 840 return -1;
803 } 841 }
804 LOG(LS_VERBOSE) << "No one listening at " << remote_addr; 842 LOG(LS_VERBOSE) << "No one listening at " << remote_addr;
805 return static_cast<int>(data_size); 843 return static_cast<int>(data_size);
806 } 844 }
807 845
808 if (!CanInteractWith(socket, recipient)) { 846 if (!CanInteractWith(socket, recipient)) {
809 LOG(LS_VERBOSE) << "Incompatible address families: " 847 LOG(LS_VERBOSE) << "Incompatible address families: "
810 << socket->GetLocalAddress() << " and " << remote_addr; 848 << socket->GetLocalAddress() << " and " << remote_addr;
811 return -1; 849 return -1;
812 } 850 }
813 851
814 CritScope cs(&socket->crit_); 852 {
853 CritScope cs(&socket->crit_);
815 854
816 int64_t cur_time = TimeMillis(); 855 int64_t cur_time = TimeMillis();
817 PurgeNetworkPackets(socket, cur_time); 856 PurgeNetworkPackets(socket, cur_time);
818 857
819 // Determine whether we have enough bandwidth to accept this packet. To do 858 // Determine whether we have enough bandwidth to accept this packet. To do
820 // this, we need to update the send queue. Once we know it's current size, 859 // this, we need to update the send queue. Once we know it's current size,
821 // we know whether we can fit this packet. 860 // we know whether we can fit this packet.
822 // 861 //
823 // NOTE: There are better algorithms for maintaining such a queue (such as 862 // NOTE: There are better algorithms for maintaining such a queue (such as
824 // "Derivative Random Drop"); however, this algorithm is a more accurate 863 // "Derivative Random Drop"); however, this algorithm is a more accurate
825 // simulation of what a normal network would do. 864 // simulation of what a normal network would do.
826 865
827 size_t packet_size = data_size + UDP_HEADER_SIZE; 866 size_t packet_size = data_size + UDP_HEADER_SIZE;
828 if (socket->network_size_ + packet_size > network_capacity_) { 867 if (socket->network_size_ + packet_size > network_capacity_) {
829 LOG(LS_VERBOSE) << "Dropping packet: network capacity exceeded"; 868 LOG(LS_VERBOSE) << "Dropping packet: network capacity exceeded";
869 return static_cast<int>(data_size);
870 }
871
872 AddPacketToNetwork(socket, recipient, cur_time, data, data_size,
873 UDP_HEADER_SIZE, false);
874
830 return static_cast<int>(data_size); 875 return static_cast<int>(data_size);
831 } 876 }
832
833 AddPacketToNetwork(socket, recipient, cur_time, data, data_size,
834 UDP_HEADER_SIZE, false);
835
836 return static_cast<int>(data_size);
837 } 877 }
838 878
839 void VirtualSocketServer::SendTcp(VirtualSocket* socket) { 879 void VirtualSocketServer::SendTcp(VirtualSocket* socket) {
880 if (sending_blocked_) {
881 // Eventually the socket's buffer will fill and VirtualSocket::SendTcp will
882 // set EWOULDBLOCK.
883 return;
884 }
885
840 // TCP can't send more data than will fill up the receiver's buffer. 886 // TCP can't send more data than will fill up the receiver's buffer.
841 // We track the data that is in the buffer plus data in flight using the 887 // We track the data that is in the buffer plus data in flight using the
842 // recipient's recv_buffer_size_. Anything beyond that must be stored in the 888 // recipient's recv_buffer_size_. Anything beyond that must be stored in the
843 // sender's buffer. We will trigger the buffered data to be sent when data 889 // sender's buffer. We will trigger the buffered data to be sent when data
844 // is read from the recv_buffer. 890 // is read from the recv_buffer.
845 891
846 // Lookup the local/remote pair in the connections table. 892 // Lookup the local/remote pair in the connections table.
847 VirtualSocket* recipient = LookupConnection(socket->local_addr_, 893 VirtualSocket* recipient = LookupConnection(socket->local_addr_,
848 socket->remote_addr_); 894 socket->remote_addr_);
849 if (!recipient) { 895 if (!recipient) {
(...skipping 22 matching lines...) Expand all
872 // Avoid undefined access beyond the last element of the vector. 918 // Avoid undefined access beyond the last element of the vector.
873 // This only happens when new_buffer_size is 0. 919 // This only happens when new_buffer_size is 0.
874 if (data_size < socket->send_buffer_.size()) { 920 if (data_size < socket->send_buffer_.size()) {
875 // memmove is required for potentially overlapping source/destination. 921 // memmove is required for potentially overlapping source/destination.
876 memmove(&socket->send_buffer_[0], &socket->send_buffer_[data_size], 922 memmove(&socket->send_buffer_[0], &socket->send_buffer_[data_size],
877 new_buffer_size); 923 new_buffer_size);
878 } 924 }
879 socket->send_buffer_.resize(new_buffer_size); 925 socket->send_buffer_.resize(new_buffer_size);
880 } 926 }
881 927
882 if (socket->write_enabled_ 928 if (!socket->ready_to_send_ &&
883 && (socket->send_buffer_.size() < send_buffer_capacity_)) { 929 (socket->send_buffer_.size() < send_buffer_capacity_)) {
884 socket->write_enabled_ = false; 930 socket->ready_to_send_ = true;
885 socket->SignalWriteEvent(socket); 931 socket->SignalWriteEvent(socket);
886 } 932 }
887 } 933 }
888 934
889 void VirtualSocketServer::AddPacketToNetwork(VirtualSocket* sender, 935 void VirtualSocketServer::AddPacketToNetwork(VirtualSocket* sender,
890 VirtualSocket* recipient, 936 VirtualSocket* recipient,
891 int64_t cur_time, 937 int64_t cur_time,
892 const char* data, 938 const char* data,
893 size_t data_size, 939 size_t data_size,
894 size_t header_size, 940 size_t header_size,
(...skipping 29 matching lines...) Expand all
924 ts = std::max(ts, network_delay_); 970 ts = std::max(ts, network_delay_);
925 } 971 }
926 msg_queue_->PostAt(RTC_FROM_HERE, ts, recipient, MSG_ID_PACKET, p); 972 msg_queue_->PostAt(RTC_FROM_HERE, ts, recipient, MSG_ID_PACKET, p);
927 network_delay_ = std::max(ts, network_delay_); 973 network_delay_ = std::max(ts, network_delay_);
928 } 974 }
929 975
930 void VirtualSocketServer::PurgeNetworkPackets(VirtualSocket* socket, 976 void VirtualSocketServer::PurgeNetworkPackets(VirtualSocket* socket,
931 int64_t cur_time) { 977 int64_t cur_time) {
932 while (!socket->network_.empty() && 978 while (!socket->network_.empty() &&
933 (socket->network_.front().done_time <= cur_time)) { 979 (socket->network_.front().done_time <= cur_time)) {
934 ASSERT(socket->network_size_ >= socket->network_.front().size); 980 RTC_DCHECK(socket->network_size_ >= socket->network_.front().size);
935 socket->network_size_ -= socket->network_.front().size; 981 socket->network_size_ -= socket->network_.front().size;
936 socket->network_.pop_front(); 982 socket->network_.pop_front();
937 } 983 }
938 } 984 }
939 985
940 uint32_t VirtualSocketServer::SendDelay(uint32_t size) { 986 uint32_t VirtualSocketServer::SendDelay(uint32_t size) {
941 if (bandwidth_ == 0) 987 if (bandwidth_ == 0)
942 return 0; 988 return 0;
943 else 989 else
944 return 1000 * size / bandwidth_; 990 return 1000 * size / bandwidth_;
(...skipping 84 matching lines...) Expand 10 before | Expand all | Expand 10 after
1029 } 1075 }
1030 bool operator()(double v1, const VirtualSocketServer::Point& p2) { 1076 bool operator()(double v1, const VirtualSocketServer::Point& p2) {
1031 return v1 < p2.first; 1077 return v1 < p2.first;
1032 } 1078 }
1033 bool operator()(const VirtualSocketServer::Point& p1, double v2) { 1079 bool operator()(const VirtualSocketServer::Point& p1, double v2) {
1034 return p1.first < v2; 1080 return p1.first < v2;
1035 } 1081 }
1036 }; 1082 };
1037 1083
1038 VirtualSocketServer::Function* VirtualSocketServer::Accumulate(Function* f) { 1084 VirtualSocketServer::Function* VirtualSocketServer::Accumulate(Function* f) {
1039 ASSERT(f->size() >= 1); 1085 RTC_DCHECK(f->size() >= 1);
1040 double v = 0; 1086 double v = 0;
1041 for (Function::size_type i = 0; i < f->size() - 1; ++i) { 1087 for (Function::size_type i = 0; i < f->size() - 1; ++i) {
1042 double dx = (*f)[i + 1].first - (*f)[i].first; 1088 double dx = (*f)[i + 1].first - (*f)[i].first;
1043 double avgy = ((*f)[i + 1].second + (*f)[i].second) / 2; 1089 double avgy = ((*f)[i + 1].second + (*f)[i].second) / 2;
1044 (*f)[i].second = v; 1090 (*f)[i].second = v;
1045 v = v + dx * avgy; 1091 v = v + dx * avgy;
1046 } 1092 }
1047 (*f)[f->size()-1].second = v; 1093 (*f)[f->size()-1].second = v;
1048 return f; 1094 return f;
1049 } 1095 }
(...skipping 21 matching lines...) Expand all
1071 delete f; 1117 delete f;
1072 return g; 1118 return g;
1073 } 1119 }
1074 1120
1075 double VirtualSocketServer::Evaluate(Function* f, double x) { 1121 double VirtualSocketServer::Evaluate(Function* f, double x) {
1076 Function::iterator iter = 1122 Function::iterator iter =
1077 std::lower_bound(f->begin(), f->end(), x, FunctionDomainCmp()); 1123 std::lower_bound(f->begin(), f->end(), x, FunctionDomainCmp());
1078 if (iter == f->begin()) { 1124 if (iter == f->begin()) {
1079 return (*f)[0].second; 1125 return (*f)[0].second;
1080 } else if (iter == f->end()) { 1126 } else if (iter == f->end()) {
1081 ASSERT(f->size() >= 1); 1127 RTC_DCHECK(f->size() >= 1);
1082 return (*f)[f->size() - 1].second; 1128 return (*f)[f->size() - 1].second;
1083 } else if (iter->first == x) { 1129 } else if (iter->first == x) {
1084 return iter->second; 1130 return iter->second;
1085 } else { 1131 } else {
1086 double x1 = (iter - 1)->first; 1132 double x1 = (iter - 1)->first;
1087 double y1 = (iter - 1)->second; 1133 double y1 = (iter - 1)->second;
1088 double x2 = iter->first; 1134 double x2 = iter->first;
1089 double y2 = iter->second; 1135 double y2 = iter->second;
1090 return y1 + (y2 - y1) * (x - x1) / (x2 - x1); 1136 return y1 + (y2 - y1) * (x - x1) / (x2 - x1);
1091 } 1137 }
(...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after
1144 void VirtualSocketServer::SetDefaultRoute(const IPAddress& from_addr) { 1190 void VirtualSocketServer::SetDefaultRoute(const IPAddress& from_addr) {
1145 RTC_DCHECK(!IPIsAny(from_addr)); 1191 RTC_DCHECK(!IPIsAny(from_addr));
1146 if (from_addr.family() == AF_INET) { 1192 if (from_addr.family() == AF_INET) {
1147 default_route_v4_ = from_addr; 1193 default_route_v4_ = from_addr;
1148 } else if (from_addr.family() == AF_INET6) { 1194 } else if (from_addr.family() == AF_INET6) {
1149 default_route_v6_ = from_addr; 1195 default_route_v6_ = from_addr;
1150 } 1196 }
1151 } 1197 }
1152 1198
1153 } // namespace rtc 1199 } // namespace rtc
OLDNEW
« webrtc/base/virtualsocketserver.h ('K') | « webrtc/base/virtualsocketserver.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698