Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(92)

Side by Side Diff: webrtc/base/virtualsocketserver.cc

Issue 2284903002: Adding ability to simulate EWOULDBLOCK/SignalReadyToSend. (Closed)
Patch Set: Removing commented-out assert Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « webrtc/base/virtualsocketserver.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 /* 1 /*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 * 3 *
4 * Use of this source code is governed by a BSD-style license 4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source 5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found 6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may 7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree. 8 * be found in the AUTHORS file in the root of the source tree.
9 */ 9 */
10 10
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after
58 MSG_ID_CONNECT, 58 MSG_ID_CONNECT,
59 MSG_ID_DISCONNECT, 59 MSG_ID_DISCONNECT,
60 }; 60 };
61 61
62 // Packets are passed between sockets as messages. We copy the data just like 62 // Packets are passed between sockets as messages. We copy the data just like
63 // the kernel does. 63 // the kernel does.
64 class Packet : public MessageData { 64 class Packet : public MessageData {
65 public: 65 public:
66 Packet(const char* data, size_t size, const SocketAddress& from) 66 Packet(const char* data, size_t size, const SocketAddress& from)
67 : size_(size), consumed_(0), from_(from) { 67 : size_(size), consumed_(0), from_(from) {
68 ASSERT(NULL != data); 68 RTC_DCHECK(NULL != data);
69 data_ = new char[size_]; 69 data_ = new char[size_];
70 memcpy(data_, data, size_); 70 memcpy(data_, data, size_);
71 } 71 }
72 72
73 ~Packet() override { 73 ~Packet() override {
74 delete[] data_; 74 delete[] data_;
75 } 75 }
76 76
77 const char* data() const { return data_ + consumed_; } 77 const char* data() const { return data_ + consumed_; }
78 size_t size() const { return size_ - consumed_; } 78 size_t size() const { return size_ - consumed_; }
79 const SocketAddress& from() const { return from_; } 79 const SocketAddress& from() const { return from_; }
80 80
81 // Remove the first size bytes from the data. 81 // Remove the first size bytes from the data.
82 void Consume(size_t size) { 82 void Consume(size_t size) {
83 ASSERT(size + consumed_ < size_); 83 RTC_DCHECK(size + consumed_ < size_);
84 consumed_ += size; 84 consumed_ += size;
85 } 85 }
86 86
87 private: 87 private:
88 char* data_; 88 char* data_;
89 size_t size_, consumed_; 89 size_t size_, consumed_;
90 SocketAddress from_; 90 SocketAddress from_;
91 }; 91 };
92 92
93 struct MessageAddress : public MessageData { 93 struct MessageAddress : public MessageData {
94 explicit MessageAddress(const SocketAddress& a) : addr(a) { } 94 explicit MessageAddress(const SocketAddress& a) : addr(a) { }
95 SocketAddress addr; 95 SocketAddress addr;
96 }; 96 };
97 97
98 VirtualSocket::VirtualSocket(VirtualSocketServer* server, 98 VirtualSocket::VirtualSocket(VirtualSocketServer* server,
99 int family, 99 int family,
100 int type, 100 int type,
101 bool async) 101 bool async)
102 : server_(server), 102 : server_(server),
103 type_(type), 103 type_(type),
104 async_(async), 104 async_(async),
105 state_(CS_CLOSED), 105 state_(CS_CLOSED),
106 error_(0), 106 error_(0),
107 listen_queue_(NULL), 107 listen_queue_(NULL),
108 write_enabled_(false),
109 network_size_(0), 108 network_size_(0),
110 recv_buffer_size_(0), 109 recv_buffer_size_(0),
111 bound_(false), 110 bound_(false),
112 was_any_(false) { 111 was_any_(false) {
113 ASSERT((type_ == SOCK_DGRAM) || (type_ == SOCK_STREAM)); 112 RTC_DCHECK((type_ == SOCK_DGRAM) || (type_ == SOCK_STREAM));
114 ASSERT(async_ || (type_ != SOCK_STREAM)); // We only support async streams 113 RTC_DCHECK(async_ ||
114 (type_ != SOCK_STREAM)); // We only support async streams
115 server->SignalReadyToSend.connect(this,
116 &VirtualSocket::OnSocketServerReadyToSend);
115 } 117 }
116 118
117 VirtualSocket::~VirtualSocket() { 119 VirtualSocket::~VirtualSocket() {
118 Close(); 120 Close();
119 121
120 for (RecvBuffer::iterator it = recv_buffer_.begin(); it != recv_buffer_.end(); 122 for (RecvBuffer::iterator it = recv_buffer_.begin(); it != recv_buffer_.end();
121 ++it) { 123 ++it) {
122 delete *it; 124 delete *it;
123 } 125 }
124 } 126 }
(...skipping 77 matching lines...) Expand 10 before | Expand all | Expand 10 after
202 // Remove mapping for both directions. 204 // Remove mapping for both directions.
203 server_->RemoveConnection(remote_addr_, local_addr_); 205 server_->RemoveConnection(remote_addr_, local_addr_);
204 server_->RemoveConnection(local_addr_, remote_addr_); 206 server_->RemoveConnection(local_addr_, remote_addr_);
205 } 207 }
206 // Cancel potential connects 208 // Cancel potential connects
207 MessageList msgs; 209 MessageList msgs;
208 if (server_->msg_queue_) { 210 if (server_->msg_queue_) {
209 server_->msg_queue_->Clear(this, MSG_ID_CONNECT, &msgs); 211 server_->msg_queue_->Clear(this, MSG_ID_CONNECT, &msgs);
210 } 212 }
211 for (MessageList::iterator it = msgs.begin(); it != msgs.end(); ++it) { 213 for (MessageList::iterator it = msgs.begin(); it != msgs.end(); ++it) {
212 ASSERT(NULL != it->pdata); 214 RTC_DCHECK(NULL != it->pdata);
213 MessageAddress* data = static_cast<MessageAddress*>(it->pdata); 215 MessageAddress* data = static_cast<MessageAddress*>(it->pdata);
214 216
215 // Lookup remote side. 217 // Lookup remote side.
216 VirtualSocket* socket = 218 VirtualSocket* socket =
217 server_->LookupConnection(local_addr_, data->addr); 219 server_->LookupConnection(local_addr_, data->addr);
218 if (socket) { 220 if (socket) {
219 // Server socket, remote side is a socket retreived by 221 // Server socket, remote side is a socket retreived by
220 // accept. Accepted sockets are not bound so we will not 222 // accept. Accepted sockets are not bound so we will not
221 // find it by looking in the bindings table. 223 // find it by looking in the bindings table.
222 server_->Disconnect(socket); 224 server_->Disconnect(socket);
(...skipping 77 matching lines...) Expand 10 before | Expand all | Expand 10 after
300 } else { 302 } else {
301 recv_buffer_.pop_front(); 303 recv_buffer_.pop_front();
302 delete packet; 304 delete packet;
303 } 305 }
304 306
305 if (SOCK_STREAM == type_) { 307 if (SOCK_STREAM == type_) {
306 bool was_full = (recv_buffer_size_ == server_->recv_buffer_capacity_); 308 bool was_full = (recv_buffer_size_ == server_->recv_buffer_capacity_);
307 recv_buffer_size_ -= data_read; 309 recv_buffer_size_ -= data_read;
308 if (was_full) { 310 if (was_full) {
309 VirtualSocket* sender = server_->LookupBinding(remote_addr_); 311 VirtualSocket* sender = server_->LookupBinding(remote_addr_);
310 ASSERT(NULL != sender); 312 RTC_DCHECK(NULL != sender);
311 server_->SendTcp(sender); 313 server_->SendTcp(sender);
312 } 314 }
313 } 315 }
314 316
315 return static_cast<int>(data_read); 317 return static_cast<int>(data_read);
316 } 318 }
317 319
318 int VirtualSocket::Listen(int backlog) { 320 int VirtualSocket::Listen(int backlog) {
319 ASSERT(SOCK_STREAM == type_); 321 RTC_DCHECK(SOCK_STREAM == type_);
320 ASSERT(CS_CLOSED == state_); 322 RTC_DCHECK(CS_CLOSED == state_);
321 if (local_addr_.IsNil()) { 323 if (local_addr_.IsNil()) {
322 error_ = EINVAL; 324 error_ = EINVAL;
323 return -1; 325 return -1;
324 } 326 }
325 ASSERT(NULL == listen_queue_); 327 RTC_DCHECK(NULL == listen_queue_);
326 listen_queue_ = new ListenQueue; 328 listen_queue_ = new ListenQueue;
327 state_ = CS_CONNECTING; 329 state_ = CS_CONNECTING;
328 return 0; 330 return 0;
329 } 331 }
330 332
331 VirtualSocket* VirtualSocket::Accept(SocketAddress* paddr) { 333 VirtualSocket* VirtualSocket::Accept(SocketAddress* paddr) {
332 if (NULL == listen_queue_) { 334 if (NULL == listen_queue_) {
333 error_ = EINVAL; 335 error_ = EINVAL;
334 return NULL; 336 return NULL;
335 } 337 }
(...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after
385 387
386 int VirtualSocket::EstimateMTU(uint16_t* mtu) { 388 int VirtualSocket::EstimateMTU(uint16_t* mtu) {
387 if (CS_CONNECTED != state_) 389 if (CS_CONNECTED != state_)
388 return ENOTCONN; 390 return ENOTCONN;
389 else 391 else
390 return 65536; 392 return 65536;
391 } 393 }
392 394
393 void VirtualSocket::OnMessage(Message* pmsg) { 395 void VirtualSocket::OnMessage(Message* pmsg) {
394 if (pmsg->message_id == MSG_ID_PACKET) { 396 if (pmsg->message_id == MSG_ID_PACKET) {
395 // ASSERT(!local_addr_.IsAnyIP()); 397 RTC_DCHECK(NULL != pmsg->pdata);
396 ASSERT(NULL != pmsg->pdata);
397 Packet* packet = static_cast<Packet*>(pmsg->pdata); 398 Packet* packet = static_cast<Packet*>(pmsg->pdata);
398 399
399 recv_buffer_.push_back(packet); 400 recv_buffer_.push_back(packet);
400 401
401 if (async_) { 402 if (async_) {
402 SignalReadEvent(this); 403 SignalReadEvent(this);
403 } 404 }
404 } else if (pmsg->message_id == MSG_ID_CONNECT) { 405 } else if (pmsg->message_id == MSG_ID_CONNECT) {
405 ASSERT(NULL != pmsg->pdata); 406 RTC_DCHECK(NULL != pmsg->pdata);
406 MessageAddress* data = static_cast<MessageAddress*>(pmsg->pdata); 407 MessageAddress* data = static_cast<MessageAddress*>(pmsg->pdata);
407 if (listen_queue_ != NULL) { 408 if (listen_queue_ != NULL) {
408 listen_queue_->push_back(data->addr); 409 listen_queue_->push_back(data->addr);
409 if (async_) { 410 if (async_) {
410 SignalReadEvent(this); 411 SignalReadEvent(this);
411 } 412 }
412 } else if ((SOCK_STREAM == type_) && (CS_CONNECTING == state_)) { 413 } else if ((SOCK_STREAM == type_) && (CS_CONNECTING == state_)) {
413 CompleteConnect(data->addr, true); 414 CompleteConnect(data->addr, true);
414 } else { 415 } else {
415 LOG(LS_VERBOSE) << "Socket at " << local_addr_ << " is not listening"; 416 LOG(LS_VERBOSE) << "Socket at " << local_addr_ << " is not listening";
416 server_->Disconnect(server_->LookupBinding(data->addr)); 417 server_->Disconnect(server_->LookupBinding(data->addr));
417 } 418 }
418 delete data; 419 delete data;
419 } else if (pmsg->message_id == MSG_ID_DISCONNECT) { 420 } else if (pmsg->message_id == MSG_ID_DISCONNECT) {
420 ASSERT(SOCK_STREAM == type_); 421 RTC_DCHECK(SOCK_STREAM == type_);
421 if (CS_CLOSED != state_) { 422 if (CS_CLOSED != state_) {
422 int error = (CS_CONNECTING == state_) ? ECONNREFUSED : 0; 423 int error = (CS_CONNECTING == state_) ? ECONNREFUSED : 0;
423 state_ = CS_CLOSED; 424 state_ = CS_CLOSED;
424 remote_addr_.Clear(); 425 remote_addr_.Clear();
425 if (async_) { 426 if (async_) {
426 SignalCloseEvent(this, error); 427 SignalCloseEvent(this, error);
427 } 428 }
428 } 429 }
429 } else if (pmsg->message_id == MSG_ID_ADDRESS_BOUND) { 430 } else if (pmsg->message_id == MSG_ID_ADDRESS_BOUND) {
430 SignalAddressReady(this, GetLocalAddress()); 431 SignalAddressReady(this, GetLocalAddress());
431 } else { 432 } else {
432 ASSERT(false); 433 RTC_DCHECK(false);
433 } 434 }
434 } 435 }
435 436
436 int VirtualSocket::InitiateConnect(const SocketAddress& addr, bool use_delay) { 437 int VirtualSocket::InitiateConnect(const SocketAddress& addr, bool use_delay) {
437 if (!remote_addr_.IsNil()) { 438 if (!remote_addr_.IsNil()) {
438 error_ = (CS_CONNECTED == state_) ? EISCONN : EINPROGRESS; 439 error_ = (CS_CONNECTED == state_) ? EISCONN : EINPROGRESS;
439 return -1; 440 return -1;
440 } 441 }
441 if (local_addr_.IsNil()) { 442 if (local_addr_.IsNil()) {
442 // If there's no local address set, grab a random one in the correct AF. 443 // If there's no local address set, grab a random one in the correct AF.
(...skipping 15 matching lines...) Expand all
458 if (result != 0) { 459 if (result != 0) {
459 error_ = EHOSTUNREACH; 460 error_ = EHOSTUNREACH;
460 return -1; 461 return -1;
461 } 462 }
462 state_ = CS_CONNECTING; 463 state_ = CS_CONNECTING;
463 } 464 }
464 return 0; 465 return 0;
465 } 466 }
466 467
467 void VirtualSocket::CompleteConnect(const SocketAddress& addr, bool notify) { 468 void VirtualSocket::CompleteConnect(const SocketAddress& addr, bool notify) {
468 ASSERT(CS_CONNECTING == state_); 469 RTC_DCHECK(CS_CONNECTING == state_);
469 remote_addr_ = addr; 470 remote_addr_ = addr;
470 state_ = CS_CONNECTED; 471 state_ = CS_CONNECTED;
471 server_->AddConnection(remote_addr_, local_addr_, this); 472 server_->AddConnection(remote_addr_, local_addr_, this);
472 if (async_ && notify) { 473 if (async_ && notify) {
473 SignalConnectEvent(this); 474 SignalConnectEvent(this);
474 } 475 }
475 } 476 }
476 477
477 int VirtualSocket::SendUdp(const void* pv, 478 int VirtualSocket::SendUdp(const void* pv,
478 size_t cb, 479 size_t cb,
479 const SocketAddress& addr) { 480 const SocketAddress& addr) {
480 // If we have not been assigned a local port, then get one. 481 // If we have not been assigned a local port, then get one.
481 if (local_addr_.IsNil()) { 482 if (local_addr_.IsNil()) {
482 local_addr_ = EmptySocketAddressWithFamily(addr.ipaddr().family()); 483 local_addr_ = EmptySocketAddressWithFamily(addr.ipaddr().family());
483 int result = server_->Bind(this, &local_addr_); 484 int result = server_->Bind(this, &local_addr_);
484 if (result != 0) { 485 if (result != 0) {
485 local_addr_.Clear(); 486 local_addr_.Clear();
486 error_ = EADDRINUSE; 487 error_ = EADDRINUSE;
487 return result; 488 return result;
488 } 489 }
489 } 490 }
490 491
491 // Send the data in a message to the appropriate socket. 492 // Send the data in a message to the appropriate socket.
492 return server_->SendUdp(this, static_cast<const char*>(pv), cb, addr); 493 return server_->SendUdp(this, static_cast<const char*>(pv), cb, addr);
493 } 494 }
494 495
495 int VirtualSocket::SendTcp(const void* pv, size_t cb) { 496 int VirtualSocket::SendTcp(const void* pv, size_t cb) {
496 size_t capacity = server_->send_buffer_capacity_ - send_buffer_.size(); 497 size_t capacity = server_->send_buffer_capacity_ - send_buffer_.size();
497 if (0 == capacity) { 498 if (0 == capacity) {
498 write_enabled_ = true; 499 ready_to_send_ = false;
499 error_ = EWOULDBLOCK; 500 error_ = EWOULDBLOCK;
500 return -1; 501 return -1;
501 } 502 }
502 size_t consumed = std::min(cb, capacity); 503 size_t consumed = std::min(cb, capacity);
503 const char* cpv = static_cast<const char*>(pv); 504 const char* cpv = static_cast<const char*>(pv);
504 send_buffer_.insert(send_buffer_.end(), cpv, cpv + consumed); 505 send_buffer_.insert(send_buffer_.end(), cpv, cpv + consumed);
505 server_->SendTcp(this); 506 server_->SendTcp(this);
506 return static_cast<int>(consumed); 507 return static_cast<int>(consumed);
507 } 508 }
508 509
510 void VirtualSocket::OnSocketServerReadyToSend() {
511 if (ready_to_send_) {
512 // This socket didn't encounter EWOULDBLOCK, so there's nothing to do.
513 return;
514 }
515 if (type_ == SOCK_DGRAM) {
516 ready_to_send_ = true;
517 SignalWriteEvent(this);
518 } else {
519 RTC_DCHECK(type_ == SOCK_STREAM);
520 // This will attempt to empty the full send buffer, and will fire
521 // SignalWriteEvent if successful.
522 server_->SendTcp(this);
523 }
524 }
525
509 VirtualSocketServer::VirtualSocketServer(SocketServer* ss) 526 VirtualSocketServer::VirtualSocketServer(SocketServer* ss)
510 : server_(ss), 527 : server_(ss),
511 server_owned_(false), 528 server_owned_(false),
512 msg_queue_(NULL), 529 msg_queue_(NULL),
513 stop_on_idle_(false), 530 stop_on_idle_(false),
514 network_delay_(0), 531 network_delay_(0),
515 next_ipv4_(kInitialNextIPv4), 532 next_ipv4_(kInitialNextIPv4),
516 next_ipv6_(kInitialNextIPv6), 533 next_ipv6_(kInitialNextIPv6),
517 next_port_(kFirstEphemeralPort), 534 next_port_(kFirstEphemeralPort),
518 bindings_(new AddressMap()), 535 bindings_(new AddressMap()),
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after
560 uint16_t VirtualSocketServer::GetNextPort() { 577 uint16_t VirtualSocketServer::GetNextPort() {
561 uint16_t port = next_port_; 578 uint16_t port = next_port_;
562 if (next_port_ < kLastEphemeralPort) { 579 if (next_port_ < kLastEphemeralPort) {
563 ++next_port_; 580 ++next_port_;
564 } else { 581 } else {
565 next_port_ = kFirstEphemeralPort; 582 next_port_ = kFirstEphemeralPort;
566 } 583 }
567 return port; 584 return port;
568 } 585 }
569 586
587 void VirtualSocketServer::SetSendingBlocked(bool blocked) {
588 if (blocked == sending_blocked_) {
589 // Unchanged; nothing to do.
590 return;
591 }
592 sending_blocked_ = blocked;
593 if (!sending_blocked_) {
594 // Sending was blocked, but is now unblocked. This signal gives sockets a
595 // chance to fire SignalWriteEvent, and for TCP, send buffered data.
596 SignalReadyToSend();
597 }
598 }
599
570 Socket* VirtualSocketServer::CreateSocket(int type) { 600 Socket* VirtualSocketServer::CreateSocket(int type) {
571 return CreateSocket(AF_INET, type); 601 return CreateSocket(AF_INET, type);
572 } 602 }
573 603
574 Socket* VirtualSocketServer::CreateSocket(int family, int type) { 604 Socket* VirtualSocketServer::CreateSocket(int family, int type) {
575 return CreateSocketInternal(family, type); 605 return CreateSocketInternal(family, type);
576 } 606 }
577 607
578 AsyncSocket* VirtualSocketServer::CreateAsyncSocket(int type) { 608 AsyncSocket* VirtualSocketServer::CreateAsyncSocket(int type) {
579 return CreateAsyncSocket(AF_INET, type); 609 return CreateAsyncSocket(AF_INET, type);
(...skipping 11 matching lines...) Expand all
591 621
592 void VirtualSocketServer::SetMessageQueue(MessageQueue* msg_queue) { 622 void VirtualSocketServer::SetMessageQueue(MessageQueue* msg_queue) {
593 msg_queue_ = msg_queue; 623 msg_queue_ = msg_queue;
594 if (msg_queue_) { 624 if (msg_queue_) {
595 msg_queue_->SignalQueueDestroyed.connect(this, 625 msg_queue_->SignalQueueDestroyed.connect(this,
596 &VirtualSocketServer::OnMessageQueueDestroyed); 626 &VirtualSocketServer::OnMessageQueueDestroyed);
597 } 627 }
598 } 628 }
599 629
600 bool VirtualSocketServer::Wait(int cmsWait, bool process_io) { 630 bool VirtualSocketServer::Wait(int cmsWait, bool process_io) {
601 ASSERT(msg_queue_ == Thread::Current()); 631 RTC_DCHECK(msg_queue_ == Thread::Current());
602 if (stop_on_idle_ && Thread::Current()->empty()) { 632 if (stop_on_idle_ && Thread::Current()->empty()) {
603 return false; 633 return false;
604 } 634 }
605 return socketserver()->Wait(cmsWait, process_io); 635 return socketserver()->Wait(cmsWait, process_io);
606 } 636 }
607 637
608 void VirtualSocketServer::WakeUp() { 638 void VirtualSocketServer::WakeUp() {
609 socketserver()->WakeUp(); 639 socketserver()->WakeUp();
610 } 640 }
611 641
612 bool VirtualSocketServer::ProcessMessagesUntilIdle() { 642 bool VirtualSocketServer::ProcessMessagesUntilIdle() {
613 ASSERT(msg_queue_ == Thread::Current()); 643 RTC_DCHECK(msg_queue_ == Thread::Current());
614 stop_on_idle_ = true; 644 stop_on_idle_ = true;
615 while (!msg_queue_->empty()) { 645 while (!msg_queue_->empty()) {
616 Message msg; 646 Message msg;
617 if (msg_queue_->Get(&msg, Thread::kForever)) { 647 if (msg_queue_->Get(&msg, Thread::kForever)) {
618 msg_queue_->Dispatch(&msg); 648 msg_queue_->Dispatch(&msg);
619 } 649 }
620 } 650 }
621 stop_on_idle_ = false; 651 stop_on_idle_ = false;
622 return !msg_queue_->IsQuitting(); 652 return !msg_queue_->IsQuitting();
623 } 653 }
(...skipping 13 matching lines...) Expand all
637 socket->SignalCloseEvent(socket, 0); 667 socket->SignalCloseEvent(socket, 0);
638 668
639 // Trigger the remote connection's close event. 669 // Trigger the remote connection's close event.
640 socket->Close(); 670 socket->Close();
641 671
642 return true; 672 return true;
643 } 673 }
644 674
645 int VirtualSocketServer::Bind(VirtualSocket* socket, 675 int VirtualSocketServer::Bind(VirtualSocket* socket,
646 const SocketAddress& addr) { 676 const SocketAddress& addr) {
647 ASSERT(NULL != socket); 677 RTC_DCHECK(NULL != socket);
648 // Address must be completely specified at this point 678 // Address must be completely specified at this point
649 ASSERT(!IPIsUnspec(addr.ipaddr())); 679 RTC_DCHECK(!IPIsUnspec(addr.ipaddr()));
650 ASSERT(addr.port() != 0); 680 RTC_DCHECK(addr.port() != 0);
651 681
652 // Normalize the address (turns v6-mapped addresses into v4-addresses). 682 // Normalize the address (turns v6-mapped addresses into v4-addresses).
653 SocketAddress normalized(addr.ipaddr().Normalized(), addr.port()); 683 SocketAddress normalized(addr.ipaddr().Normalized(), addr.port());
654 684
655 AddressMap::value_type entry(normalized, socket); 685 AddressMap::value_type entry(normalized, socket);
656 return bindings_->insert(entry).second ? 0 : -1; 686 return bindings_->insert(entry).second ? 0 : -1;
657 } 687 }
658 688
659 int VirtualSocketServer::Bind(VirtualSocket* socket, SocketAddress* addr) { 689 int VirtualSocketServer::Bind(VirtualSocket* socket, SocketAddress* addr) {
660 ASSERT(NULL != socket); 690 RTC_DCHECK(NULL != socket);
661 691
662 if (!IPIsUnspec(addr->ipaddr())) { 692 if (!IPIsUnspec(addr->ipaddr())) {
663 addr->SetIP(addr->ipaddr().Normalized()); 693 addr->SetIP(addr->ipaddr().Normalized());
664 } else { 694 } else {
665 ASSERT(false); 695 RTC_DCHECK(false);
666 } 696 }
667 697
668 if (addr->port() == 0) { 698 if (addr->port() == 0) {
669 for (int i = 0; i < kEphemeralPortCount; ++i) { 699 for (int i = 0; i < kEphemeralPortCount; ++i) {
670 addr->SetPort(GetNextPort()); 700 addr->SetPort(GetNextPort());
671 if (bindings_->find(*addr) == bindings_->end()) { 701 if (bindings_->find(*addr) == bindings_->end()) {
672 break; 702 break;
673 } 703 }
674 } 704 }
675 } 705 }
(...skipping 20 matching lines...) Expand all
696 return LookupBinding(sock_addr); 726 return LookupBinding(sock_addr);
697 } 727 }
698 728
699 return nullptr; 729 return nullptr;
700 } 730 }
701 731
702 int VirtualSocketServer::Unbind(const SocketAddress& addr, 732 int VirtualSocketServer::Unbind(const SocketAddress& addr,
703 VirtualSocket* socket) { 733 VirtualSocket* socket) {
704 SocketAddress normalized(addr.ipaddr().Normalized(), 734 SocketAddress normalized(addr.ipaddr().Normalized(),
705 addr.port()); 735 addr.port());
706 ASSERT((*bindings_)[normalized] == socket); 736 RTC_DCHECK((*bindings_)[normalized] == socket);
707 bindings_->erase(bindings_->find(normalized)); 737 bindings_->erase(bindings_->find(normalized));
708 return 0; 738 return 0;
709 } 739 }
710 740
711 void VirtualSocketServer::AddConnection(const SocketAddress& local, 741 void VirtualSocketServer::AddConnection(const SocketAddress& local,
712 const SocketAddress& remote, 742 const SocketAddress& remote,
713 VirtualSocket* remote_socket) { 743 VirtualSocket* remote_socket) {
714 // Add this socket pair to our routing table. This will allow 744 // Add this socket pair to our routing table. This will allow
715 // multiple clients to connect to the same server address. 745 // multiple clients to connect to the same server address.
716 SocketAddress local_normalized(local.ipaddr().Normalized(), 746 SocketAddress local_normalized(local.ipaddr().Normalized(),
(...skipping 60 matching lines...) Expand 10 before | Expand all | Expand 10 after
777 // Remove the mapping. 807 // Remove the mapping.
778 msg_queue_->PostDelayed(RTC_FROM_HERE, delay, socket, MSG_ID_DISCONNECT); 808 msg_queue_->PostDelayed(RTC_FROM_HERE, delay, socket, MSG_ID_DISCONNECT);
779 return true; 809 return true;
780 } 810 }
781 return false; 811 return false;
782 } 812 }
783 813
784 int VirtualSocketServer::SendUdp(VirtualSocket* socket, 814 int VirtualSocketServer::SendUdp(VirtualSocket* socket,
785 const char* data, size_t data_size, 815 const char* data, size_t data_size,
786 const SocketAddress& remote_addr) { 816 const SocketAddress& remote_addr) {
817 if (sending_blocked_) {
818 CritScope cs(&socket->crit_);
819 socket->ready_to_send_ = false;
820 socket->error_ = EWOULDBLOCK;
821 return -1;
822 }
823
787 // See if we want to drop this packet. 824 // See if we want to drop this packet.
788 if (Random() < drop_prob_) { 825 if (Random() < drop_prob_) {
789 LOG(LS_VERBOSE) << "Dropping packet: bad luck"; 826 LOG(LS_VERBOSE) << "Dropping packet: bad luck";
790 return static_cast<int>(data_size); 827 return static_cast<int>(data_size);
791 } 828 }
792 829
793 VirtualSocket* recipient = LookupBinding(remote_addr); 830 VirtualSocket* recipient = LookupBinding(remote_addr);
794 if (!recipient) { 831 if (!recipient) {
795 // Make a fake recipient for address family checking. 832 // Make a fake recipient for address family checking.
796 std::unique_ptr<VirtualSocket> dummy_socket( 833 std::unique_ptr<VirtualSocket> dummy_socket(
797 CreateSocketInternal(AF_INET, SOCK_DGRAM)); 834 CreateSocketInternal(AF_INET, SOCK_DGRAM));
798 dummy_socket->SetLocalAddress(remote_addr); 835 dummy_socket->SetLocalAddress(remote_addr);
799 if (!CanInteractWith(socket, dummy_socket.get())) { 836 if (!CanInteractWith(socket, dummy_socket.get())) {
800 LOG(LS_VERBOSE) << "Incompatible address families: " 837 LOG(LS_VERBOSE) << "Incompatible address families: "
801 << socket->GetLocalAddress() << " and " << remote_addr; 838 << socket->GetLocalAddress() << " and " << remote_addr;
802 return -1; 839 return -1;
803 } 840 }
804 LOG(LS_VERBOSE) << "No one listening at " << remote_addr; 841 LOG(LS_VERBOSE) << "No one listening at " << remote_addr;
805 return static_cast<int>(data_size); 842 return static_cast<int>(data_size);
806 } 843 }
807 844
808 if (!CanInteractWith(socket, recipient)) { 845 if (!CanInteractWith(socket, recipient)) {
809 LOG(LS_VERBOSE) << "Incompatible address families: " 846 LOG(LS_VERBOSE) << "Incompatible address families: "
810 << socket->GetLocalAddress() << " and " << remote_addr; 847 << socket->GetLocalAddress() << " and " << remote_addr;
811 return -1; 848 return -1;
812 } 849 }
813 850
814 CritScope cs(&socket->crit_); 851 {
852 CritScope cs(&socket->crit_);
815 853
816 int64_t cur_time = TimeMillis(); 854 int64_t cur_time = TimeMillis();
817 PurgeNetworkPackets(socket, cur_time); 855 PurgeNetworkPackets(socket, cur_time);
818 856
819 // Determine whether we have enough bandwidth to accept this packet. To do 857 // Determine whether we have enough bandwidth to accept this packet. To do
820 // this, we need to update the send queue. Once we know it's current size, 858 // this, we need to update the send queue. Once we know it's current size,
821 // we know whether we can fit this packet. 859 // we know whether we can fit this packet.
822 // 860 //
823 // NOTE: There are better algorithms for maintaining such a queue (such as 861 // NOTE: There are better algorithms for maintaining such a queue (such as
824 // "Derivative Random Drop"); however, this algorithm is a more accurate 862 // "Derivative Random Drop"); however, this algorithm is a more accurate
825 // simulation of what a normal network would do. 863 // simulation of what a normal network would do.
826 864
827 size_t packet_size = data_size + UDP_HEADER_SIZE; 865 size_t packet_size = data_size + UDP_HEADER_SIZE;
828 if (socket->network_size_ + packet_size > network_capacity_) { 866 if (socket->network_size_ + packet_size > network_capacity_) {
829 LOG(LS_VERBOSE) << "Dropping packet: network capacity exceeded"; 867 LOG(LS_VERBOSE) << "Dropping packet: network capacity exceeded";
868 return static_cast<int>(data_size);
869 }
870
871 AddPacketToNetwork(socket, recipient, cur_time, data, data_size,
872 UDP_HEADER_SIZE, false);
873
830 return static_cast<int>(data_size); 874 return static_cast<int>(data_size);
831 } 875 }
832
833 AddPacketToNetwork(socket, recipient, cur_time, data, data_size,
834 UDP_HEADER_SIZE, false);
835
836 return static_cast<int>(data_size);
837 } 876 }
838 877
839 void VirtualSocketServer::SendTcp(VirtualSocket* socket) { 878 void VirtualSocketServer::SendTcp(VirtualSocket* socket) {
879 if (sending_blocked_) {
880 // Eventually the socket's buffer will fill and VirtualSocket::SendTcp will
881 // set EWOULDBLOCK.
882 return;
883 }
884
840 // TCP can't send more data than will fill up the receiver's buffer. 885 // TCP can't send more data than will fill up the receiver's buffer.
841 // We track the data that is in the buffer plus data in flight using the 886 // We track the data that is in the buffer plus data in flight using the
842 // recipient's recv_buffer_size_. Anything beyond that must be stored in the 887 // recipient's recv_buffer_size_. Anything beyond that must be stored in the
843 // sender's buffer. We will trigger the buffered data to be sent when data 888 // sender's buffer. We will trigger the buffered data to be sent when data
844 // is read from the recv_buffer. 889 // is read from the recv_buffer.
845 890
846 // Lookup the local/remote pair in the connections table. 891 // Lookup the local/remote pair in the connections table.
847 VirtualSocket* recipient = LookupConnection(socket->local_addr_, 892 VirtualSocket* recipient = LookupConnection(socket->local_addr_,
848 socket->remote_addr_); 893 socket->remote_addr_);
849 if (!recipient) { 894 if (!recipient) {
(...skipping 22 matching lines...) Expand all
872 // Avoid undefined access beyond the last element of the vector. 917 // Avoid undefined access beyond the last element of the vector.
873 // This only happens when new_buffer_size is 0. 918 // This only happens when new_buffer_size is 0.
874 if (data_size < socket->send_buffer_.size()) { 919 if (data_size < socket->send_buffer_.size()) {
875 // memmove is required for potentially overlapping source/destination. 920 // memmove is required for potentially overlapping source/destination.
876 memmove(&socket->send_buffer_[0], &socket->send_buffer_[data_size], 921 memmove(&socket->send_buffer_[0], &socket->send_buffer_[data_size],
877 new_buffer_size); 922 new_buffer_size);
878 } 923 }
879 socket->send_buffer_.resize(new_buffer_size); 924 socket->send_buffer_.resize(new_buffer_size);
880 } 925 }
881 926
882 if (socket->write_enabled_ 927 if (!socket->ready_to_send_ &&
883 && (socket->send_buffer_.size() < send_buffer_capacity_)) { 928 (socket->send_buffer_.size() < send_buffer_capacity_)) {
884 socket->write_enabled_ = false; 929 socket->ready_to_send_ = true;
885 socket->SignalWriteEvent(socket); 930 socket->SignalWriteEvent(socket);
886 } 931 }
887 } 932 }
888 933
889 void VirtualSocketServer::AddPacketToNetwork(VirtualSocket* sender, 934 void VirtualSocketServer::AddPacketToNetwork(VirtualSocket* sender,
890 VirtualSocket* recipient, 935 VirtualSocket* recipient,
891 int64_t cur_time, 936 int64_t cur_time,
892 const char* data, 937 const char* data,
893 size_t data_size, 938 size_t data_size,
894 size_t header_size, 939 size_t header_size,
(...skipping 29 matching lines...) Expand all
924 ts = std::max(ts, network_delay_); 969 ts = std::max(ts, network_delay_);
925 } 970 }
926 msg_queue_->PostAt(RTC_FROM_HERE, ts, recipient, MSG_ID_PACKET, p); 971 msg_queue_->PostAt(RTC_FROM_HERE, ts, recipient, MSG_ID_PACKET, p);
927 network_delay_ = std::max(ts, network_delay_); 972 network_delay_ = std::max(ts, network_delay_);
928 } 973 }
929 974
930 void VirtualSocketServer::PurgeNetworkPackets(VirtualSocket* socket, 975 void VirtualSocketServer::PurgeNetworkPackets(VirtualSocket* socket,
931 int64_t cur_time) { 976 int64_t cur_time) {
932 while (!socket->network_.empty() && 977 while (!socket->network_.empty() &&
933 (socket->network_.front().done_time <= cur_time)) { 978 (socket->network_.front().done_time <= cur_time)) {
934 ASSERT(socket->network_size_ >= socket->network_.front().size); 979 RTC_DCHECK(socket->network_size_ >= socket->network_.front().size);
935 socket->network_size_ -= socket->network_.front().size; 980 socket->network_size_ -= socket->network_.front().size;
936 socket->network_.pop_front(); 981 socket->network_.pop_front();
937 } 982 }
938 } 983 }
939 984
940 uint32_t VirtualSocketServer::SendDelay(uint32_t size) { 985 uint32_t VirtualSocketServer::SendDelay(uint32_t size) {
941 if (bandwidth_ == 0) 986 if (bandwidth_ == 0)
942 return 0; 987 return 0;
943 else 988 else
944 return 1000 * size / bandwidth_; 989 return 1000 * size / bandwidth_;
(...skipping 84 matching lines...) Expand 10 before | Expand all | Expand 10 after
1029 } 1074 }
1030 bool operator()(double v1, const VirtualSocketServer::Point& p2) { 1075 bool operator()(double v1, const VirtualSocketServer::Point& p2) {
1031 return v1 < p2.first; 1076 return v1 < p2.first;
1032 } 1077 }
1033 bool operator()(const VirtualSocketServer::Point& p1, double v2) { 1078 bool operator()(const VirtualSocketServer::Point& p1, double v2) {
1034 return p1.first < v2; 1079 return p1.first < v2;
1035 } 1080 }
1036 }; 1081 };
1037 1082
1038 VirtualSocketServer::Function* VirtualSocketServer::Accumulate(Function* f) { 1083 VirtualSocketServer::Function* VirtualSocketServer::Accumulate(Function* f) {
1039 ASSERT(f->size() >= 1); 1084 RTC_DCHECK(f->size() >= 1);
1040 double v = 0; 1085 double v = 0;
1041 for (Function::size_type i = 0; i < f->size() - 1; ++i) { 1086 for (Function::size_type i = 0; i < f->size() - 1; ++i) {
1042 double dx = (*f)[i + 1].first - (*f)[i].first; 1087 double dx = (*f)[i + 1].first - (*f)[i].first;
1043 double avgy = ((*f)[i + 1].second + (*f)[i].second) / 2; 1088 double avgy = ((*f)[i + 1].second + (*f)[i].second) / 2;
1044 (*f)[i].second = v; 1089 (*f)[i].second = v;
1045 v = v + dx * avgy; 1090 v = v + dx * avgy;
1046 } 1091 }
1047 (*f)[f->size()-1].second = v; 1092 (*f)[f->size()-1].second = v;
1048 return f; 1093 return f;
1049 } 1094 }
(...skipping 21 matching lines...) Expand all
1071 delete f; 1116 delete f;
1072 return g; 1117 return g;
1073 } 1118 }
1074 1119
1075 double VirtualSocketServer::Evaluate(Function* f, double x) { 1120 double VirtualSocketServer::Evaluate(Function* f, double x) {
1076 Function::iterator iter = 1121 Function::iterator iter =
1077 std::lower_bound(f->begin(), f->end(), x, FunctionDomainCmp()); 1122 std::lower_bound(f->begin(), f->end(), x, FunctionDomainCmp());
1078 if (iter == f->begin()) { 1123 if (iter == f->begin()) {
1079 return (*f)[0].second; 1124 return (*f)[0].second;
1080 } else if (iter == f->end()) { 1125 } else if (iter == f->end()) {
1081 ASSERT(f->size() >= 1); 1126 RTC_DCHECK(f->size() >= 1);
1082 return (*f)[f->size() - 1].second; 1127 return (*f)[f->size() - 1].second;
1083 } else if (iter->first == x) { 1128 } else if (iter->first == x) {
1084 return iter->second; 1129 return iter->second;
1085 } else { 1130 } else {
1086 double x1 = (iter - 1)->first; 1131 double x1 = (iter - 1)->first;
1087 double y1 = (iter - 1)->second; 1132 double y1 = (iter - 1)->second;
1088 double x2 = iter->first; 1133 double x2 = iter->first;
1089 double y2 = iter->second; 1134 double y2 = iter->second;
1090 return y1 + (y2 - y1) * (x - x1) / (x2 - x1); 1135 return y1 + (y2 - y1) * (x - x1) / (x2 - x1);
1091 } 1136 }
(...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after
1144 void VirtualSocketServer::SetDefaultRoute(const IPAddress& from_addr) { 1189 void VirtualSocketServer::SetDefaultRoute(const IPAddress& from_addr) {
1145 RTC_DCHECK(!IPIsAny(from_addr)); 1190 RTC_DCHECK(!IPIsAny(from_addr));
1146 if (from_addr.family() == AF_INET) { 1191 if (from_addr.family() == AF_INET) {
1147 default_route_v4_ = from_addr; 1192 default_route_v4_ = from_addr;
1148 } else if (from_addr.family() == AF_INET6) { 1193 } else if (from_addr.family() == AF_INET6) {
1149 default_route_v6_ = from_addr; 1194 default_route_v6_ = from_addr;
1150 } 1195 }
1151 } 1196 }
1152 1197
1153 } // namespace rtc 1198 } // namespace rtc
OLDNEW
« no previous file with comments | « webrtc/base/virtualsocketserver.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698