OLD | NEW |
1 /* | 1 /* |
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. | 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. |
3 * | 3 * |
4 * Use of this source code is governed by a BSD-style license | 4 * Use of this source code is governed by a BSD-style license |
5 * that can be found in the LICENSE file in the root of the source | 5 * that can be found in the LICENSE file in the root of the source |
6 * tree. An additional intellectual property rights grant can be found | 6 * tree. An additional intellectual property rights grant can be found |
7 * in the file PATENTS. All contributing project authors may | 7 * in the file PATENTS. All contributing project authors may |
8 * be found in the AUTHORS file in the root of the source tree. | 8 * be found in the AUTHORS file in the root of the source tree. |
9 */ | 9 */ |
10 | 10 |
11 #include "webrtc/base/asynctcpsocket.h" | 11 #include "webrtc/base/asynctcpsocket.h" |
12 | 12 |
13 #include <string.h> | 13 #include <string.h> |
14 | 14 |
| 15 #include <algorithm> |
| 16 |
15 #include "webrtc/base/byteorder.h" | 17 #include "webrtc/base/byteorder.h" |
16 #include "webrtc/base/checks.h" | 18 #include "webrtc/base/checks.h" |
17 #include "webrtc/base/common.h" | 19 #include "webrtc/base/common.h" |
18 #include "webrtc/base/logging.h" | 20 #include "webrtc/base/logging.h" |
19 | 21 |
20 #if defined(WEBRTC_POSIX) | 22 #if defined(WEBRTC_POSIX) |
21 #include <errno.h> | 23 #include <errno.h> |
22 #endif // WEBRTC_POSIX | 24 #endif // WEBRTC_POSIX |
23 | 25 |
24 namespace rtc { | 26 namespace rtc { |
25 | 27 |
26 static const size_t kMaxPacketSize = 64 * 1024; | 28 static const size_t kMaxPacketSize = 64 * 1024; |
27 | 29 |
28 typedef uint16_t PacketLength; | 30 typedef uint16_t PacketLength; |
29 static const size_t kPacketLenSize = sizeof(PacketLength); | 31 static const size_t kPacketLenSize = sizeof(PacketLength); |
30 | 32 |
31 static const size_t kBufSize = kMaxPacketSize + kPacketLenSize; | 33 static const size_t kBufSize = kMaxPacketSize + kPacketLenSize; |
32 | 34 |
| 35 // The input buffer will be resized so that at least kMinimumRecvSize bytes can |
| 36 // be received (but it will not grow above the maximum size passed to the |
| 37 // constructor). |
| 38 static const size_t kMinimumRecvSize = 128; |
| 39 |
33 static const int kListenBacklog = 5; | 40 static const int kListenBacklog = 5; |
34 | 41 |
35 // Binds and connects |socket| | 42 // Binds and connects |socket| |
36 AsyncSocket* AsyncTCPSocketBase::ConnectSocket( | 43 AsyncSocket* AsyncTCPSocketBase::ConnectSocket( |
37 rtc::AsyncSocket* socket, | 44 rtc::AsyncSocket* socket, |
38 const rtc::SocketAddress& bind_address, | 45 const rtc::SocketAddress& bind_address, |
39 const rtc::SocketAddress& remote_address) { | 46 const rtc::SocketAddress& remote_address) { |
40 rtc::scoped_ptr<rtc::AsyncSocket> owned_socket(socket); | 47 rtc::scoped_ptr<rtc::AsyncSocket> owned_socket(socket); |
41 if (socket->Bind(bind_address) < 0) { | 48 if (socket->Bind(bind_address) < 0) { |
42 LOG(LS_ERROR) << "Bind() failed with error " << socket->GetError(); | 49 LOG(LS_ERROR) << "Bind() failed with error " << socket->GetError(); |
43 return NULL; | 50 return NULL; |
44 } | 51 } |
45 if (socket->Connect(remote_address) < 0) { | 52 if (socket->Connect(remote_address) < 0) { |
46 LOG(LS_ERROR) << "Connect() failed with error " << socket->GetError(); | 53 LOG(LS_ERROR) << "Connect() failed with error " << socket->GetError(); |
47 return NULL; | 54 return NULL; |
48 } | 55 } |
49 return owned_socket.release(); | 56 return owned_socket.release(); |
50 } | 57 } |
51 | 58 |
52 AsyncTCPSocketBase::AsyncTCPSocketBase(AsyncSocket* socket, bool listen, | 59 AsyncTCPSocketBase::AsyncTCPSocketBase(AsyncSocket* socket, bool listen, |
53 size_t max_packet_size) | 60 size_t max_packet_size) |
54 : socket_(socket), | 61 : socket_(socket), |
55 listen_(listen), | 62 listen_(listen), |
56 insize_(max_packet_size), | 63 max_insize_(max_packet_size), |
57 inpos_(0), | |
58 max_outsize_(max_packet_size) { | 64 max_outsize_(max_packet_size) { |
59 if (!listen_) { | 65 if (!listen_) { |
60 // Listening sockets don't send/receive data, so they don't need buffers. | 66 // Listening sockets don't send/receive data, so they don't need buffers. |
61 inbuf_.reset(new char[insize_]); | 67 inbuf_.EnsureCapacity(kMinimumRecvSize); |
62 } | 68 } |
63 | 69 |
64 RTC_DCHECK(socket_.get() != NULL); | 70 RTC_DCHECK(socket_.get() != NULL); |
65 socket_->SignalConnectEvent.connect( | 71 socket_->SignalConnectEvent.connect( |
66 this, &AsyncTCPSocketBase::OnConnectEvent); | 72 this, &AsyncTCPSocketBase::OnConnectEvent); |
67 socket_->SignalReadEvent.connect(this, &AsyncTCPSocketBase::OnReadEvent); | 73 socket_->SignalReadEvent.connect(this, &AsyncTCPSocketBase::OnReadEvent); |
68 socket_->SignalWriteEvent.connect(this, &AsyncTCPSocketBase::OnWriteEvent); | 74 socket_->SignalWriteEvent.connect(this, &AsyncTCPSocketBase::OnWriteEvent); |
69 socket_->SignalCloseEvent.connect(this, &AsyncTCPSocketBase::OnCloseEvent); | 75 socket_->SignalCloseEvent.connect(this, &AsyncTCPSocketBase::OnCloseEvent); |
70 | 76 |
71 if (listen_) { | 77 if (listen_) { |
(...skipping 103 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
175 SignalConnect(this); | 181 SignalConnect(this); |
176 } | 182 } |
177 | 183 |
178 void AsyncTCPSocketBase::OnReadEvent(AsyncSocket* socket) { | 184 void AsyncTCPSocketBase::OnReadEvent(AsyncSocket* socket) { |
179 RTC_DCHECK(socket_.get() == socket); | 185 RTC_DCHECK(socket_.get() == socket); |
180 | 186 |
181 if (listen_) { | 187 if (listen_) { |
182 rtc::SocketAddress address; | 188 rtc::SocketAddress address; |
183 rtc::AsyncSocket* new_socket = socket->Accept(&address); | 189 rtc::AsyncSocket* new_socket = socket->Accept(&address); |
184 if (!new_socket) { | 190 if (!new_socket) { |
185 // TODO: Do something better like forwarding the error | 191 // TODO(stefan): Do something better like forwarding the error |
186 // to the user. | 192 // to the user. |
187 LOG(LS_ERROR) << "TCP accept failed with error " << socket_->GetError(); | 193 LOG(LS_ERROR) << "TCP accept failed with error " << socket_->GetError(); |
188 return; | 194 return; |
189 } | 195 } |
190 | 196 |
191 HandleIncomingConnection(new_socket); | 197 HandleIncomingConnection(new_socket); |
192 | 198 |
193 // Prime a read event in case data is waiting. | 199 // Prime a read event in case data is waiting. |
194 new_socket->SignalReadEvent(new_socket); | 200 new_socket->SignalReadEvent(new_socket); |
195 } else { | 201 } else { |
196 RTC_DCHECK(inbuf_.get()); | 202 size_t total_recv = 0; |
197 int len = socket_->Recv(inbuf_.get() + inpos_, insize_ - inpos_); | 203 while (true) { |
198 if (len < 0) { | 204 size_t free_size = inbuf_.capacity() - inbuf_.size(); |
199 // TODO: Do something better like forwarding the error to the user. | 205 if (free_size < kMinimumRecvSize && inbuf_.capacity() < max_insize_) { |
200 if (!socket_->IsBlocking()) { | 206 inbuf_.EnsureCapacity(std::min(max_insize_, inbuf_.capacity() * 2)); |
201 LOG(LS_ERROR) << "Recv() returned error: " << socket_->GetError(); | 207 free_size = inbuf_.capacity() - inbuf_.size(); |
202 } | 208 } |
| 209 |
| 210 int len = socket_->Recv(inbuf_.data() + inbuf_.size(), free_size); |
| 211 if (len < 0) { |
| 212 // TODO(stefan): Do something better like forwarding the error to the |
| 213 // user. |
| 214 if (!socket_->IsBlocking()) { |
| 215 LOG(LS_ERROR) << "Recv() returned error: " << socket_->GetError(); |
| 216 } |
| 217 break; |
| 218 } |
| 219 |
| 220 total_recv += len; |
| 221 inbuf_.SetSize(inbuf_.size() + len); |
| 222 if (!len || static_cast<size_t>(len) < free_size) { |
| 223 break; |
| 224 } |
| 225 } |
| 226 |
| 227 if (!total_recv) { |
203 return; | 228 return; |
204 } | 229 } |
205 | 230 |
206 inpos_ += len; | 231 size_t size = inbuf_.size(); |
| 232 ProcessInput(inbuf_.data<char>(), &size); |
207 | 233 |
208 ProcessInput(inbuf_.get(), &inpos_); | 234 if (size > inbuf_.size()) { |
209 | |
210 if (inpos_ >= insize_) { | |
211 LOG(LS_ERROR) << "input buffer overflow"; | 235 LOG(LS_ERROR) << "input buffer overflow"; |
212 RTC_NOTREACHED(); | 236 RTC_NOTREACHED(); |
213 inpos_ = 0; | 237 inbuf_.Clear(); |
| 238 } else { |
| 239 inbuf_.SetSize(size); |
214 } | 240 } |
215 } | 241 } |
216 } | 242 } |
217 | 243 |
218 void AsyncTCPSocketBase::OnWriteEvent(AsyncSocket* socket) { | 244 void AsyncTCPSocketBase::OnWriteEvent(AsyncSocket* socket) { |
219 RTC_DCHECK(socket_.get() == socket); | 245 RTC_DCHECK(socket_.get() == socket); |
220 | 246 |
221 if (outbuf_.size() > 0) { | 247 if (outbuf_.size() > 0) { |
222 FlushOutBuffer(); | 248 FlushOutBuffer(); |
223 } | 249 } |
(...skipping 71 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
295 memmove(data, data + kPacketLenSize + pkt_len, *len); | 321 memmove(data, data + kPacketLenSize + pkt_len, *len); |
296 } | 322 } |
297 } | 323 } |
298 } | 324 } |
299 | 325 |
300 void AsyncTCPSocket::HandleIncomingConnection(AsyncSocket* socket) { | 326 void AsyncTCPSocket::HandleIncomingConnection(AsyncSocket* socket) { |
301 SignalNewConnection(this, new AsyncTCPSocket(socket, false)); | 327 SignalNewConnection(this, new AsyncTCPSocket(socket, false)); |
302 } | 328 } |
303 | 329 |
304 } // namespace rtc | 330 } // namespace rtc |
OLD | NEW |