OLD | NEW |
| (Empty) |
1 /* | |
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. | |
3 * | |
4 * Use of this source code is governed by a BSD-style license | |
5 * that can be found in the LICENSE file in the root of the source | |
6 * tree. An additional intellectual property rights grant can be found | |
7 * in the file PATENTS. All contributing project authors may | |
8 * be found in the AUTHORS file in the root of the source tree. | |
9 */ | |
10 | |
11 #include <iomanip> | |
12 | |
13 #include "webrtc/base/asyncsocket.h" | |
14 #include "webrtc/base/checks.h" | |
15 #include "webrtc/base/logging.h" | |
16 #include "webrtc/base/socketfactory.h" | |
17 #include "webrtc/base/socketpool.h" | |
18 #include "webrtc/base/socketstream.h" | |
19 #include "webrtc/base/thread.h" | |
20 | |
21 namespace rtc { | |
22 | |
23 /////////////////////////////////////////////////////////////////////////////// | |
24 // StreamCache - Caches a set of open streams, defers creation to a separate | |
25 // StreamPool. | |
26 /////////////////////////////////////////////////////////////////////////////// | |
27 | |
28 StreamCache::StreamCache(StreamPool* pool) : pool_(pool) { | |
29 } | |
30 | |
31 StreamCache::~StreamCache() { | |
32 for (ConnectedList::iterator it = active_.begin(); it != active_.end(); | |
33 ++it) { | |
34 delete it->second; | |
35 } | |
36 for (ConnectedList::iterator it = cached_.begin(); it != cached_.end(); | |
37 ++it) { | |
38 delete it->second; | |
39 } | |
40 } | |
41 | |
42 StreamInterface* StreamCache::RequestConnectedStream( | |
43 const SocketAddress& remote, int* err) { | |
44 LOG_F(LS_VERBOSE) << "(" << remote << ")"; | |
45 for (ConnectedList::iterator it = cached_.begin(); it != cached_.end(); | |
46 ++it) { | |
47 if (remote == it->first) { | |
48 it->second->SignalEvent.disconnect(this); | |
49 // Move from cached_ to active_ | |
50 active_.push_front(*it); | |
51 cached_.erase(it); | |
52 if (err) | |
53 *err = 0; | |
54 LOG_F(LS_VERBOSE) << "Providing cached stream"; | |
55 return active_.front().second; | |
56 } | |
57 } | |
58 if (StreamInterface* stream = pool_->RequestConnectedStream(remote, err)) { | |
59 // We track active streams so that we can remember their address | |
60 active_.push_front(ConnectedStream(remote, stream)); | |
61 LOG_F(LS_VERBOSE) << "Providing new stream"; | |
62 return active_.front().second; | |
63 } | |
64 return NULL; | |
65 } | |
66 | |
67 void StreamCache::ReturnConnectedStream(StreamInterface* stream) { | |
68 for (ConnectedList::iterator it = active_.begin(); it != active_.end(); | |
69 ++it) { | |
70 if (stream == it->second) { | |
71 LOG_F(LS_VERBOSE) << "(" << it->first << ")"; | |
72 if (stream->GetState() == SS_CLOSED) { | |
73 // Return closed streams | |
74 LOG_F(LS_VERBOSE) << "Returning closed stream"; | |
75 pool_->ReturnConnectedStream(it->second); | |
76 } else { | |
77 // Monitor open streams | |
78 stream->SignalEvent.connect(this, &StreamCache::OnStreamEvent); | |
79 LOG_F(LS_VERBOSE) << "Caching stream"; | |
80 cached_.push_front(*it); | |
81 } | |
82 active_.erase(it); | |
83 return; | |
84 } | |
85 } | |
86 RTC_NOTREACHED(); | |
87 } | |
88 | |
89 void StreamCache::OnStreamEvent(StreamInterface* stream, int events, int err) { | |
90 if ((events & SE_CLOSE) == 0) { | |
91 LOG_F(LS_WARNING) << "(" << events << ", " << err | |
92 << ") received non-close event"; | |
93 return; | |
94 } | |
95 for (ConnectedList::iterator it = cached_.begin(); it != cached_.end(); | |
96 ++it) { | |
97 if (stream == it->second) { | |
98 LOG_F(LS_VERBOSE) << "(" << it->first << ")"; | |
99 // We don't cache closed streams, so return it. | |
100 it->second->SignalEvent.disconnect(this); | |
101 LOG_F(LS_VERBOSE) << "Returning closed stream"; | |
102 pool_->ReturnConnectedStream(it->second); | |
103 cached_.erase(it); | |
104 return; | |
105 } | |
106 } | |
107 RTC_NOTREACHED(); | |
108 } | |
109 | |
110 ////////////////////////////////////////////////////////////////////// | |
111 // NewSocketPool | |
112 ////////////////////////////////////////////////////////////////////// | |
113 | |
114 NewSocketPool::NewSocketPool(SocketFactory* factory) : factory_(factory) { | |
115 } | |
116 | |
117 NewSocketPool::~NewSocketPool() { | |
118 } | |
119 | |
120 StreamInterface* | |
121 NewSocketPool::RequestConnectedStream(const SocketAddress& remote, int* err) { | |
122 AsyncSocket* socket = | |
123 factory_->CreateAsyncSocket(remote.family(), SOCK_STREAM); | |
124 if (!socket) { | |
125 if (err) | |
126 *err = -1; | |
127 return NULL; | |
128 } | |
129 if ((socket->Connect(remote) != 0) && !socket->IsBlocking()) { | |
130 if (err) | |
131 *err = socket->GetError(); | |
132 delete socket; | |
133 return NULL; | |
134 } | |
135 if (err) | |
136 *err = 0; | |
137 return new SocketStream(socket); | |
138 } | |
139 | |
140 void | |
141 NewSocketPool::ReturnConnectedStream(StreamInterface* stream) { | |
142 Thread::Current()->Dispose(stream); | |
143 } | |
144 | |
145 ////////////////////////////////////////////////////////////////////// | |
146 // ReuseSocketPool | |
147 ////////////////////////////////////////////////////////////////////// | |
148 | |
149 ReuseSocketPool::ReuseSocketPool(SocketFactory* factory) | |
150 : factory_(factory), stream_(NULL), checked_out_(false) { | |
151 } | |
152 | |
153 ReuseSocketPool::~ReuseSocketPool() { | |
154 RTC_DCHECK(!checked_out_); | |
155 delete stream_; | |
156 } | |
157 | |
158 StreamInterface* | |
159 ReuseSocketPool::RequestConnectedStream(const SocketAddress& remote, int* err) { | |
160 // Only one socket can be used from this "pool" at a time | |
161 RTC_DCHECK(!checked_out_); | |
162 if (!stream_) { | |
163 LOG_F(LS_VERBOSE) << "Creating new socket"; | |
164 int family = remote.family(); | |
165 // TODO: Deal with this when we/I clean up DNS resolution. | |
166 if (remote.IsUnresolvedIP()) { | |
167 family = AF_INET; | |
168 } | |
169 AsyncSocket* socket = | |
170 factory_->CreateAsyncSocket(family, SOCK_STREAM); | |
171 if (!socket) { | |
172 if (err) | |
173 *err = -1; | |
174 return NULL; | |
175 } | |
176 stream_ = new SocketStream(socket); | |
177 } | |
178 if ((stream_->GetState() == SS_OPEN) && (remote == remote_)) { | |
179 LOG_F(LS_VERBOSE) << "Reusing connection to: " << remote_; | |
180 } else { | |
181 remote_ = remote; | |
182 stream_->Close(); | |
183 if ((stream_->GetSocket()->Connect(remote_) != 0) | |
184 && !stream_->GetSocket()->IsBlocking()) { | |
185 if (err) | |
186 *err = stream_->GetSocket()->GetError(); | |
187 return NULL; | |
188 } else { | |
189 LOG_F(LS_VERBOSE) << "Opening connection to: " << remote_; | |
190 } | |
191 } | |
192 stream_->SignalEvent.disconnect(this); | |
193 checked_out_ = true; | |
194 if (err) | |
195 *err = 0; | |
196 return stream_; | |
197 } | |
198 | |
199 void | |
200 ReuseSocketPool::ReturnConnectedStream(StreamInterface* stream) { | |
201 RTC_DCHECK(stream == stream_); | |
202 RTC_DCHECK(checked_out_); | |
203 checked_out_ = false; | |
204 // Until the socket is reused, monitor it to determine if it closes. | |
205 stream_->SignalEvent.connect(this, &ReuseSocketPool::OnStreamEvent); | |
206 } | |
207 | |
208 void | |
209 ReuseSocketPool::OnStreamEvent(StreamInterface* stream, int events, int err) { | |
210 RTC_DCHECK(stream == stream_); | |
211 RTC_DCHECK(!checked_out_); | |
212 | |
213 // If the stream was written to and then immediately returned to us then | |
214 // we may get a writable notification for it, which we should ignore. | |
215 if (events == SE_WRITE) { | |
216 LOG_F(LS_VERBOSE) << "Pooled Socket unexpectedly writable: ignoring"; | |
217 return; | |
218 } | |
219 | |
220 // If the peer sent data, we can't process it, so drop the connection. | |
221 // If the socket has closed, clean it up. | |
222 // In either case, we'll reconnect it the next time it is used. | |
223 RTC_DCHECK(0 != (events & (SE_READ | SE_CLOSE))); | |
224 if (0 != (events & SE_CLOSE)) { | |
225 LOG_F(LS_VERBOSE) << "Connection closed with error: " << err; | |
226 } else { | |
227 LOG_F(LS_VERBOSE) << "Pooled Socket unexpectedly readable: closing"; | |
228 } | |
229 stream_->Close(); | |
230 } | |
231 | |
232 /////////////////////////////////////////////////////////////////////////////// | |
233 // LoggingPoolAdapter - Adapts a StreamPool to supply streams with attached | |
234 // LoggingAdapters. | |
235 /////////////////////////////////////////////////////////////////////////////// | |
236 | |
237 LoggingPoolAdapter::LoggingPoolAdapter( | |
238 StreamPool* pool, LoggingSeverity level, const std::string& label, | |
239 bool binary_mode) | |
240 : pool_(pool), level_(level), label_(label), binary_mode_(binary_mode) { | |
241 } | |
242 | |
243 LoggingPoolAdapter::~LoggingPoolAdapter() { | |
244 for (StreamList::iterator it = recycle_bin_.begin(); | |
245 it != recycle_bin_.end(); ++it) { | |
246 delete *it; | |
247 } | |
248 } | |
249 | |
250 StreamInterface* LoggingPoolAdapter::RequestConnectedStream( | |
251 const SocketAddress& remote, int* err) { | |
252 if (StreamInterface* stream = pool_->RequestConnectedStream(remote, err)) { | |
253 RTC_DCHECK(SS_CLOSED != stream->GetState()); | |
254 std::stringstream ss; | |
255 ss << label_ << "(0x" << std::setfill('0') << std::hex << std::setw(8) | |
256 << stream << ")"; | |
257 LOG_V(level_) << ss.str() | |
258 << ((SS_OPEN == stream->GetState()) ? " Connected" | |
259 : " Connecting") | |
260 << " to " << remote; | |
261 if (recycle_bin_.empty()) { | |
262 return new LoggingAdapter(stream, level_, ss.str(), binary_mode_); | |
263 } | |
264 LoggingAdapter* logging = recycle_bin_.front(); | |
265 recycle_bin_.pop_front(); | |
266 logging->set_label(ss.str()); | |
267 logging->Attach(stream); | |
268 return logging; | |
269 } | |
270 return NULL; | |
271 } | |
272 | |
273 void LoggingPoolAdapter::ReturnConnectedStream(StreamInterface* stream) { | |
274 LoggingAdapter* logging = static_cast<LoggingAdapter*>(stream); | |
275 pool_->ReturnConnectedStream(logging->Detach()); | |
276 recycle_bin_.push_back(logging); | |
277 } | |
278 | |
279 /////////////////////////////////////////////////////////////////////////////// | |
280 | |
281 } // namespace rtc | |
OLD | NEW |