OLD | NEW |
(Empty) | |
| 1 /* |
| 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. |
| 3 * |
| 4 * Use of this source code is governed by a BSD-style license |
| 5 * that can be found in the LICENSE file in the root of the source |
| 6 * tree. An additional intellectual property rights grant can be found |
| 7 * in the file PATENTS. All contributing project authors may |
| 8 * be found in the AUTHORS file in the root of the source tree. |
| 9 */ |
| 10 |
| 11 #include <iomanip> |
| 12 |
| 13 #include "webrtc/base/asyncsocket.h" |
| 14 #include "webrtc/base/logging.h" |
| 15 #include "webrtc/base/socketfactory.h" |
| 16 #include "webrtc/base/socketpool.h" |
| 17 #include "webrtc/base/socketstream.h" |
| 18 #include "webrtc/base/thread.h" |
| 19 |
| 20 namespace rtc { |
| 21 |
| 22 /////////////////////////////////////////////////////////////////////////////// |
| 23 // StreamCache - Caches a set of open streams, defers creation to a separate |
| 24 // StreamPool. |
| 25 /////////////////////////////////////////////////////////////////////////////// |
| 26 |
| 27 StreamCache::StreamCache(StreamPool* pool) : pool_(pool) { |
| 28 } |
| 29 |
| 30 StreamCache::~StreamCache() { |
| 31 for (ConnectedList::iterator it = active_.begin(); it != active_.end(); |
| 32 ++it) { |
| 33 delete it->second; |
| 34 } |
| 35 for (ConnectedList::iterator it = cached_.begin(); it != cached_.end(); |
| 36 ++it) { |
| 37 delete it->second; |
| 38 } |
| 39 } |
| 40 |
| 41 StreamInterface* StreamCache::RequestConnectedStream( |
| 42 const SocketAddress& remote, int* err) { |
| 43 LOG_F(LS_VERBOSE) << "(" << remote << ")"; |
| 44 for (ConnectedList::iterator it = cached_.begin(); it != cached_.end(); |
| 45 ++it) { |
| 46 if (remote == it->first) { |
| 47 it->second->SignalEvent.disconnect(this); |
| 48 // Move from cached_ to active_ |
| 49 active_.push_front(*it); |
| 50 cached_.erase(it); |
| 51 if (err) |
| 52 *err = 0; |
| 53 LOG_F(LS_VERBOSE) << "Providing cached stream"; |
| 54 return active_.front().second; |
| 55 } |
| 56 } |
| 57 if (StreamInterface* stream = pool_->RequestConnectedStream(remote, err)) { |
| 58 // We track active streams so that we can remember their address |
| 59 active_.push_front(ConnectedStream(remote, stream)); |
| 60 LOG_F(LS_VERBOSE) << "Providing new stream"; |
| 61 return active_.front().second; |
| 62 } |
| 63 return NULL; |
| 64 } |
| 65 |
| 66 void StreamCache::ReturnConnectedStream(StreamInterface* stream) { |
| 67 for (ConnectedList::iterator it = active_.begin(); it != active_.end(); |
| 68 ++it) { |
| 69 if (stream == it->second) { |
| 70 LOG_F(LS_VERBOSE) << "(" << it->first << ")"; |
| 71 if (stream->GetState() == SS_CLOSED) { |
| 72 // Return closed streams |
| 73 LOG_F(LS_VERBOSE) << "Returning closed stream"; |
| 74 pool_->ReturnConnectedStream(it->second); |
| 75 } else { |
| 76 // Monitor open streams |
| 77 stream->SignalEvent.connect(this, &StreamCache::OnStreamEvent); |
| 78 LOG_F(LS_VERBOSE) << "Caching stream"; |
| 79 cached_.push_front(*it); |
| 80 } |
| 81 active_.erase(it); |
| 82 return; |
| 83 } |
| 84 } |
| 85 ASSERT(false); |
| 86 } |
| 87 |
| 88 void StreamCache::OnStreamEvent(StreamInterface* stream, int events, int err) { |
| 89 if ((events & SE_CLOSE) == 0) { |
| 90 LOG_F(LS_WARNING) << "(" << events << ", " << err |
| 91 << ") received non-close event"; |
| 92 return; |
| 93 } |
| 94 for (ConnectedList::iterator it = cached_.begin(); it != cached_.end(); |
| 95 ++it) { |
| 96 if (stream == it->second) { |
| 97 LOG_F(LS_VERBOSE) << "(" << it->first << ")"; |
| 98 // We don't cache closed streams, so return it. |
| 99 it->second->SignalEvent.disconnect(this); |
| 100 LOG_F(LS_VERBOSE) << "Returning closed stream"; |
| 101 pool_->ReturnConnectedStream(it->second); |
| 102 cached_.erase(it); |
| 103 return; |
| 104 } |
| 105 } |
| 106 ASSERT(false); |
| 107 } |
| 108 |
| 109 ////////////////////////////////////////////////////////////////////// |
| 110 // NewSocketPool |
| 111 ////////////////////////////////////////////////////////////////////// |
| 112 |
| 113 NewSocketPool::NewSocketPool(SocketFactory* factory) : factory_(factory) { |
| 114 } |
| 115 |
| 116 NewSocketPool::~NewSocketPool() { |
| 117 } |
| 118 |
| 119 StreamInterface* |
| 120 NewSocketPool::RequestConnectedStream(const SocketAddress& remote, int* err) { |
| 121 AsyncSocket* socket = |
| 122 factory_->CreateAsyncSocket(remote.family(), SOCK_STREAM); |
| 123 if (!socket) { |
| 124 if (err) |
| 125 *err = -1; |
| 126 return NULL; |
| 127 } |
| 128 if ((socket->Connect(remote) != 0) && !socket->IsBlocking()) { |
| 129 if (err) |
| 130 *err = socket->GetError(); |
| 131 delete socket; |
| 132 return NULL; |
| 133 } |
| 134 if (err) |
| 135 *err = 0; |
| 136 return new SocketStream(socket); |
| 137 } |
| 138 |
| 139 void |
| 140 NewSocketPool::ReturnConnectedStream(StreamInterface* stream) { |
| 141 Thread::Current()->Dispose(stream); |
| 142 } |
| 143 |
| 144 ////////////////////////////////////////////////////////////////////// |
| 145 // ReuseSocketPool |
| 146 ////////////////////////////////////////////////////////////////////// |
| 147 |
| 148 ReuseSocketPool::ReuseSocketPool(SocketFactory* factory) |
| 149 : factory_(factory), stream_(NULL), checked_out_(false) { |
| 150 } |
| 151 |
| 152 ReuseSocketPool::~ReuseSocketPool() { |
| 153 ASSERT(!checked_out_); |
| 154 delete stream_; |
| 155 } |
| 156 |
| 157 StreamInterface* |
| 158 ReuseSocketPool::RequestConnectedStream(const SocketAddress& remote, int* err) { |
| 159 // Only one socket can be used from this "pool" at a time |
| 160 ASSERT(!checked_out_); |
| 161 if (!stream_) { |
| 162 LOG_F(LS_VERBOSE) << "Creating new socket"; |
| 163 int family = remote.family(); |
| 164 // TODO: Deal with this when we/I clean up DNS resolution. |
| 165 if (remote.IsUnresolvedIP()) { |
| 166 family = AF_INET; |
| 167 } |
| 168 AsyncSocket* socket = |
| 169 factory_->CreateAsyncSocket(family, SOCK_STREAM); |
| 170 if (!socket) { |
| 171 if (err) |
| 172 *err = -1; |
| 173 return NULL; |
| 174 } |
| 175 stream_ = new SocketStream(socket); |
| 176 } |
| 177 if ((stream_->GetState() == SS_OPEN) && (remote == remote_)) { |
| 178 LOG_F(LS_VERBOSE) << "Reusing connection to: " << remote_; |
| 179 } else { |
| 180 remote_ = remote; |
| 181 stream_->Close(); |
| 182 if ((stream_->GetSocket()->Connect(remote_) != 0) |
| 183 && !stream_->GetSocket()->IsBlocking()) { |
| 184 if (err) |
| 185 *err = stream_->GetSocket()->GetError(); |
| 186 return NULL; |
| 187 } else { |
| 188 LOG_F(LS_VERBOSE) << "Opening connection to: " << remote_; |
| 189 } |
| 190 } |
| 191 stream_->SignalEvent.disconnect(this); |
| 192 checked_out_ = true; |
| 193 if (err) |
| 194 *err = 0; |
| 195 return stream_; |
| 196 } |
| 197 |
| 198 void |
| 199 ReuseSocketPool::ReturnConnectedStream(StreamInterface* stream) { |
| 200 ASSERT(stream == stream_); |
| 201 ASSERT(checked_out_); |
| 202 checked_out_ = false; |
| 203 // Until the socket is reused, monitor it to determine if it closes. |
| 204 stream_->SignalEvent.connect(this, &ReuseSocketPool::OnStreamEvent); |
| 205 } |
| 206 |
| 207 void |
| 208 ReuseSocketPool::OnStreamEvent(StreamInterface* stream, int events, int err) { |
| 209 ASSERT(stream == stream_); |
| 210 ASSERT(!checked_out_); |
| 211 |
| 212 // If the stream was written to and then immediately returned to us then |
| 213 // we may get a writable notification for it, which we should ignore. |
| 214 if (events == SE_WRITE) { |
| 215 LOG_F(LS_VERBOSE) << "Pooled Socket unexpectedly writable: ignoring"; |
| 216 return; |
| 217 } |
| 218 |
| 219 // If the peer sent data, we can't process it, so drop the connection. |
| 220 // If the socket has closed, clean it up. |
| 221 // In either case, we'll reconnect it the next time it is used. |
| 222 ASSERT(0 != (events & (SE_READ|SE_CLOSE))); |
| 223 if (0 != (events & SE_CLOSE)) { |
| 224 LOG_F(LS_VERBOSE) << "Connection closed with error: " << err; |
| 225 } else { |
| 226 LOG_F(LS_VERBOSE) << "Pooled Socket unexpectedly readable: closing"; |
| 227 } |
| 228 stream_->Close(); |
| 229 } |
| 230 |
| 231 /////////////////////////////////////////////////////////////////////////////// |
| 232 // LoggingPoolAdapter - Adapts a StreamPool to supply streams with attached |
| 233 // LoggingAdapters. |
| 234 /////////////////////////////////////////////////////////////////////////////// |
| 235 |
| 236 LoggingPoolAdapter::LoggingPoolAdapter( |
| 237 StreamPool* pool, LoggingSeverity level, const std::string& label, |
| 238 bool binary_mode) |
| 239 : pool_(pool), level_(level), label_(label), binary_mode_(binary_mode) { |
| 240 } |
| 241 |
| 242 LoggingPoolAdapter::~LoggingPoolAdapter() { |
| 243 for (StreamList::iterator it = recycle_bin_.begin(); |
| 244 it != recycle_bin_.end(); ++it) { |
| 245 delete *it; |
| 246 } |
| 247 } |
| 248 |
| 249 StreamInterface* LoggingPoolAdapter::RequestConnectedStream( |
| 250 const SocketAddress& remote, int* err) { |
| 251 if (StreamInterface* stream = pool_->RequestConnectedStream(remote, err)) { |
| 252 ASSERT(SS_CLOSED != stream->GetState()); |
| 253 std::stringstream ss; |
| 254 ss << label_ << "(0x" << std::setfill('0') << std::hex << std::setw(8) |
| 255 << stream << ")"; |
| 256 LOG_V(level_) << ss.str() |
| 257 << ((SS_OPEN == stream->GetState()) ? " Connected" |
| 258 : " Connecting") |
| 259 << " to " << remote; |
| 260 if (recycle_bin_.empty()) { |
| 261 return new LoggingAdapter(stream, level_, ss.str(), binary_mode_); |
| 262 } |
| 263 LoggingAdapter* logging = recycle_bin_.front(); |
| 264 recycle_bin_.pop_front(); |
| 265 logging->set_label(ss.str()); |
| 266 logging->Attach(stream); |
| 267 return logging; |
| 268 } |
| 269 return NULL; |
| 270 } |
| 271 |
| 272 void LoggingPoolAdapter::ReturnConnectedStream(StreamInterface* stream) { |
| 273 LoggingAdapter* logging = static_cast<LoggingAdapter*>(stream); |
| 274 pool_->ReturnConnectedStream(logging->Detach()); |
| 275 recycle_bin_.push_back(logging); |
| 276 } |
| 277 |
| 278 /////////////////////////////////////////////////////////////////////////////// |
| 279 |
| 280 } // namespace rtc |
OLD | NEW |