OLD | NEW |
| (Empty) |
1 /* | |
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. | |
3 * | |
4 * Use of this source code is governed by a BSD-style license | |
5 * that can be found in the LICENSE file in the root of the source | |
6 * tree. An additional intellectual property rights grant can be found | |
7 * in the file PATENTS. All contributing project authors may | |
8 * be found in the AUTHORS file in the root of the source tree. | |
9 */ | |
10 | |
11 #include <iomanip> | |
12 | |
13 #include "webrtc/base/asyncsocket.h" | |
14 #include "webrtc/base/logging.h" | |
15 #include "webrtc/base/socketfactory.h" | |
16 #include "webrtc/base/socketpool.h" | |
17 #include "webrtc/base/socketstream.h" | |
18 #include "webrtc/base/thread.h" | |
19 | |
20 namespace rtc { | |
21 | |
22 /////////////////////////////////////////////////////////////////////////////// | |
23 // StreamCache - Caches a set of open streams, defers creation to a separate | |
24 // StreamPool. | |
25 /////////////////////////////////////////////////////////////////////////////// | |
26 | |
27 StreamCache::StreamCache(StreamPool* pool) : pool_(pool) { | |
28 } | |
29 | |
30 StreamCache::~StreamCache() { | |
31 for (ConnectedList::iterator it = active_.begin(); it != active_.end(); | |
32 ++it) { | |
33 delete it->second; | |
34 } | |
35 for (ConnectedList::iterator it = cached_.begin(); it != cached_.end(); | |
36 ++it) { | |
37 delete it->second; | |
38 } | |
39 } | |
40 | |
41 StreamInterface* StreamCache::RequestConnectedStream( | |
42 const SocketAddress& remote, int* err) { | |
43 LOG_F(LS_VERBOSE) << "(" << remote << ")"; | |
44 for (ConnectedList::iterator it = cached_.begin(); it != cached_.end(); | |
45 ++it) { | |
46 if (remote == it->first) { | |
47 it->second->SignalEvent.disconnect(this); | |
48 // Move from cached_ to active_ | |
49 active_.push_front(*it); | |
50 cached_.erase(it); | |
51 if (err) | |
52 *err = 0; | |
53 LOG_F(LS_VERBOSE) << "Providing cached stream"; | |
54 return active_.front().second; | |
55 } | |
56 } | |
57 if (StreamInterface* stream = pool_->RequestConnectedStream(remote, err)) { | |
58 // We track active streams so that we can remember their address | |
59 active_.push_front(ConnectedStream(remote, stream)); | |
60 LOG_F(LS_VERBOSE) << "Providing new stream"; | |
61 return active_.front().second; | |
62 } | |
63 return NULL; | |
64 } | |
65 | |
66 void StreamCache::ReturnConnectedStream(StreamInterface* stream) { | |
67 for (ConnectedList::iterator it = active_.begin(); it != active_.end(); | |
68 ++it) { | |
69 if (stream == it->second) { | |
70 LOG_F(LS_VERBOSE) << "(" << it->first << ")"; | |
71 if (stream->GetState() == SS_CLOSED) { | |
72 // Return closed streams | |
73 LOG_F(LS_VERBOSE) << "Returning closed stream"; | |
74 pool_->ReturnConnectedStream(it->second); | |
75 } else { | |
76 // Monitor open streams | |
77 stream->SignalEvent.connect(this, &StreamCache::OnStreamEvent); | |
78 LOG_F(LS_VERBOSE) << "Caching stream"; | |
79 cached_.push_front(*it); | |
80 } | |
81 active_.erase(it); | |
82 return; | |
83 } | |
84 } | |
85 ASSERT(false); | |
86 } | |
87 | |
88 void StreamCache::OnStreamEvent(StreamInterface* stream, int events, int err) { | |
89 if ((events & SE_CLOSE) == 0) { | |
90 LOG_F(LS_WARNING) << "(" << events << ", " << err | |
91 << ") received non-close event"; | |
92 return; | |
93 } | |
94 for (ConnectedList::iterator it = cached_.begin(); it != cached_.end(); | |
95 ++it) { | |
96 if (stream == it->second) { | |
97 LOG_F(LS_VERBOSE) << "(" << it->first << ")"; | |
98 // We don't cache closed streams, so return it. | |
99 it->second->SignalEvent.disconnect(this); | |
100 LOG_F(LS_VERBOSE) << "Returning closed stream"; | |
101 pool_->ReturnConnectedStream(it->second); | |
102 cached_.erase(it); | |
103 return; | |
104 } | |
105 } | |
106 ASSERT(false); | |
107 } | |
108 | |
109 ////////////////////////////////////////////////////////////////////// | |
110 // NewSocketPool | |
111 ////////////////////////////////////////////////////////////////////// | |
112 | |
113 NewSocketPool::NewSocketPool(SocketFactory* factory) : factory_(factory) { | |
114 } | |
115 | |
116 NewSocketPool::~NewSocketPool() { | |
117 } | |
118 | |
119 StreamInterface* | |
120 NewSocketPool::RequestConnectedStream(const SocketAddress& remote, int* err) { | |
121 AsyncSocket* socket = | |
122 factory_->CreateAsyncSocket(remote.family(), SOCK_STREAM); | |
123 if (!socket) { | |
124 if (err) | |
125 *err = -1; | |
126 return NULL; | |
127 } | |
128 if ((socket->Connect(remote) != 0) && !socket->IsBlocking()) { | |
129 if (err) | |
130 *err = socket->GetError(); | |
131 delete socket; | |
132 return NULL; | |
133 } | |
134 if (err) | |
135 *err = 0; | |
136 return new SocketStream(socket); | |
137 } | |
138 | |
139 void | |
140 NewSocketPool::ReturnConnectedStream(StreamInterface* stream) { | |
141 Thread::Current()->Dispose(stream); | |
142 } | |
143 | |
144 ////////////////////////////////////////////////////////////////////// | |
145 // ReuseSocketPool | |
146 ////////////////////////////////////////////////////////////////////// | |
147 | |
148 ReuseSocketPool::ReuseSocketPool(SocketFactory* factory) | |
149 : factory_(factory), stream_(NULL), checked_out_(false) { | |
150 } | |
151 | |
152 ReuseSocketPool::~ReuseSocketPool() { | |
153 ASSERT(!checked_out_); | |
154 delete stream_; | |
155 } | |
156 | |
157 StreamInterface* | |
158 ReuseSocketPool::RequestConnectedStream(const SocketAddress& remote, int* err) { | |
159 // Only one socket can be used from this "pool" at a time | |
160 ASSERT(!checked_out_); | |
161 if (!stream_) { | |
162 LOG_F(LS_VERBOSE) << "Creating new socket"; | |
163 int family = remote.family(); | |
164 // TODO: Deal with this when we/I clean up DNS resolution. | |
165 if (remote.IsUnresolvedIP()) { | |
166 family = AF_INET; | |
167 } | |
168 AsyncSocket* socket = | |
169 factory_->CreateAsyncSocket(family, SOCK_STREAM); | |
170 if (!socket) { | |
171 if (err) | |
172 *err = -1; | |
173 return NULL; | |
174 } | |
175 stream_ = new SocketStream(socket); | |
176 } | |
177 if ((stream_->GetState() == SS_OPEN) && (remote == remote_)) { | |
178 LOG_F(LS_VERBOSE) << "Reusing connection to: " << remote_; | |
179 } else { | |
180 remote_ = remote; | |
181 stream_->Close(); | |
182 if ((stream_->GetSocket()->Connect(remote_) != 0) | |
183 && !stream_->GetSocket()->IsBlocking()) { | |
184 if (err) | |
185 *err = stream_->GetSocket()->GetError(); | |
186 return NULL; | |
187 } else { | |
188 LOG_F(LS_VERBOSE) << "Opening connection to: " << remote_; | |
189 } | |
190 } | |
191 stream_->SignalEvent.disconnect(this); | |
192 checked_out_ = true; | |
193 if (err) | |
194 *err = 0; | |
195 return stream_; | |
196 } | |
197 | |
198 void | |
199 ReuseSocketPool::ReturnConnectedStream(StreamInterface* stream) { | |
200 ASSERT(stream == stream_); | |
201 ASSERT(checked_out_); | |
202 checked_out_ = false; | |
203 // Until the socket is reused, monitor it to determine if it closes. | |
204 stream_->SignalEvent.connect(this, &ReuseSocketPool::OnStreamEvent); | |
205 } | |
206 | |
207 void | |
208 ReuseSocketPool::OnStreamEvent(StreamInterface* stream, int events, int err) { | |
209 ASSERT(stream == stream_); | |
210 ASSERT(!checked_out_); | |
211 | |
212 // If the stream was written to and then immediately returned to us then | |
213 // we may get a writable notification for it, which we should ignore. | |
214 if (events == SE_WRITE) { | |
215 LOG_F(LS_VERBOSE) << "Pooled Socket unexpectedly writable: ignoring"; | |
216 return; | |
217 } | |
218 | |
219 // If the peer sent data, we can't process it, so drop the connection. | |
220 // If the socket has closed, clean it up. | |
221 // In either case, we'll reconnect it the next time it is used. | |
222 ASSERT(0 != (events & (SE_READ|SE_CLOSE))); | |
223 if (0 != (events & SE_CLOSE)) { | |
224 LOG_F(LS_VERBOSE) << "Connection closed with error: " << err; | |
225 } else { | |
226 LOG_F(LS_VERBOSE) << "Pooled Socket unexpectedly readable: closing"; | |
227 } | |
228 stream_->Close(); | |
229 } | |
230 | |
231 /////////////////////////////////////////////////////////////////////////////// | |
232 // LoggingPoolAdapter - Adapts a StreamPool to supply streams with attached | |
233 // LoggingAdapters. | |
234 /////////////////////////////////////////////////////////////////////////////// | |
235 | |
236 LoggingPoolAdapter::LoggingPoolAdapter( | |
237 StreamPool* pool, LoggingSeverity level, const std::string& label, | |
238 bool binary_mode) | |
239 : pool_(pool), level_(level), label_(label), binary_mode_(binary_mode) { | |
240 } | |
241 | |
242 LoggingPoolAdapter::~LoggingPoolAdapter() { | |
243 for (StreamList::iterator it = recycle_bin_.begin(); | |
244 it != recycle_bin_.end(); ++it) { | |
245 delete *it; | |
246 } | |
247 } | |
248 | |
249 StreamInterface* LoggingPoolAdapter::RequestConnectedStream( | |
250 const SocketAddress& remote, int* err) { | |
251 if (StreamInterface* stream = pool_->RequestConnectedStream(remote, err)) { | |
252 ASSERT(SS_CLOSED != stream->GetState()); | |
253 std::stringstream ss; | |
254 ss << label_ << "(0x" << std::setfill('0') << std::hex << std::setw(8) | |
255 << stream << ")"; | |
256 LOG_V(level_) << ss.str() | |
257 << ((SS_OPEN == stream->GetState()) ? " Connected" | |
258 : " Connecting") | |
259 << " to " << remote; | |
260 if (recycle_bin_.empty()) { | |
261 return new LoggingAdapter(stream, level_, ss.str(), binary_mode_); | |
262 } | |
263 LoggingAdapter* logging = recycle_bin_.front(); | |
264 recycle_bin_.pop_front(); | |
265 logging->set_label(ss.str()); | |
266 logging->Attach(stream); | |
267 return logging; | |
268 } | |
269 return NULL; | |
270 } | |
271 | |
272 void LoggingPoolAdapter::ReturnConnectedStream(StreamInterface* stream) { | |
273 LoggingAdapter* logging = static_cast<LoggingAdapter*>(stream); | |
274 pool_->ReturnConnectedStream(logging->Detach()); | |
275 recycle_bin_.push_back(logging); | |
276 } | |
277 | |
278 /////////////////////////////////////////////////////////////////////////////// | |
279 | |
280 } // namespace rtc | |
OLD | NEW |