| OLD | NEW |
| (Empty) |
| 1 /* | |
| 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. | |
| 3 * | |
| 4 * Use of this source code is governed by a BSD-style license | |
| 5 * that can be found in the LICENSE file in the root of the source | |
| 6 * tree. An additional intellectual property rights grant can be found | |
| 7 * in the file PATENTS. All contributing project authors may | |
| 8 * be found in the AUTHORS file in the root of the source tree. | |
| 9 */ | |
| 10 | |
| 11 #include "webrtc/base/asynctcpsocket.h" | |
| 12 | |
| 13 #include <string.h> | |
| 14 | |
| 15 #include <algorithm> | |
| 16 #include <memory> | |
| 17 | |
| 18 #include "webrtc/base/byteorder.h" | |
| 19 #include "webrtc/base/checks.h" | |
| 20 #include "webrtc/base/logging.h" | |
| 21 | |
| 22 #if defined(WEBRTC_POSIX) | |
| 23 #include <errno.h> | |
| 24 #endif // WEBRTC_POSIX | |
| 25 | |
| 26 namespace rtc { | |
| 27 | |
| 28 static const size_t kMaxPacketSize = 64 * 1024; | |
| 29 | |
| 30 typedef uint16_t PacketLength; | |
| 31 static const size_t kPacketLenSize = sizeof(PacketLength); | |
| 32 | |
| 33 static const size_t kBufSize = kMaxPacketSize + kPacketLenSize; | |
| 34 | |
| 35 // The input buffer will be resized so that at least kMinimumRecvSize bytes can | |
| 36 // be received (but it will not grow above the maximum size passed to the | |
| 37 // constructor). | |
| 38 static const size_t kMinimumRecvSize = 128; | |
| 39 | |
| 40 static const int kListenBacklog = 5; | |
| 41 | |
| 42 // Binds and connects |socket| | |
| 43 AsyncSocket* AsyncTCPSocketBase::ConnectSocket( | |
| 44 rtc::AsyncSocket* socket, | |
| 45 const rtc::SocketAddress& bind_address, | |
| 46 const rtc::SocketAddress& remote_address) { | |
| 47 std::unique_ptr<rtc::AsyncSocket> owned_socket(socket); | |
| 48 if (socket->Bind(bind_address) < 0) { | |
| 49 LOG(LS_ERROR) << "Bind() failed with error " << socket->GetError(); | |
| 50 return nullptr; | |
| 51 } | |
| 52 if (socket->Connect(remote_address) < 0) { | |
| 53 LOG(LS_ERROR) << "Connect() failed with error " << socket->GetError(); | |
| 54 return nullptr; | |
| 55 } | |
| 56 return owned_socket.release(); | |
| 57 } | |
| 58 | |
| 59 AsyncTCPSocketBase::AsyncTCPSocketBase(AsyncSocket* socket, bool listen, | |
| 60 size_t max_packet_size) | |
| 61 : socket_(socket), | |
| 62 listen_(listen), | |
| 63 max_insize_(max_packet_size), | |
| 64 max_outsize_(max_packet_size) { | |
| 65 if (!listen_) { | |
| 66 // Listening sockets don't send/receive data, so they don't need buffers. | |
| 67 inbuf_.EnsureCapacity(kMinimumRecvSize); | |
| 68 } | |
| 69 | |
| 70 RTC_DCHECK(socket_.get() != nullptr); | |
| 71 socket_->SignalConnectEvent.connect( | |
| 72 this, &AsyncTCPSocketBase::OnConnectEvent); | |
| 73 socket_->SignalReadEvent.connect(this, &AsyncTCPSocketBase::OnReadEvent); | |
| 74 socket_->SignalWriteEvent.connect(this, &AsyncTCPSocketBase::OnWriteEvent); | |
| 75 socket_->SignalCloseEvent.connect(this, &AsyncTCPSocketBase::OnCloseEvent); | |
| 76 | |
| 77 if (listen_) { | |
| 78 if (socket_->Listen(kListenBacklog) < 0) { | |
| 79 LOG(LS_ERROR) << "Listen() failed with error " << socket_->GetError(); | |
| 80 } | |
| 81 } | |
| 82 } | |
| 83 | |
| 84 AsyncTCPSocketBase::~AsyncTCPSocketBase() {} | |
| 85 | |
| 86 SocketAddress AsyncTCPSocketBase::GetLocalAddress() const { | |
| 87 return socket_->GetLocalAddress(); | |
| 88 } | |
| 89 | |
| 90 SocketAddress AsyncTCPSocketBase::GetRemoteAddress() const { | |
| 91 return socket_->GetRemoteAddress(); | |
| 92 } | |
| 93 | |
| 94 int AsyncTCPSocketBase::Close() { | |
| 95 return socket_->Close(); | |
| 96 } | |
| 97 | |
| 98 AsyncTCPSocket::State AsyncTCPSocketBase::GetState() const { | |
| 99 switch (socket_->GetState()) { | |
| 100 case Socket::CS_CLOSED: | |
| 101 return STATE_CLOSED; | |
| 102 case Socket::CS_CONNECTING: | |
| 103 if (listen_) { | |
| 104 return STATE_BOUND; | |
| 105 } else { | |
| 106 return STATE_CONNECTING; | |
| 107 } | |
| 108 case Socket::CS_CONNECTED: | |
| 109 return STATE_CONNECTED; | |
| 110 default: | |
| 111 RTC_NOTREACHED(); | |
| 112 return STATE_CLOSED; | |
| 113 } | |
| 114 } | |
| 115 | |
| 116 int AsyncTCPSocketBase::GetOption(Socket::Option opt, int* value) { | |
| 117 return socket_->GetOption(opt, value); | |
| 118 } | |
| 119 | |
| 120 int AsyncTCPSocketBase::SetOption(Socket::Option opt, int value) { | |
| 121 return socket_->SetOption(opt, value); | |
| 122 } | |
| 123 | |
| 124 int AsyncTCPSocketBase::GetError() const { | |
| 125 return socket_->GetError(); | |
| 126 } | |
| 127 | |
| 128 void AsyncTCPSocketBase::SetError(int error) { | |
| 129 return socket_->SetError(error); | |
| 130 } | |
| 131 | |
| 132 int AsyncTCPSocketBase::SendTo(const void *pv, size_t cb, | |
| 133 const SocketAddress& addr, | |
| 134 const rtc::PacketOptions& options) { | |
| 135 const SocketAddress& remote_address = GetRemoteAddress(); | |
| 136 if (addr == remote_address) | |
| 137 return Send(pv, cb, options); | |
| 138 // Remote address may be empty if there is a sudden network change. | |
| 139 RTC_DCHECK(remote_address.IsNil()); | |
| 140 socket_->SetError(ENOTCONN); | |
| 141 return -1; | |
| 142 } | |
| 143 | |
| 144 int AsyncTCPSocketBase::SendRaw(const void * pv, size_t cb) { | |
| 145 if (outbuf_.size() + cb > max_outsize_) { | |
| 146 socket_->SetError(EMSGSIZE); | |
| 147 return -1; | |
| 148 } | |
| 149 | |
| 150 RTC_DCHECK(!listen_); | |
| 151 outbuf_.AppendData(static_cast<const uint8_t*>(pv), cb); | |
| 152 | |
| 153 return FlushOutBuffer(); | |
| 154 } | |
| 155 | |
| 156 int AsyncTCPSocketBase::FlushOutBuffer() { | |
| 157 RTC_DCHECK(!listen_); | |
| 158 int res = socket_->Send(outbuf_.data(), outbuf_.size()); | |
| 159 if (res <= 0) { | |
| 160 return res; | |
| 161 } | |
| 162 if (static_cast<size_t>(res) > outbuf_.size()) { | |
| 163 RTC_NOTREACHED(); | |
| 164 return -1; | |
| 165 } | |
| 166 size_t new_size = outbuf_.size() - res; | |
| 167 if (new_size > 0) { | |
| 168 memmove(outbuf_.data(), outbuf_.data() + res, new_size); | |
| 169 } | |
| 170 outbuf_.SetSize(new_size); | |
| 171 return res; | |
| 172 } | |
| 173 | |
| 174 void AsyncTCPSocketBase::AppendToOutBuffer(const void* pv, size_t cb) { | |
| 175 RTC_DCHECK(outbuf_.size() + cb <= max_outsize_); | |
| 176 RTC_DCHECK(!listen_); | |
| 177 outbuf_.AppendData(static_cast<const uint8_t*>(pv), cb); | |
| 178 } | |
| 179 | |
| 180 void AsyncTCPSocketBase::OnConnectEvent(AsyncSocket* socket) { | |
| 181 SignalConnect(this); | |
| 182 } | |
| 183 | |
| 184 void AsyncTCPSocketBase::OnReadEvent(AsyncSocket* socket) { | |
| 185 RTC_DCHECK(socket_.get() == socket); | |
| 186 | |
| 187 if (listen_) { | |
| 188 rtc::SocketAddress address; | |
| 189 rtc::AsyncSocket* new_socket = socket->Accept(&address); | |
| 190 if (!new_socket) { | |
| 191 // TODO(stefan): Do something better like forwarding the error | |
| 192 // to the user. | |
| 193 LOG(LS_ERROR) << "TCP accept failed with error " << socket_->GetError(); | |
| 194 return; | |
| 195 } | |
| 196 | |
| 197 HandleIncomingConnection(new_socket); | |
| 198 | |
| 199 // Prime a read event in case data is waiting. | |
| 200 new_socket->SignalReadEvent(new_socket); | |
| 201 } else { | |
| 202 size_t total_recv = 0; | |
| 203 while (true) { | |
| 204 size_t free_size = inbuf_.capacity() - inbuf_.size(); | |
| 205 if (free_size < kMinimumRecvSize && inbuf_.capacity() < max_insize_) { | |
| 206 inbuf_.EnsureCapacity(std::min(max_insize_, inbuf_.capacity() * 2)); | |
| 207 free_size = inbuf_.capacity() - inbuf_.size(); | |
| 208 } | |
| 209 | |
| 210 int len = | |
| 211 socket_->Recv(inbuf_.data() + inbuf_.size(), free_size, nullptr); | |
| 212 if (len < 0) { | |
| 213 // TODO(stefan): Do something better like forwarding the error to the | |
| 214 // user. | |
| 215 if (!socket_->IsBlocking()) { | |
| 216 LOG(LS_ERROR) << "Recv() returned error: " << socket_->GetError(); | |
| 217 } | |
| 218 break; | |
| 219 } | |
| 220 | |
| 221 total_recv += len; | |
| 222 inbuf_.SetSize(inbuf_.size() + len); | |
| 223 if (!len || static_cast<size_t>(len) < free_size) { | |
| 224 break; | |
| 225 } | |
| 226 } | |
| 227 | |
| 228 if (!total_recv) { | |
| 229 return; | |
| 230 } | |
| 231 | |
| 232 size_t size = inbuf_.size(); | |
| 233 ProcessInput(inbuf_.data<char>(), &size); | |
| 234 | |
| 235 if (size > inbuf_.size()) { | |
| 236 LOG(LS_ERROR) << "input buffer overflow"; | |
| 237 RTC_NOTREACHED(); | |
| 238 inbuf_.Clear(); | |
| 239 } else { | |
| 240 inbuf_.SetSize(size); | |
| 241 } | |
| 242 } | |
| 243 } | |
| 244 | |
| 245 void AsyncTCPSocketBase::OnWriteEvent(AsyncSocket* socket) { | |
| 246 RTC_DCHECK(socket_.get() == socket); | |
| 247 | |
| 248 if (outbuf_.size() > 0) { | |
| 249 FlushOutBuffer(); | |
| 250 } | |
| 251 | |
| 252 if (outbuf_.size() == 0) { | |
| 253 SignalReadyToSend(this); | |
| 254 } | |
| 255 } | |
| 256 | |
| 257 void AsyncTCPSocketBase::OnCloseEvent(AsyncSocket* socket, int error) { | |
| 258 SignalClose(this, error); | |
| 259 } | |
| 260 | |
| 261 // AsyncTCPSocket | |
| 262 // Binds and connects |socket| and creates AsyncTCPSocket for | |
| 263 // it. Takes ownership of |socket|. Returns null if bind() or | |
| 264 // connect() fail (|socket| is destroyed in that case). | |
| 265 AsyncTCPSocket* AsyncTCPSocket::Create( | |
| 266 AsyncSocket* socket, | |
| 267 const SocketAddress& bind_address, | |
| 268 const SocketAddress& remote_address) { | |
| 269 return new AsyncTCPSocket(AsyncTCPSocketBase::ConnectSocket( | |
| 270 socket, bind_address, remote_address), false); | |
| 271 } | |
| 272 | |
| 273 AsyncTCPSocket::AsyncTCPSocket(AsyncSocket* socket, bool listen) | |
| 274 : AsyncTCPSocketBase(socket, listen, kBufSize) { | |
| 275 } | |
| 276 | |
| 277 int AsyncTCPSocket::Send(const void *pv, size_t cb, | |
| 278 const rtc::PacketOptions& options) { | |
| 279 if (cb > kBufSize) { | |
| 280 SetError(EMSGSIZE); | |
| 281 return -1; | |
| 282 } | |
| 283 | |
| 284 // If we are blocking on send, then silently drop this packet | |
| 285 if (!IsOutBufferEmpty()) | |
| 286 return static_cast<int>(cb); | |
| 287 | |
| 288 PacketLength pkt_len = HostToNetwork16(static_cast<PacketLength>(cb)); | |
| 289 AppendToOutBuffer(&pkt_len, kPacketLenSize); | |
| 290 AppendToOutBuffer(pv, cb); | |
| 291 | |
| 292 int res = FlushOutBuffer(); | |
| 293 if (res <= 0) { | |
| 294 // drop packet if we made no progress | |
| 295 ClearOutBuffer(); | |
| 296 return res; | |
| 297 } | |
| 298 | |
| 299 rtc::SentPacket sent_packet(options.packet_id, rtc::TimeMillis()); | |
| 300 SignalSentPacket(this, sent_packet); | |
| 301 | |
| 302 // We claim to have sent the whole thing, even if we only sent partial | |
| 303 return static_cast<int>(cb); | |
| 304 } | |
| 305 | |
| 306 void AsyncTCPSocket::ProcessInput(char * data, size_t* len) { | |
| 307 SocketAddress remote_addr(GetRemoteAddress()); | |
| 308 | |
| 309 while (true) { | |
| 310 if (*len < kPacketLenSize) | |
| 311 return; | |
| 312 | |
| 313 PacketLength pkt_len = rtc::GetBE16(data); | |
| 314 if (*len < kPacketLenSize + pkt_len) | |
| 315 return; | |
| 316 | |
| 317 SignalReadPacket(this, data + kPacketLenSize, pkt_len, remote_addr, | |
| 318 CreatePacketTime(0)); | |
| 319 | |
| 320 *len -= kPacketLenSize + pkt_len; | |
| 321 if (*len > 0) { | |
| 322 memmove(data, data + kPacketLenSize + pkt_len, *len); | |
| 323 } | |
| 324 } | |
| 325 } | |
| 326 | |
| 327 void AsyncTCPSocket::HandleIncomingConnection(AsyncSocket* socket) { | |
| 328 SignalNewConnection(this, new AsyncTCPSocket(socket, false)); | |
| 329 } | |
| 330 | |
| 331 } // namespace rtc | |
| OLD | NEW |