OLD | NEW |
| (Empty) |
1 /* | |
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. | |
3 * | |
4 * Use of this source code is governed by a BSD-style license | |
5 * that can be found in the LICENSE file in the root of the source | |
6 * tree. An additional intellectual property rights grant can be found | |
7 * in the file PATENTS. All contributing project authors may | |
8 * be found in the AUTHORS file in the root of the source tree. | |
9 */ | |
10 | |
11 #include "webrtc/base/asynctcpsocket.h" | |
12 | |
13 #include <string.h> | |
14 | |
15 #include <algorithm> | |
16 #include <memory> | |
17 | |
18 #include "webrtc/base/byteorder.h" | |
19 #include "webrtc/base/checks.h" | |
20 #include "webrtc/base/logging.h" | |
21 | |
22 #if defined(WEBRTC_POSIX) | |
23 #include <errno.h> | |
24 #endif // WEBRTC_POSIX | |
25 | |
26 namespace rtc { | |
27 | |
28 static const size_t kMaxPacketSize = 64 * 1024; | |
29 | |
30 typedef uint16_t PacketLength; | |
31 static const size_t kPacketLenSize = sizeof(PacketLength); | |
32 | |
33 static const size_t kBufSize = kMaxPacketSize + kPacketLenSize; | |
34 | |
35 // The input buffer will be resized so that at least kMinimumRecvSize bytes can | |
36 // be received (but it will not grow above the maximum size passed to the | |
37 // constructor). | |
38 static const size_t kMinimumRecvSize = 128; | |
39 | |
40 static const int kListenBacklog = 5; | |
41 | |
42 // Binds and connects |socket| | |
43 AsyncSocket* AsyncTCPSocketBase::ConnectSocket( | |
44 rtc::AsyncSocket* socket, | |
45 const rtc::SocketAddress& bind_address, | |
46 const rtc::SocketAddress& remote_address) { | |
47 std::unique_ptr<rtc::AsyncSocket> owned_socket(socket); | |
48 if (socket->Bind(bind_address) < 0) { | |
49 LOG(LS_ERROR) << "Bind() failed with error " << socket->GetError(); | |
50 return nullptr; | |
51 } | |
52 if (socket->Connect(remote_address) < 0) { | |
53 LOG(LS_ERROR) << "Connect() failed with error " << socket->GetError(); | |
54 return nullptr; | |
55 } | |
56 return owned_socket.release(); | |
57 } | |
58 | |
59 AsyncTCPSocketBase::AsyncTCPSocketBase(AsyncSocket* socket, bool listen, | |
60 size_t max_packet_size) | |
61 : socket_(socket), | |
62 listen_(listen), | |
63 max_insize_(max_packet_size), | |
64 max_outsize_(max_packet_size) { | |
65 if (!listen_) { | |
66 // Listening sockets don't send/receive data, so they don't need buffers. | |
67 inbuf_.EnsureCapacity(kMinimumRecvSize); | |
68 } | |
69 | |
70 RTC_DCHECK(socket_.get() != nullptr); | |
71 socket_->SignalConnectEvent.connect( | |
72 this, &AsyncTCPSocketBase::OnConnectEvent); | |
73 socket_->SignalReadEvent.connect(this, &AsyncTCPSocketBase::OnReadEvent); | |
74 socket_->SignalWriteEvent.connect(this, &AsyncTCPSocketBase::OnWriteEvent); | |
75 socket_->SignalCloseEvent.connect(this, &AsyncTCPSocketBase::OnCloseEvent); | |
76 | |
77 if (listen_) { | |
78 if (socket_->Listen(kListenBacklog) < 0) { | |
79 LOG(LS_ERROR) << "Listen() failed with error " << socket_->GetError(); | |
80 } | |
81 } | |
82 } | |
83 | |
84 AsyncTCPSocketBase::~AsyncTCPSocketBase() {} | |
85 | |
86 SocketAddress AsyncTCPSocketBase::GetLocalAddress() const { | |
87 return socket_->GetLocalAddress(); | |
88 } | |
89 | |
90 SocketAddress AsyncTCPSocketBase::GetRemoteAddress() const { | |
91 return socket_->GetRemoteAddress(); | |
92 } | |
93 | |
94 int AsyncTCPSocketBase::Close() { | |
95 return socket_->Close(); | |
96 } | |
97 | |
98 AsyncTCPSocket::State AsyncTCPSocketBase::GetState() const { | |
99 switch (socket_->GetState()) { | |
100 case Socket::CS_CLOSED: | |
101 return STATE_CLOSED; | |
102 case Socket::CS_CONNECTING: | |
103 if (listen_) { | |
104 return STATE_BOUND; | |
105 } else { | |
106 return STATE_CONNECTING; | |
107 } | |
108 case Socket::CS_CONNECTED: | |
109 return STATE_CONNECTED; | |
110 default: | |
111 RTC_NOTREACHED(); | |
112 return STATE_CLOSED; | |
113 } | |
114 } | |
115 | |
116 int AsyncTCPSocketBase::GetOption(Socket::Option opt, int* value) { | |
117 return socket_->GetOption(opt, value); | |
118 } | |
119 | |
120 int AsyncTCPSocketBase::SetOption(Socket::Option opt, int value) { | |
121 return socket_->SetOption(opt, value); | |
122 } | |
123 | |
124 int AsyncTCPSocketBase::GetError() const { | |
125 return socket_->GetError(); | |
126 } | |
127 | |
128 void AsyncTCPSocketBase::SetError(int error) { | |
129 return socket_->SetError(error); | |
130 } | |
131 | |
132 int AsyncTCPSocketBase::SendTo(const void *pv, size_t cb, | |
133 const SocketAddress& addr, | |
134 const rtc::PacketOptions& options) { | |
135 const SocketAddress& remote_address = GetRemoteAddress(); | |
136 if (addr == remote_address) | |
137 return Send(pv, cb, options); | |
138 // Remote address may be empty if there is a sudden network change. | |
139 RTC_DCHECK(remote_address.IsNil()); | |
140 socket_->SetError(ENOTCONN); | |
141 return -1; | |
142 } | |
143 | |
144 int AsyncTCPSocketBase::SendRaw(const void * pv, size_t cb) { | |
145 if (outbuf_.size() + cb > max_outsize_) { | |
146 socket_->SetError(EMSGSIZE); | |
147 return -1; | |
148 } | |
149 | |
150 RTC_DCHECK(!listen_); | |
151 outbuf_.AppendData(static_cast<const uint8_t*>(pv), cb); | |
152 | |
153 return FlushOutBuffer(); | |
154 } | |
155 | |
156 int AsyncTCPSocketBase::FlushOutBuffer() { | |
157 RTC_DCHECK(!listen_); | |
158 int res = socket_->Send(outbuf_.data(), outbuf_.size()); | |
159 if (res <= 0) { | |
160 return res; | |
161 } | |
162 if (static_cast<size_t>(res) > outbuf_.size()) { | |
163 RTC_NOTREACHED(); | |
164 return -1; | |
165 } | |
166 size_t new_size = outbuf_.size() - res; | |
167 if (new_size > 0) { | |
168 memmove(outbuf_.data(), outbuf_.data() + res, new_size); | |
169 } | |
170 outbuf_.SetSize(new_size); | |
171 return res; | |
172 } | |
173 | |
174 void AsyncTCPSocketBase::AppendToOutBuffer(const void* pv, size_t cb) { | |
175 RTC_DCHECK(outbuf_.size() + cb <= max_outsize_); | |
176 RTC_DCHECK(!listen_); | |
177 outbuf_.AppendData(static_cast<const uint8_t*>(pv), cb); | |
178 } | |
179 | |
180 void AsyncTCPSocketBase::OnConnectEvent(AsyncSocket* socket) { | |
181 SignalConnect(this); | |
182 } | |
183 | |
184 void AsyncTCPSocketBase::OnReadEvent(AsyncSocket* socket) { | |
185 RTC_DCHECK(socket_.get() == socket); | |
186 | |
187 if (listen_) { | |
188 rtc::SocketAddress address; | |
189 rtc::AsyncSocket* new_socket = socket->Accept(&address); | |
190 if (!new_socket) { | |
191 // TODO(stefan): Do something better like forwarding the error | |
192 // to the user. | |
193 LOG(LS_ERROR) << "TCP accept failed with error " << socket_->GetError(); | |
194 return; | |
195 } | |
196 | |
197 HandleIncomingConnection(new_socket); | |
198 | |
199 // Prime a read event in case data is waiting. | |
200 new_socket->SignalReadEvent(new_socket); | |
201 } else { | |
202 size_t total_recv = 0; | |
203 while (true) { | |
204 size_t free_size = inbuf_.capacity() - inbuf_.size(); | |
205 if (free_size < kMinimumRecvSize && inbuf_.capacity() < max_insize_) { | |
206 inbuf_.EnsureCapacity(std::min(max_insize_, inbuf_.capacity() * 2)); | |
207 free_size = inbuf_.capacity() - inbuf_.size(); | |
208 } | |
209 | |
210 int len = | |
211 socket_->Recv(inbuf_.data() + inbuf_.size(), free_size, nullptr); | |
212 if (len < 0) { | |
213 // TODO(stefan): Do something better like forwarding the error to the | |
214 // user. | |
215 if (!socket_->IsBlocking()) { | |
216 LOG(LS_ERROR) << "Recv() returned error: " << socket_->GetError(); | |
217 } | |
218 break; | |
219 } | |
220 | |
221 total_recv += len; | |
222 inbuf_.SetSize(inbuf_.size() + len); | |
223 if (!len || static_cast<size_t>(len) < free_size) { | |
224 break; | |
225 } | |
226 } | |
227 | |
228 if (!total_recv) { | |
229 return; | |
230 } | |
231 | |
232 size_t size = inbuf_.size(); | |
233 ProcessInput(inbuf_.data<char>(), &size); | |
234 | |
235 if (size > inbuf_.size()) { | |
236 LOG(LS_ERROR) << "input buffer overflow"; | |
237 RTC_NOTREACHED(); | |
238 inbuf_.Clear(); | |
239 } else { | |
240 inbuf_.SetSize(size); | |
241 } | |
242 } | |
243 } | |
244 | |
245 void AsyncTCPSocketBase::OnWriteEvent(AsyncSocket* socket) { | |
246 RTC_DCHECK(socket_.get() == socket); | |
247 | |
248 if (outbuf_.size() > 0) { | |
249 FlushOutBuffer(); | |
250 } | |
251 | |
252 if (outbuf_.size() == 0) { | |
253 SignalReadyToSend(this); | |
254 } | |
255 } | |
256 | |
257 void AsyncTCPSocketBase::OnCloseEvent(AsyncSocket* socket, int error) { | |
258 SignalClose(this, error); | |
259 } | |
260 | |
261 // AsyncTCPSocket | |
262 // Binds and connects |socket| and creates AsyncTCPSocket for | |
263 // it. Takes ownership of |socket|. Returns null if bind() or | |
264 // connect() fail (|socket| is destroyed in that case). | |
265 AsyncTCPSocket* AsyncTCPSocket::Create( | |
266 AsyncSocket* socket, | |
267 const SocketAddress& bind_address, | |
268 const SocketAddress& remote_address) { | |
269 return new AsyncTCPSocket(AsyncTCPSocketBase::ConnectSocket( | |
270 socket, bind_address, remote_address), false); | |
271 } | |
272 | |
273 AsyncTCPSocket::AsyncTCPSocket(AsyncSocket* socket, bool listen) | |
274 : AsyncTCPSocketBase(socket, listen, kBufSize) { | |
275 } | |
276 | |
277 int AsyncTCPSocket::Send(const void *pv, size_t cb, | |
278 const rtc::PacketOptions& options) { | |
279 if (cb > kBufSize) { | |
280 SetError(EMSGSIZE); | |
281 return -1; | |
282 } | |
283 | |
284 // If we are blocking on send, then silently drop this packet | |
285 if (!IsOutBufferEmpty()) | |
286 return static_cast<int>(cb); | |
287 | |
288 PacketLength pkt_len = HostToNetwork16(static_cast<PacketLength>(cb)); | |
289 AppendToOutBuffer(&pkt_len, kPacketLenSize); | |
290 AppendToOutBuffer(pv, cb); | |
291 | |
292 int res = FlushOutBuffer(); | |
293 if (res <= 0) { | |
294 // drop packet if we made no progress | |
295 ClearOutBuffer(); | |
296 return res; | |
297 } | |
298 | |
299 rtc::SentPacket sent_packet(options.packet_id, rtc::TimeMillis()); | |
300 SignalSentPacket(this, sent_packet); | |
301 | |
302 // We claim to have sent the whole thing, even if we only sent partial | |
303 return static_cast<int>(cb); | |
304 } | |
305 | |
306 void AsyncTCPSocket::ProcessInput(char * data, size_t* len) { | |
307 SocketAddress remote_addr(GetRemoteAddress()); | |
308 | |
309 while (true) { | |
310 if (*len < kPacketLenSize) | |
311 return; | |
312 | |
313 PacketLength pkt_len = rtc::GetBE16(data); | |
314 if (*len < kPacketLenSize + pkt_len) | |
315 return; | |
316 | |
317 SignalReadPacket(this, data + kPacketLenSize, pkt_len, remote_addr, | |
318 CreatePacketTime(0)); | |
319 | |
320 *len -= kPacketLenSize + pkt_len; | |
321 if (*len > 0) { | |
322 memmove(data, data + kPacketLenSize + pkt_len, *len); | |
323 } | |
324 } | |
325 } | |
326 | |
327 void AsyncTCPSocket::HandleIncomingConnection(AsyncSocket* socket) { | |
328 SignalNewConnection(this, new AsyncTCPSocket(socket, false)); | |
329 } | |
330 | |
331 } // namespace rtc | |
OLD | NEW |