Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(56)

Unified Diff: webrtc/base/virtualsocketserver.cc

Issue 2284903002: Adding ability to simulate EWOULDBLOCK/SignalReadyToSend. (Closed)
Patch Set: Removing commented-out assert Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « webrtc/base/virtualsocketserver.h ('k') | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: webrtc/base/virtualsocketserver.cc
diff --git a/webrtc/base/virtualsocketserver.cc b/webrtc/base/virtualsocketserver.cc
index ef3c6e53295e87c690771d9873bb40cf7cd7eb7a..87775cd63cc6afcd4e2dece01ec32bfde2780adf 100644
--- a/webrtc/base/virtualsocketserver.cc
+++ b/webrtc/base/virtualsocketserver.cc
@@ -65,7 +65,7 @@ class Packet : public MessageData {
public:
Packet(const char* data, size_t size, const SocketAddress& from)
: size_(size), consumed_(0), from_(from) {
- ASSERT(NULL != data);
+ RTC_DCHECK(NULL != data);
data_ = new char[size_];
memcpy(data_, data, size_);
}
@@ -80,7 +80,7 @@ class Packet : public MessageData {
// Remove the first size bytes from the data.
void Consume(size_t size) {
- ASSERT(size + consumed_ < size_);
+ RTC_DCHECK(size + consumed_ < size_);
consumed_ += size;
}
@@ -105,13 +105,15 @@ VirtualSocket::VirtualSocket(VirtualSocketServer* server,
state_(CS_CLOSED),
error_(0),
listen_queue_(NULL),
- write_enabled_(false),
network_size_(0),
recv_buffer_size_(0),
bound_(false),
was_any_(false) {
- ASSERT((type_ == SOCK_DGRAM) || (type_ == SOCK_STREAM));
- ASSERT(async_ || (type_ != SOCK_STREAM)); // We only support async streams
+ RTC_DCHECK((type_ == SOCK_DGRAM) || (type_ == SOCK_STREAM));
+ RTC_DCHECK(async_ ||
+ (type_ != SOCK_STREAM)); // We only support async streams
+ server->SignalReadyToSend.connect(this,
+ &VirtualSocket::OnSocketServerReadyToSend);
}
VirtualSocket::~VirtualSocket() {
@@ -209,7 +211,7 @@ int VirtualSocket::Close() {
server_->msg_queue_->Clear(this, MSG_ID_CONNECT, &msgs);
}
for (MessageList::iterator it = msgs.begin(); it != msgs.end(); ++it) {
- ASSERT(NULL != it->pdata);
+ RTC_DCHECK(NULL != it->pdata);
MessageAddress* data = static_cast<MessageAddress*>(it->pdata);
// Lookup remote side.
@@ -307,7 +309,7 @@ int VirtualSocket::RecvFrom(void* pv,
recv_buffer_size_ -= data_read;
if (was_full) {
VirtualSocket* sender = server_->LookupBinding(remote_addr_);
- ASSERT(NULL != sender);
+ RTC_DCHECK(NULL != sender);
server_->SendTcp(sender);
}
}
@@ -316,13 +318,13 @@ int VirtualSocket::RecvFrom(void* pv,
}
int VirtualSocket::Listen(int backlog) {
- ASSERT(SOCK_STREAM == type_);
- ASSERT(CS_CLOSED == state_);
+ RTC_DCHECK(SOCK_STREAM == type_);
+ RTC_DCHECK(CS_CLOSED == state_);
if (local_addr_.IsNil()) {
error_ = EINVAL;
return -1;
}
- ASSERT(NULL == listen_queue_);
+ RTC_DCHECK(NULL == listen_queue_);
listen_queue_ = new ListenQueue;
state_ = CS_CONNECTING;
return 0;
@@ -392,8 +394,7 @@ int VirtualSocket::EstimateMTU(uint16_t* mtu) {
void VirtualSocket::OnMessage(Message* pmsg) {
if (pmsg->message_id == MSG_ID_PACKET) {
- // ASSERT(!local_addr_.IsAnyIP());
- ASSERT(NULL != pmsg->pdata);
+ RTC_DCHECK(NULL != pmsg->pdata);
Packet* packet = static_cast<Packet*>(pmsg->pdata);
recv_buffer_.push_back(packet);
@@ -402,7 +403,7 @@ void VirtualSocket::OnMessage(Message* pmsg) {
SignalReadEvent(this);
}
} else if (pmsg->message_id == MSG_ID_CONNECT) {
- ASSERT(NULL != pmsg->pdata);
+ RTC_DCHECK(NULL != pmsg->pdata);
MessageAddress* data = static_cast<MessageAddress*>(pmsg->pdata);
if (listen_queue_ != NULL) {
listen_queue_->push_back(data->addr);
@@ -417,7 +418,7 @@ void VirtualSocket::OnMessage(Message* pmsg) {
}
delete data;
} else if (pmsg->message_id == MSG_ID_DISCONNECT) {
- ASSERT(SOCK_STREAM == type_);
+ RTC_DCHECK(SOCK_STREAM == type_);
if (CS_CLOSED != state_) {
int error = (CS_CONNECTING == state_) ? ECONNREFUSED : 0;
state_ = CS_CLOSED;
@@ -429,7 +430,7 @@ void VirtualSocket::OnMessage(Message* pmsg) {
} else if (pmsg->message_id == MSG_ID_ADDRESS_BOUND) {
SignalAddressReady(this, GetLocalAddress());
} else {
- ASSERT(false);
+ RTC_DCHECK(false);
}
}
@@ -465,7 +466,7 @@ int VirtualSocket::InitiateConnect(const SocketAddress& addr, bool use_delay) {
}
void VirtualSocket::CompleteConnect(const SocketAddress& addr, bool notify) {
- ASSERT(CS_CONNECTING == state_);
+ RTC_DCHECK(CS_CONNECTING == state_);
remote_addr_ = addr;
state_ = CS_CONNECTED;
server_->AddConnection(remote_addr_, local_addr_, this);
@@ -495,7 +496,7 @@ int VirtualSocket::SendUdp(const void* pv,
int VirtualSocket::SendTcp(const void* pv, size_t cb) {
size_t capacity = server_->send_buffer_capacity_ - send_buffer_.size();
if (0 == capacity) {
- write_enabled_ = true;
+ ready_to_send_ = false;
error_ = EWOULDBLOCK;
return -1;
}
@@ -506,6 +507,22 @@ int VirtualSocket::SendTcp(const void* pv, size_t cb) {
return static_cast<int>(consumed);
}
+void VirtualSocket::OnSocketServerReadyToSend() {
+ if (ready_to_send_) {
+ // This socket didn't encounter EWOULDBLOCK, so there's nothing to do.
+ return;
+ }
+ if (type_ == SOCK_DGRAM) {
+ ready_to_send_ = true;
+ SignalWriteEvent(this);
+ } else {
+ RTC_DCHECK(type_ == SOCK_STREAM);
+ // This will attempt to empty the full send buffer, and will fire
+ // SignalWriteEvent if successful.
+ server_->SendTcp(this);
+ }
+}
+
VirtualSocketServer::VirtualSocketServer(SocketServer* ss)
: server_(ss),
server_owned_(false),
@@ -567,6 +584,19 @@ uint16_t VirtualSocketServer::GetNextPort() {
return port;
}
+void VirtualSocketServer::SetSendingBlocked(bool blocked) {
+ if (blocked == sending_blocked_) {
+ // Unchanged; nothing to do.
+ return;
+ }
+ sending_blocked_ = blocked;
+ if (!sending_blocked_) {
+ // Sending was blocked, but is now unblocked. This signal gives sockets a
+ // chance to fire SignalWriteEvent, and for TCP, send buffered data.
+ SignalReadyToSend();
+ }
+}
+
Socket* VirtualSocketServer::CreateSocket(int type) {
return CreateSocket(AF_INET, type);
}
@@ -598,7 +628,7 @@ void VirtualSocketServer::SetMessageQueue(MessageQueue* msg_queue) {
}
bool VirtualSocketServer::Wait(int cmsWait, bool process_io) {
- ASSERT(msg_queue_ == Thread::Current());
+ RTC_DCHECK(msg_queue_ == Thread::Current());
if (stop_on_idle_ && Thread::Current()->empty()) {
return false;
}
@@ -610,7 +640,7 @@ void VirtualSocketServer::WakeUp() {
}
bool VirtualSocketServer::ProcessMessagesUntilIdle() {
- ASSERT(msg_queue_ == Thread::Current());
+ RTC_DCHECK(msg_queue_ == Thread::Current());
stop_on_idle_ = true;
while (!msg_queue_->empty()) {
Message msg;
@@ -644,10 +674,10 @@ bool VirtualSocketServer::CloseTcpConnections(
int VirtualSocketServer::Bind(VirtualSocket* socket,
const SocketAddress& addr) {
- ASSERT(NULL != socket);
+ RTC_DCHECK(NULL != socket);
// Address must be completely specified at this point
- ASSERT(!IPIsUnspec(addr.ipaddr()));
- ASSERT(addr.port() != 0);
+ RTC_DCHECK(!IPIsUnspec(addr.ipaddr()));
+ RTC_DCHECK(addr.port() != 0);
// Normalize the address (turns v6-mapped addresses into v4-addresses).
SocketAddress normalized(addr.ipaddr().Normalized(), addr.port());
@@ -657,12 +687,12 @@ int VirtualSocketServer::Bind(VirtualSocket* socket,
}
int VirtualSocketServer::Bind(VirtualSocket* socket, SocketAddress* addr) {
- ASSERT(NULL != socket);
+ RTC_DCHECK(NULL != socket);
if (!IPIsUnspec(addr->ipaddr())) {
addr->SetIP(addr->ipaddr().Normalized());
} else {
- ASSERT(false);
+ RTC_DCHECK(false);
}
if (addr->port() == 0) {
@@ -703,7 +733,7 @@ int VirtualSocketServer::Unbind(const SocketAddress& addr,
VirtualSocket* socket) {
SocketAddress normalized(addr.ipaddr().Normalized(),
addr.port());
- ASSERT((*bindings_)[normalized] == socket);
+ RTC_DCHECK((*bindings_)[normalized] == socket);
bindings_->erase(bindings_->find(normalized));
return 0;
}
@@ -784,6 +814,13 @@ bool VirtualSocketServer::Disconnect(VirtualSocket* socket) {
int VirtualSocketServer::SendUdp(VirtualSocket* socket,
const char* data, size_t data_size,
const SocketAddress& remote_addr) {
+ if (sending_blocked_) {
+ CritScope cs(&socket->crit_);
+ socket->ready_to_send_ = false;
+ socket->error_ = EWOULDBLOCK;
+ return -1;
+ }
+
// See if we want to drop this packet.
if (Random() < drop_prob_) {
LOG(LS_VERBOSE) << "Dropping packet: bad luck";
@@ -811,32 +848,40 @@ int VirtualSocketServer::SendUdp(VirtualSocket* socket,
return -1;
}
- CritScope cs(&socket->crit_);
+ {
+ CritScope cs(&socket->crit_);
+
+ int64_t cur_time = TimeMillis();
+ PurgeNetworkPackets(socket, cur_time);
+
+ // Determine whether we have enough bandwidth to accept this packet. To do
+ // this, we need to update the send queue. Once we know it's current size,
+ // we know whether we can fit this packet.
+ //
+ // NOTE: There are better algorithms for maintaining such a queue (such as
+ // "Derivative Random Drop"); however, this algorithm is a more accurate
+ // simulation of what a normal network would do.
+
+ size_t packet_size = data_size + UDP_HEADER_SIZE;
+ if (socket->network_size_ + packet_size > network_capacity_) {
+ LOG(LS_VERBOSE) << "Dropping packet: network capacity exceeded";
+ return static_cast<int>(data_size);
+ }
- int64_t cur_time = TimeMillis();
- PurgeNetworkPackets(socket, cur_time);
+ AddPacketToNetwork(socket, recipient, cur_time, data, data_size,
+ UDP_HEADER_SIZE, false);
- // Determine whether we have enough bandwidth to accept this packet. To do
- // this, we need to update the send queue. Once we know it's current size,
- // we know whether we can fit this packet.
- //
- // NOTE: There are better algorithms for maintaining such a queue (such as
- // "Derivative Random Drop"); however, this algorithm is a more accurate
- // simulation of what a normal network would do.
-
- size_t packet_size = data_size + UDP_HEADER_SIZE;
- if (socket->network_size_ + packet_size > network_capacity_) {
- LOG(LS_VERBOSE) << "Dropping packet: network capacity exceeded";
return static_cast<int>(data_size);
}
-
- AddPacketToNetwork(socket, recipient, cur_time, data, data_size,
- UDP_HEADER_SIZE, false);
-
- return static_cast<int>(data_size);
}
void VirtualSocketServer::SendTcp(VirtualSocket* socket) {
+ if (sending_blocked_) {
+ // Eventually the socket's buffer will fill and VirtualSocket::SendTcp will
+ // set EWOULDBLOCK.
+ return;
+ }
+
// TCP can't send more data than will fill up the receiver's buffer.
// We track the data that is in the buffer plus data in flight using the
// recipient's recv_buffer_size_. Anything beyond that must be stored in the
@@ -879,9 +924,9 @@ void VirtualSocketServer::SendTcp(VirtualSocket* socket) {
socket->send_buffer_.resize(new_buffer_size);
}
- if (socket->write_enabled_
- && (socket->send_buffer_.size() < send_buffer_capacity_)) {
- socket->write_enabled_ = false;
+ if (!socket->ready_to_send_ &&
+ (socket->send_buffer_.size() < send_buffer_capacity_)) {
+ socket->ready_to_send_ = true;
socket->SignalWriteEvent(socket);
}
}
@@ -931,7 +976,7 @@ void VirtualSocketServer::PurgeNetworkPackets(VirtualSocket* socket,
int64_t cur_time) {
while (!socket->network_.empty() &&
(socket->network_.front().done_time <= cur_time)) {
- ASSERT(socket->network_size_ >= socket->network_.front().size);
+ RTC_DCHECK(socket->network_size_ >= socket->network_.front().size);
socket->network_size_ -= socket->network_.front().size;
socket->network_.pop_front();
}
@@ -1036,7 +1081,7 @@ struct FunctionDomainCmp {
};
VirtualSocketServer::Function* VirtualSocketServer::Accumulate(Function* f) {
- ASSERT(f->size() >= 1);
+ RTC_DCHECK(f->size() >= 1);
double v = 0;
for (Function::size_type i = 0; i < f->size() - 1; ++i) {
double dx = (*f)[i + 1].first - (*f)[i].first;
@@ -1078,7 +1123,7 @@ double VirtualSocketServer::Evaluate(Function* f, double x) {
if (iter == f->begin()) {
return (*f)[0].second;
} else if (iter == f->end()) {
- ASSERT(f->size() >= 1);
+ RTC_DCHECK(f->size() >= 1);
return (*f)[f->size() - 1].second;
} else if (iter->first == x) {
return iter->second;
« no previous file with comments | « webrtc/base/virtualsocketserver.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698