| OLD | NEW | 
|---|
|  | (Empty) | 
| 1 /* |  | 
| 2  *  Copyright 2004 The WebRTC Project Authors. All rights reserved. |  | 
| 3  * |  | 
| 4  *  Use of this source code is governed by a BSD-style license |  | 
| 5  *  that can be found in the LICENSE file in the root of the source |  | 
| 6  *  tree. An additional intellectual property rights grant can be found |  | 
| 7  *  in the file PATENTS.  All contributing project authors may |  | 
| 8  *  be found in the AUTHORS file in the root of the source tree. |  | 
| 9  */ |  | 
| 10 |  | 
| 11 #include "webrtc/base/asynctcpsocket.h" |  | 
| 12 |  | 
| 13 #include <string.h> |  | 
| 14 |  | 
| 15 #include <algorithm> |  | 
| 16 #include <memory> |  | 
| 17 |  | 
| 18 #include "webrtc/base/byteorder.h" |  | 
| 19 #include "webrtc/base/checks.h" |  | 
| 20 #include "webrtc/base/logging.h" |  | 
| 21 |  | 
| 22 #if defined(WEBRTC_POSIX) |  | 
| 23 #include <errno.h> |  | 
| 24 #endif  // WEBRTC_POSIX |  | 
| 25 |  | 
| 26 namespace rtc { |  | 
| 27 |  | 
| 28 static const size_t kMaxPacketSize = 64 * 1024; |  | 
| 29 |  | 
| 30 typedef uint16_t PacketLength; |  | 
| 31 static const size_t kPacketLenSize = sizeof(PacketLength); |  | 
| 32 |  | 
| 33 static const size_t kBufSize = kMaxPacketSize + kPacketLenSize; |  | 
| 34 |  | 
| 35 // The input buffer will be resized so that at least kMinimumRecvSize bytes can |  | 
| 36 // be received (but it will not grow above the maximum size passed to the |  | 
| 37 // constructor). |  | 
| 38 static const size_t kMinimumRecvSize = 128; |  | 
| 39 |  | 
| 40 static const int kListenBacklog = 5; |  | 
| 41 |  | 
| 42 // Binds and connects |socket| |  | 
| 43 AsyncSocket* AsyncTCPSocketBase::ConnectSocket( |  | 
| 44     rtc::AsyncSocket* socket, |  | 
| 45     const rtc::SocketAddress& bind_address, |  | 
| 46     const rtc::SocketAddress& remote_address) { |  | 
| 47   std::unique_ptr<rtc::AsyncSocket> owned_socket(socket); |  | 
| 48   if (socket->Bind(bind_address) < 0) { |  | 
| 49     LOG(LS_ERROR) << "Bind() failed with error " << socket->GetError(); |  | 
| 50     return nullptr; |  | 
| 51   } |  | 
| 52   if (socket->Connect(remote_address) < 0) { |  | 
| 53     LOG(LS_ERROR) << "Connect() failed with error " << socket->GetError(); |  | 
| 54     return nullptr; |  | 
| 55   } |  | 
| 56   return owned_socket.release(); |  | 
| 57 } |  | 
| 58 |  | 
| 59 AsyncTCPSocketBase::AsyncTCPSocketBase(AsyncSocket* socket, bool listen, |  | 
| 60                                        size_t max_packet_size) |  | 
| 61     : socket_(socket), |  | 
| 62       listen_(listen), |  | 
| 63       max_insize_(max_packet_size), |  | 
| 64       max_outsize_(max_packet_size) { |  | 
| 65   if (!listen_) { |  | 
| 66     // Listening sockets don't send/receive data, so they don't need buffers. |  | 
| 67     inbuf_.EnsureCapacity(kMinimumRecvSize); |  | 
| 68   } |  | 
| 69 |  | 
| 70   RTC_DCHECK(socket_.get() != nullptr); |  | 
| 71   socket_->SignalConnectEvent.connect( |  | 
| 72       this, &AsyncTCPSocketBase::OnConnectEvent); |  | 
| 73   socket_->SignalReadEvent.connect(this, &AsyncTCPSocketBase::OnReadEvent); |  | 
| 74   socket_->SignalWriteEvent.connect(this, &AsyncTCPSocketBase::OnWriteEvent); |  | 
| 75   socket_->SignalCloseEvent.connect(this, &AsyncTCPSocketBase::OnCloseEvent); |  | 
| 76 |  | 
| 77   if (listen_) { |  | 
| 78     if (socket_->Listen(kListenBacklog) < 0) { |  | 
| 79       LOG(LS_ERROR) << "Listen() failed with error " << socket_->GetError(); |  | 
| 80     } |  | 
| 81   } |  | 
| 82 } |  | 
| 83 |  | 
| 84 AsyncTCPSocketBase::~AsyncTCPSocketBase() {} |  | 
| 85 |  | 
| 86 SocketAddress AsyncTCPSocketBase::GetLocalAddress() const { |  | 
| 87   return socket_->GetLocalAddress(); |  | 
| 88 } |  | 
| 89 |  | 
| 90 SocketAddress AsyncTCPSocketBase::GetRemoteAddress() const { |  | 
| 91   return socket_->GetRemoteAddress(); |  | 
| 92 } |  | 
| 93 |  | 
| 94 int AsyncTCPSocketBase::Close() { |  | 
| 95   return socket_->Close(); |  | 
| 96 } |  | 
| 97 |  | 
| 98 AsyncTCPSocket::State AsyncTCPSocketBase::GetState() const { |  | 
| 99   switch (socket_->GetState()) { |  | 
| 100     case Socket::CS_CLOSED: |  | 
| 101       return STATE_CLOSED; |  | 
| 102     case Socket::CS_CONNECTING: |  | 
| 103       if (listen_) { |  | 
| 104         return STATE_BOUND; |  | 
| 105       } else { |  | 
| 106         return STATE_CONNECTING; |  | 
| 107       } |  | 
| 108     case Socket::CS_CONNECTED: |  | 
| 109       return STATE_CONNECTED; |  | 
| 110     default: |  | 
| 111       RTC_NOTREACHED(); |  | 
| 112       return STATE_CLOSED; |  | 
| 113   } |  | 
| 114 } |  | 
| 115 |  | 
| 116 int AsyncTCPSocketBase::GetOption(Socket::Option opt, int* value) { |  | 
| 117   return socket_->GetOption(opt, value); |  | 
| 118 } |  | 
| 119 |  | 
| 120 int AsyncTCPSocketBase::SetOption(Socket::Option opt, int value) { |  | 
| 121   return socket_->SetOption(opt, value); |  | 
| 122 } |  | 
| 123 |  | 
| 124 int AsyncTCPSocketBase::GetError() const { |  | 
| 125   return socket_->GetError(); |  | 
| 126 } |  | 
| 127 |  | 
| 128 void AsyncTCPSocketBase::SetError(int error) { |  | 
| 129   return socket_->SetError(error); |  | 
| 130 } |  | 
| 131 |  | 
| 132 int AsyncTCPSocketBase::SendTo(const void *pv, size_t cb, |  | 
| 133                                const SocketAddress& addr, |  | 
| 134                                const rtc::PacketOptions& options) { |  | 
| 135   const SocketAddress& remote_address = GetRemoteAddress(); |  | 
| 136   if (addr == remote_address) |  | 
| 137     return Send(pv, cb, options); |  | 
| 138   // Remote address may be empty if there is a sudden network change. |  | 
| 139   RTC_DCHECK(remote_address.IsNil()); |  | 
| 140   socket_->SetError(ENOTCONN); |  | 
| 141   return -1; |  | 
| 142 } |  | 
| 143 |  | 
| 144 int AsyncTCPSocketBase::SendRaw(const void * pv, size_t cb) { |  | 
| 145   if (outbuf_.size() + cb > max_outsize_) { |  | 
| 146     socket_->SetError(EMSGSIZE); |  | 
| 147     return -1; |  | 
| 148   } |  | 
| 149 |  | 
| 150   RTC_DCHECK(!listen_); |  | 
| 151   outbuf_.AppendData(static_cast<const uint8_t*>(pv), cb); |  | 
| 152 |  | 
| 153   return FlushOutBuffer(); |  | 
| 154 } |  | 
| 155 |  | 
| 156 int AsyncTCPSocketBase::FlushOutBuffer() { |  | 
| 157   RTC_DCHECK(!listen_); |  | 
| 158   int res = socket_->Send(outbuf_.data(), outbuf_.size()); |  | 
| 159   if (res <= 0) { |  | 
| 160     return res; |  | 
| 161   } |  | 
| 162   if (static_cast<size_t>(res) > outbuf_.size()) { |  | 
| 163     RTC_NOTREACHED(); |  | 
| 164     return -1; |  | 
| 165   } |  | 
| 166   size_t new_size = outbuf_.size() - res; |  | 
| 167   if (new_size > 0) { |  | 
| 168     memmove(outbuf_.data(), outbuf_.data() + res, new_size); |  | 
| 169   } |  | 
| 170   outbuf_.SetSize(new_size); |  | 
| 171   return res; |  | 
| 172 } |  | 
| 173 |  | 
| 174 void AsyncTCPSocketBase::AppendToOutBuffer(const void* pv, size_t cb) { |  | 
| 175   RTC_DCHECK(outbuf_.size() + cb <= max_outsize_); |  | 
| 176   RTC_DCHECK(!listen_); |  | 
| 177   outbuf_.AppendData(static_cast<const uint8_t*>(pv), cb); |  | 
| 178 } |  | 
| 179 |  | 
| 180 void AsyncTCPSocketBase::OnConnectEvent(AsyncSocket* socket) { |  | 
| 181   SignalConnect(this); |  | 
| 182 } |  | 
| 183 |  | 
| 184 void AsyncTCPSocketBase::OnReadEvent(AsyncSocket* socket) { |  | 
| 185   RTC_DCHECK(socket_.get() == socket); |  | 
| 186 |  | 
| 187   if (listen_) { |  | 
| 188     rtc::SocketAddress address; |  | 
| 189     rtc::AsyncSocket* new_socket = socket->Accept(&address); |  | 
| 190     if (!new_socket) { |  | 
| 191       // TODO(stefan): Do something better like forwarding the error |  | 
| 192       // to the user. |  | 
| 193       LOG(LS_ERROR) << "TCP accept failed with error " << socket_->GetError(); |  | 
| 194       return; |  | 
| 195     } |  | 
| 196 |  | 
| 197     HandleIncomingConnection(new_socket); |  | 
| 198 |  | 
| 199     // Prime a read event in case data is waiting. |  | 
| 200     new_socket->SignalReadEvent(new_socket); |  | 
| 201   } else { |  | 
| 202     size_t total_recv = 0; |  | 
| 203     while (true) { |  | 
| 204       size_t free_size = inbuf_.capacity() - inbuf_.size(); |  | 
| 205       if (free_size < kMinimumRecvSize && inbuf_.capacity() < max_insize_) { |  | 
| 206         inbuf_.EnsureCapacity(std::min(max_insize_, inbuf_.capacity() * 2)); |  | 
| 207         free_size = inbuf_.capacity() - inbuf_.size(); |  | 
| 208       } |  | 
| 209 |  | 
| 210       int len = |  | 
| 211           socket_->Recv(inbuf_.data() + inbuf_.size(), free_size, nullptr); |  | 
| 212       if (len < 0) { |  | 
| 213         // TODO(stefan): Do something better like forwarding the error to the |  | 
| 214         // user. |  | 
| 215         if (!socket_->IsBlocking()) { |  | 
| 216           LOG(LS_ERROR) << "Recv() returned error: " << socket_->GetError(); |  | 
| 217         } |  | 
| 218         break; |  | 
| 219       } |  | 
| 220 |  | 
| 221       total_recv += len; |  | 
| 222       inbuf_.SetSize(inbuf_.size() + len); |  | 
| 223       if (!len || static_cast<size_t>(len) < free_size) { |  | 
| 224         break; |  | 
| 225       } |  | 
| 226     } |  | 
| 227 |  | 
| 228     if (!total_recv) { |  | 
| 229       return; |  | 
| 230     } |  | 
| 231 |  | 
| 232     size_t size = inbuf_.size(); |  | 
| 233     ProcessInput(inbuf_.data<char>(), &size); |  | 
| 234 |  | 
| 235     if (size > inbuf_.size()) { |  | 
| 236       LOG(LS_ERROR) << "input buffer overflow"; |  | 
| 237       RTC_NOTREACHED(); |  | 
| 238       inbuf_.Clear(); |  | 
| 239     } else { |  | 
| 240       inbuf_.SetSize(size); |  | 
| 241     } |  | 
| 242   } |  | 
| 243 } |  | 
| 244 |  | 
| 245 void AsyncTCPSocketBase::OnWriteEvent(AsyncSocket* socket) { |  | 
| 246   RTC_DCHECK(socket_.get() == socket); |  | 
| 247 |  | 
| 248   if (outbuf_.size() > 0) { |  | 
| 249     FlushOutBuffer(); |  | 
| 250   } |  | 
| 251 |  | 
| 252   if (outbuf_.size() == 0) { |  | 
| 253     SignalReadyToSend(this); |  | 
| 254   } |  | 
| 255 } |  | 
| 256 |  | 
| 257 void AsyncTCPSocketBase::OnCloseEvent(AsyncSocket* socket, int error) { |  | 
| 258   SignalClose(this, error); |  | 
| 259 } |  | 
| 260 |  | 
| 261 // AsyncTCPSocket |  | 
| 262 // Binds and connects |socket| and creates AsyncTCPSocket for |  | 
| 263 // it. Takes ownership of |socket|. Returns null if bind() or |  | 
| 264 // connect() fail (|socket| is destroyed in that case). |  | 
| 265 AsyncTCPSocket* AsyncTCPSocket::Create( |  | 
| 266     AsyncSocket* socket, |  | 
| 267     const SocketAddress& bind_address, |  | 
| 268     const SocketAddress& remote_address) { |  | 
| 269   return new AsyncTCPSocket(AsyncTCPSocketBase::ConnectSocket( |  | 
| 270       socket, bind_address, remote_address), false); |  | 
| 271 } |  | 
| 272 |  | 
| 273 AsyncTCPSocket::AsyncTCPSocket(AsyncSocket* socket, bool listen) |  | 
| 274     : AsyncTCPSocketBase(socket, listen, kBufSize) { |  | 
| 275 } |  | 
| 276 |  | 
| 277 int AsyncTCPSocket::Send(const void *pv, size_t cb, |  | 
| 278                          const rtc::PacketOptions& options) { |  | 
| 279   if (cb > kBufSize) { |  | 
| 280     SetError(EMSGSIZE); |  | 
| 281     return -1; |  | 
| 282   } |  | 
| 283 |  | 
| 284   // If we are blocking on send, then silently drop this packet |  | 
| 285   if (!IsOutBufferEmpty()) |  | 
| 286     return static_cast<int>(cb); |  | 
| 287 |  | 
| 288   PacketLength pkt_len = HostToNetwork16(static_cast<PacketLength>(cb)); |  | 
| 289   AppendToOutBuffer(&pkt_len, kPacketLenSize); |  | 
| 290   AppendToOutBuffer(pv, cb); |  | 
| 291 |  | 
| 292   int res = FlushOutBuffer(); |  | 
| 293   if (res <= 0) { |  | 
| 294     // drop packet if we made no progress |  | 
| 295     ClearOutBuffer(); |  | 
| 296     return res; |  | 
| 297   } |  | 
| 298 |  | 
| 299   rtc::SentPacket sent_packet(options.packet_id, rtc::TimeMillis()); |  | 
| 300   SignalSentPacket(this, sent_packet); |  | 
| 301 |  | 
| 302   // We claim to have sent the whole thing, even if we only sent partial |  | 
| 303   return static_cast<int>(cb); |  | 
| 304 } |  | 
| 305 |  | 
| 306 void AsyncTCPSocket::ProcessInput(char * data, size_t* len) { |  | 
| 307   SocketAddress remote_addr(GetRemoteAddress()); |  | 
| 308 |  | 
| 309   while (true) { |  | 
| 310     if (*len < kPacketLenSize) |  | 
| 311       return; |  | 
| 312 |  | 
| 313     PacketLength pkt_len = rtc::GetBE16(data); |  | 
| 314     if (*len < kPacketLenSize + pkt_len) |  | 
| 315       return; |  | 
| 316 |  | 
| 317     SignalReadPacket(this, data + kPacketLenSize, pkt_len, remote_addr, |  | 
| 318                      CreatePacketTime(0)); |  | 
| 319 |  | 
| 320     *len -= kPacketLenSize + pkt_len; |  | 
| 321     if (*len > 0) { |  | 
| 322       memmove(data, data + kPacketLenSize + pkt_len, *len); |  | 
| 323     } |  | 
| 324   } |  | 
| 325 } |  | 
| 326 |  | 
| 327 void AsyncTCPSocket::HandleIncomingConnection(AsyncSocket* socket) { |  | 
| 328   SignalNewConnection(this, new AsyncTCPSocket(socket, false)); |  | 
| 329 } |  | 
| 330 |  | 
| 331 }  // namespace rtc |  | 
| OLD | NEW | 
|---|