Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(3)

Side by Side Diff: webrtc/p2p/base/session.cc

Issue 1350523003: TransportController refactoring. (Closed) Base URL: https://chromium.googlesource.com/external/webrtc.git@master
Patch Set: Fixing Mac test. Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « webrtc/p2p/base/session.h ('k') | webrtc/p2p/base/session_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 /* 1 /*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 * 3 *
4 * Use of this source code is governed by a BSD-style license 4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source 5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found 6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may 7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree. 8 * be found in the AUTHORS file in the root of the source tree.
9 */ 9 */
10 10
11 #include "webrtc/p2p/base/session.h" 11 #include "webrtc/p2p/base/session.h"
12 12
13 #include "webrtc/p2p/base/dtlstransport.h"
14 #include "webrtc/p2p/base/p2ptransport.h"
15 #include "webrtc/p2p/base/transport.h"
16 #include "webrtc/p2p/base/transportchannelproxy.h"
17 #include "webrtc/p2p/base/transportinfo.h"
18 #include "webrtc/base/bind.h" 13 #include "webrtc/base/bind.h"
19 #include "webrtc/base/common.h" 14 #include "webrtc/base/common.h"
20 #include "webrtc/base/helpers.h" 15 #include "webrtc/base/helpers.h"
21 #include "webrtc/base/logging.h" 16 #include "webrtc/base/logging.h"
22 #include "webrtc/base/scoped_ptr.h" 17 #include "webrtc/base/scoped_ptr.h"
23 #include "webrtc/base/stringencode.h" 18 #include "webrtc/base/stringencode.h"
24 #include "webrtc/base/sslstreamadapter.h" 19 #include "webrtc/base/sslstreamadapter.h"
25 20 #include "webrtc/p2p/base/transport.h"
21 #include "webrtc/p2p/base/transportinfo.h"
22 #include "webrtc/p2p/base/transportcontroller.h"
26 #include "webrtc/p2p/base/constants.h" 23 #include "webrtc/p2p/base/constants.h"
27 24
28 namespace cricket { 25 namespace cricket {
29 26
30 using rtc::Bind; 27 using rtc::Bind;
31 28
32 TransportProxy::~TransportProxy() {
33 for (ChannelMap::iterator iter = channels_.begin();
34 iter != channels_.end(); ++iter) {
35 iter->second->SignalDestroyed(iter->second);
36 delete iter->second;
37 }
38 }
39
40 TransportChannel* TransportProxy::GetChannel(int component) {
41 ASSERT(rtc::Thread::Current() == worker_thread_);
42 return GetChannelProxy(component);
43 }
44
45 TransportChannel* TransportProxy::CreateChannel(int component) {
46 ASSERT(rtc::Thread::Current() == worker_thread_);
47 ASSERT(GetChannel(component) == NULL);
48 ASSERT(!transport_->get()->HasChannel(component));
49
50 // We always create a proxy in case we need to change out the transport later.
51 TransportChannelProxy* channel_proxy =
52 new TransportChannelProxy(content_name(), component);
53 channels_[component] = channel_proxy;
54
55 // If we're already negotiated, create an impl and hook it up to the proxy
56 // channel. If we're connecting, create an impl but don't hook it up yet.
57 if (negotiated_) {
58 CreateChannelImpl_w(component);
59 SetChannelImplFromTransport_w(channel_proxy, component);
60 } else if (connecting_) {
61 CreateChannelImpl_w(component);
62 }
63 return channel_proxy;
64 }
65
66 bool TransportProxy::HasChannel(int component) {
67 return transport_->get()->HasChannel(component);
68 }
69
70 void TransportProxy::DestroyChannel(int component) {
71 ASSERT(rtc::Thread::Current() == worker_thread_);
72 TransportChannelProxy* channel_proxy = GetChannelProxy(component);
73 if (channel_proxy) {
74 // If the state of TransportProxy is not NEGOTIATED then
75 // TransportChannelProxy and its impl are not connected. Both must
76 // be connected before deletion.
77 //
78 // However, if we haven't entered the connecting state then there
79 // is no implementation to hook up.
80 if (connecting_ && !negotiated_) {
81 SetChannelImplFromTransport_w(channel_proxy, component);
82 }
83
84 channels_.erase(component);
85 channel_proxy->SignalDestroyed(channel_proxy);
86 delete channel_proxy;
87 }
88 }
89
90 void TransportProxy::ConnectChannels() {
91 if (!connecting_) {
92 if (!negotiated_) {
93 for (auto& iter : channels_) {
94 CreateChannelImpl(iter.first);
95 }
96 }
97 connecting_ = true;
98 }
99 // TODO(juberti): Right now Transport::ConnectChannels doesn't work if we
100 // don't have any channels yet, so we need to allow this method to be called
101 // multiple times. Once we fix Transport, we can move this call inside the
102 // if (!connecting_) block.
103 transport_->get()->ConnectChannels();
104 }
105
106 void TransportProxy::CompleteNegotiation() {
107 if (!negotiated_) {
108 // Negotiating assumes connecting_ has happened and
109 // implementations exist. If not we need to create the
110 // implementations.
111 for (auto& iter : channels_) {
112 if (!connecting_) {
113 CreateChannelImpl(iter.first);
114 }
115 SetChannelImplFromTransport(iter.second, iter.first);
116 }
117 negotiated_ = true;
118 }
119 }
120
121 void TransportProxy::AddSentCandidates(const Candidates& candidates) {
122 for (Candidates::const_iterator cand = candidates.begin();
123 cand != candidates.end(); ++cand) {
124 sent_candidates_.push_back(*cand);
125 }
126 }
127
128 void TransportProxy::AddUnsentCandidates(const Candidates& candidates) {
129 for (Candidates::const_iterator cand = candidates.begin();
130 cand != candidates.end(); ++cand) {
131 unsent_candidates_.push_back(*cand);
132 }
133 }
134
135 TransportChannelProxy* TransportProxy::GetChannelProxy(int component) const {
136 ChannelMap::const_iterator iter = channels_.find(component);
137 return (iter != channels_.end()) ? iter->second : NULL;
138 }
139
140 void TransportProxy::CreateChannelImpl(int component) {
141 worker_thread_->Invoke<void>(Bind(
142 &TransportProxy::CreateChannelImpl_w, this, component));
143 }
144
145 void TransportProxy::CreateChannelImpl_w(int component) {
146 ASSERT(rtc::Thread::Current() == worker_thread_);
147 transport_->get()->CreateChannel(component);
148 }
149
150 void TransportProxy::SetChannelImplFromTransport(TransportChannelProxy* proxy,
151 int component) {
152 worker_thread_->Invoke<void>(Bind(
153 &TransportProxy::SetChannelImplFromTransport_w, this, proxy, component));
154 }
155
156 void TransportProxy::SetChannelImplFromTransport_w(TransportChannelProxy* proxy,
157 int component) {
158 ASSERT(rtc::Thread::Current() == worker_thread_);
159 TransportChannelImpl* impl = transport_->get()->GetChannel(component);
160 ASSERT(impl != NULL);
161 ReplaceChannelImpl_w(proxy, impl);
162 }
163
164 void TransportProxy::ReplaceChannelImpl(TransportChannelProxy* proxy,
165 TransportChannelImpl* impl) {
166 worker_thread_->Invoke<void>(Bind(
167 &TransportProxy::ReplaceChannelImpl_w, this, proxy, impl));
168 }
169
170 void TransportProxy::ReplaceChannelImpl_w(TransportChannelProxy* proxy,
171 TransportChannelImpl* impl) {
172 ASSERT(rtc::Thread::Current() == worker_thread_);
173 ASSERT(proxy != NULL);
174 proxy->SetImplementation(impl);
175 }
176
177 // This function muxes |this| onto |target| by repointing |this| at
178 // |target|'s transport and setting our TransportChannelProxies
179 // to point to |target|'s underlying implementations.
180 bool TransportProxy::SetupMux(TransportProxy* target) {
181 // Bail out if there's nothing to do.
182 if (transport_ == target->transport_) {
183 return true;
184 }
185
186 // Run through all channels and remove any non-rtp transport channels before
187 // setting target transport channels.
188 for (ChannelMap::const_iterator iter = channels_.begin();
189 iter != channels_.end(); ++iter) {
190 if (!target->transport_->get()->HasChannel(iter->first)) {
191 // Remove if channel doesn't exist in |transport_|.
192 ReplaceChannelImpl(iter->second, NULL);
193 } else {
194 // Replace the impl for all the TransportProxyChannels with the channels
195 // from |target|'s transport. Fail if there's not an exact match.
196 ReplaceChannelImpl(
197 iter->second, target->transport_->get()->CreateChannel(iter->first));
198 }
199 }
200
201 // Now replace our transport. Must happen afterwards because
202 // it deletes all impls as a side effect.
203 transport_ = target->transport_;
204 transport_->get()->SignalCandidatesReady.connect(
205 this, &TransportProxy::OnTransportCandidatesReady);
206 set_candidates_allocated(target->candidates_allocated());
207 return true;
208 }
209
210 void TransportProxy::SetIceRole(IceRole role) {
211 transport_->get()->SetIceRole(role);
212 }
213
214 bool TransportProxy::SetLocalTransportDescription(
215 const TransportDescription& description,
216 ContentAction action,
217 std::string* error_desc) {
218 // If this is an answer, finalize the negotiation.
219 if (action == CA_ANSWER) {
220 CompleteNegotiation();
221 }
222 bool result = transport_->get()->SetLocalTransportDescription(description,
223 action,
224 error_desc);
225 if (result)
226 local_description_set_ = true;
227 return result;
228 }
229
230 bool TransportProxy::SetRemoteTransportDescription(
231 const TransportDescription& description,
232 ContentAction action,
233 std::string* error_desc) {
234 // If this is an answer, finalize the negotiation.
235 if (action == CA_ANSWER) {
236 CompleteNegotiation();
237 }
238 bool result = transport_->get()->SetRemoteTransportDescription(description,
239 action,
240 error_desc);
241 if (result)
242 remote_description_set_ = true;
243 return result;
244 }
245
246 void TransportProxy::OnSignalingReady() {
247 // If we're starting a new allocation sequence, reset our state.
248 set_candidates_allocated(false);
249 transport_->get()->OnSignalingReady();
250 }
251
252 bool TransportProxy::OnRemoteCandidates(const Candidates& candidates,
253 std::string* error) {
254 // Ensure the transport is negotiated before handling candidates.
255 // TODO(juberti): Remove this once everybody calls SetLocalTD.
256 CompleteNegotiation();
257
258 // Ignore candidates for if the proxy content_name doesn't match the content
259 // name of the actual transport. This stops video candidates from being sent
260 // down to the audio transport when BUNDLE is enabled.
261 if (content_name_ != transport_->get()->content_name()) {
262 return true;
263 }
264
265 // Verify each candidate before passing down to transport layer.
266 for (Candidates::const_iterator cand = candidates.begin();
267 cand != candidates.end(); ++cand) {
268 if (!transport_->get()->VerifyCandidate(*cand, error))
269 return false;
270 if (!HasChannel(cand->component())) {
271 *error = "Candidate has unknown component: " + cand->ToString() +
272 " for content: " + content_name_;
273 return false;
274 }
275 }
276 transport_->get()->OnRemoteCandidates(candidates);
277 return true;
278 }
279
280 void TransportProxy::SetCertificate(
281 const rtc::scoped_refptr<rtc::RTCCertificate>& certificate) {
282 transport_->get()->SetCertificate(certificate);
283 }
284
285 std::string BaseSession::StateToString(State state) { 29 std::string BaseSession::StateToString(State state) {
286 switch (state) { 30 switch (state) {
287 case STATE_INIT: 31 case STATE_INIT:
288 return "STATE_INIT"; 32 return "STATE_INIT";
289 case STATE_SENTINITIATE: 33 case STATE_SENTINITIATE:
290 return "STATE_SENTINITIATE"; 34 return "STATE_SENTINITIATE";
291 case STATE_RECEIVEDINITIATE: 35 case STATE_RECEIVEDINITIATE:
292 return "STATE_RECEIVEDINITIATE"; 36 return "STATE_RECEIVEDINITIATE";
293 case STATE_SENTPRACCEPT: 37 case STATE_SENTPRACCEPT:
294 return "STATE_SENTPRACCEPT"; 38 return "STATE_SENTPRACCEPT";
(...skipping 24 matching lines...) Expand all
319 default: 63 default:
320 break; 64 break;
321 } 65 }
322 return "STATE_" + rtc::ToString(state); 66 return "STATE_" + rtc::ToString(state);
323 } 67 }
324 68
325 BaseSession::BaseSession(rtc::Thread* signaling_thread, 69 BaseSession::BaseSession(rtc::Thread* signaling_thread,
326 rtc::Thread* worker_thread, 70 rtc::Thread* worker_thread,
327 PortAllocator* port_allocator, 71 PortAllocator* port_allocator,
328 const std::string& sid, 72 const std::string& sid,
329 const std::string& content_type,
330 bool initiator) 73 bool initiator)
331 : state_(STATE_INIT), 74 : state_(STATE_INIT),
332 error_(ERROR_NONE), 75 error_(ERROR_NONE),
333 signaling_thread_(signaling_thread), 76 signaling_thread_(signaling_thread),
334 worker_thread_(worker_thread), 77 worker_thread_(worker_thread),
335 port_allocator_(port_allocator), 78 port_allocator_(port_allocator),
336 sid_(sid), 79 sid_(sid),
337 content_type_(content_type), 80 transport_controller_(new TransportController(signaling_thread,
338 initiator_(initiator), 81 worker_thread,
339 ssl_max_version_(rtc::SSL_PROTOCOL_DTLS_10), 82 port_allocator)) {
340 ice_tiebreaker_(rtc::CreateRandomId64()),
341 role_switch_(false),
342 ice_receiving_timeout_(-1) {
343 ASSERT(signaling_thread->IsCurrent()); 83 ASSERT(signaling_thread->IsCurrent());
84 set_initiator(initiator);
344 } 85 }
345 86
346 BaseSession::~BaseSession() { 87 BaseSession::~BaseSession() {
347 ASSERT(signaling_thread()->IsCurrent()); 88 ASSERT(signaling_thread()->IsCurrent());
348 89
349 ASSERT(state_ != STATE_DEINIT); 90 ASSERT(state_ != STATE_DEINIT);
350 LogState(state_, STATE_DEINIT); 91 LogState(state_, STATE_DEINIT);
351 state_ = STATE_DEINIT; 92 state_ = STATE_DEINIT;
352 SignalState(this, state_); 93 SignalState(this, state_);
353
354 for (TransportMap::iterator iter = transports_.begin();
355 iter != transports_.end(); ++iter) {
356 delete iter->second;
357 }
358 } 94 }
359 95
360 const SessionDescription* BaseSession::local_description() const { 96 const SessionDescription* BaseSession::local_description() const {
361 // TODO(tommi): Assert on thread correctness. 97 // TODO(tommi): Assert on thread correctness.
362 return local_description_.get(); 98 return local_description_.get();
363 } 99 }
364 100
365 const SessionDescription* BaseSession::remote_description() const { 101 const SessionDescription* BaseSession::remote_description() const {
366 // TODO(tommi): Assert on thread correctness. 102 // TODO(tommi): Assert on thread correctness.
367 return remote_description_.get(); 103 return remote_description_.get();
368 } 104 }
369 105
370 SessionDescription* BaseSession::remote_description() { 106 SessionDescription* BaseSession::remote_description() {
371 // TODO(tommi): Assert on thread correctness. 107 // TODO(tommi): Assert on thread correctness.
372 return remote_description_.get(); 108 return remote_description_.get();
373 } 109 }
374 110
375 void BaseSession::set_local_description(const SessionDescription* sdesc) { 111 void BaseSession::set_local_description(const SessionDescription* sdesc) {
376 // TODO(tommi): Assert on thread correctness. 112 // TODO(tommi): Assert on thread correctness.
377 if (sdesc != local_description_.get()) 113 if (sdesc != local_description_.get())
378 local_description_.reset(sdesc); 114 local_description_.reset(sdesc);
379 } 115 }
380 116
381 void BaseSession::set_remote_description(SessionDescription* sdesc) { 117 void BaseSession::set_remote_description(SessionDescription* sdesc) {
382 // TODO(tommi): Assert on thread correctness. 118 // TODO(tommi): Assert on thread correctness.
383 if (sdesc != remote_description_) 119 if (sdesc != remote_description_)
384 remote_description_.reset(sdesc); 120 remote_description_.reset(sdesc);
385 } 121 }
386 122
123 void BaseSession::set_initiator(bool initiator) {
124 initiator_ = initiator;
125
126 IceRole ice_role = initiator ? ICEROLE_CONTROLLING : ICEROLE_CONTROLLED;
127 transport_controller_->SetIceRole(ice_role);
128 }
129
387 const SessionDescription* BaseSession::initiator_description() const { 130 const SessionDescription* BaseSession::initiator_description() const {
388 // TODO(tommi): Assert on thread correctness. 131 // TODO(tommi): Assert on thread correctness.
389 return initiator_ ? local_description_.get() : remote_description_.get(); 132 return initiator_ ? local_description_.get() : remote_description_.get();
390 } 133 }
391 134
392 bool BaseSession::SetCertificate(
393 const rtc::scoped_refptr<rtc::RTCCertificate>& certificate) {
394 if (certificate_)
395 return false;
396 if (!certificate)
397 return false;
398 certificate_ = certificate;
399 for (TransportMap::iterator iter = transports_.begin();
400 iter != transports_.end(); ++iter) {
401 iter->second->SetCertificate(certificate_);
402 }
403 return true;
404 }
405
406 bool BaseSession::SetSslMaxProtocolVersion(rtc::SSLProtocolVersion version) {
407 if (state_ != STATE_INIT) {
408 return false;
409 }
410
411 ssl_max_version_ = version;
412 return true;
413 }
414
415 bool BaseSession::PushdownTransportDescription(ContentSource source, 135 bool BaseSession::PushdownTransportDescription(ContentSource source,
416 ContentAction action, 136 ContentAction action,
417 std::string* error_desc) { 137 std::string* error_desc) {
138 ASSERT(signaling_thread()->IsCurrent());
139
418 if (source == CS_LOCAL) { 140 if (source == CS_LOCAL) {
419 return PushdownLocalTransportDescription(local_description(), 141 return PushdownLocalTransportDescription(local_description(),
420 action, 142 action,
421 error_desc); 143 error_desc);
422 } 144 }
423 return PushdownRemoteTransportDescription(remote_description(), 145 return PushdownRemoteTransportDescription(remote_description(),
424 action, 146 action,
425 error_desc); 147 error_desc);
426 } 148 }
427 149
428 bool BaseSession::PushdownLocalTransportDescription( 150 bool BaseSession::PushdownLocalTransportDescription(
429 const SessionDescription* sdesc, 151 const SessionDescription* sdesc,
430 ContentAction action, 152 ContentAction action,
431 std::string* error_desc) { 153 std::string* err) {
432 // Update the Transports with the right information, and trigger them to 154 ASSERT(signaling_thread()->IsCurrent());
433 // start connecting.
434 for (TransportMap::iterator iter = transports_.begin();
435 iter != transports_.end(); ++iter) {
436 // If no transport info was in this session description, ret == false
437 // and we just skip this one.
438 TransportDescription tdesc;
439 bool ret = GetTransportDescription(
440 sdesc, iter->second->content_name(), &tdesc);
441 if (ret) {
442 if (!iter->second->SetLocalTransportDescription(tdesc, action,
443 error_desc)) {
444 return false;
445 }
446 155
447 iter->second->ConnectChannels(); 156 if (!sdesc) {
157 return false;
158 }
159
160 for (const TransportInfo& tinfo : sdesc->transport_infos()) {
161 if (!transport_controller_->SetLocalTransportDescription(
162 tinfo.content_name, tinfo.description, action, err)) {
163 return false;
448 } 164 }
449 } 165 }
450 166
451 return true; 167 return true;
452 } 168 }
453 169
454 bool BaseSession::PushdownRemoteTransportDescription( 170 bool BaseSession::PushdownRemoteTransportDescription(
455 const SessionDescription* sdesc, 171 const SessionDescription* sdesc,
456 ContentAction action, 172 ContentAction action,
457 std::string* error_desc) { 173 std::string* err) {
458 // Update the Transports with the right information. 174 ASSERT(signaling_thread()->IsCurrent());
459 for (TransportMap::iterator iter = transports_.begin();
460 iter != transports_.end(); ++iter) {
461 TransportDescription tdesc;
462 175
463 // If no transport info was in this session description, ret == false 176 if (!sdesc) {
464 // and we just skip this one. 177 return false;
465 bool ret = GetTransportDescription( 178 }
466 sdesc, iter->second->content_name(), &tdesc); 179
467 if (ret) { 180 for (const TransportInfo& tinfo : sdesc->transport_infos()) {
468 if (!iter->second->SetRemoteTransportDescription(tdesc, action, 181 if (!transport_controller_->SetRemoteTransportDescription(
469 error_desc)) { 182 tinfo.content_name, tinfo.description, action, err)) {
470 return false; 183 return false;
471 }
472 } 184 }
473 } 185 }
474 186
475 return true; 187 return true;
476 } 188 }
477 189
478 void BaseSession::SetIceConnectionReceivingTimeout(int timeout_ms) {
479 ice_receiving_timeout_ = timeout_ms;
480 for (const auto& kv : transport_proxies()) {
481 Transport* transport = kv.second->impl();
482 if (transport) {
483 transport->SetChannelReceivingTimeout(timeout_ms);
484 }
485 }
486 }
487
488 TransportChannel* BaseSession::CreateChannel(const std::string& content_name,
489 int component) {
490 // We create the proxy "on demand" here because we need to support
491 // creating channels at any time, even before we send or receive
492 // initiate messages, which is before we create the transports.
493 TransportProxy* transproxy = GetOrCreateTransportProxy(content_name);
494 return transproxy->CreateChannel(component);
495 }
496
497 TransportChannel* BaseSession::GetChannel(const std::string& content_name,
498 int component) {
499 TransportProxy* transproxy = GetTransportProxy(content_name);
500 if (transproxy == NULL)
501 return NULL;
502
503 return transproxy->GetChannel(component);
504 }
505
506 void BaseSession::DestroyChannel(const std::string& content_name,
507 int component) {
508 TransportProxy* transproxy = GetTransportProxy(content_name);
509 ASSERT(transproxy != NULL);
510 transproxy->DestroyChannel(component);
511 }
512
513 TransportProxy* BaseSession::GetOrCreateTransportProxy(
514 const std::string& content_name) {
515 TransportProxy* transproxy = GetTransportProxy(content_name);
516 if (transproxy)
517 return transproxy;
518
519 Transport* transport = CreateTransport(content_name);
520 transport->SetIceRole(initiator_ ? ICEROLE_CONTROLLING : ICEROLE_CONTROLLED);
521 transport->SetIceTiebreaker(ice_tiebreaker_);
522 transport->SetSslMaxProtocolVersion(ssl_max_version_);
523 // TODO: Connect all the Transport signals to TransportProxy
524 // then to the BaseSession.
525 transport->SignalConnecting.connect(
526 this, &BaseSession::OnTransportConnecting);
527 transport->SignalWritableState.connect(
528 this, &BaseSession::OnTransportWritable);
529 transport->SignalReceivingState.connect(
530 this, &BaseSession::OnTransportReceiving);
531 transport->SignalRequestSignaling.connect(
532 this, &BaseSession::OnTransportRequestSignaling);
533 transport->SignalRouteChange.connect(
534 this, &BaseSession::OnTransportRouteChange);
535 transport->SignalCandidatesAllocationDone.connect(
536 this, &BaseSession::OnTransportCandidatesAllocationDone);
537 transport->SignalRoleConflict.connect(
538 this, &BaseSession::OnRoleConflict);
539 transport->SignalCompleted.connect(
540 this, &BaseSession::OnTransportCompleted);
541 transport->SignalFailed.connect(
542 this, &BaseSession::OnTransportFailed);
543
544 transproxy = new TransportProxy(worker_thread_, sid_, content_name,
545 new TransportWrapper(transport));
546 transproxy->SignalCandidatesReady.connect(
547 this, &BaseSession::OnTransportProxyCandidatesReady);
548 if (certificate_)
549 transproxy->SetCertificate(certificate_);
550 transports_[content_name] = transproxy;
551
552 return transproxy;
553 }
554
555 Transport* BaseSession::GetTransport(const std::string& content_name) {
556 TransportProxy* transproxy = GetTransportProxy(content_name);
557 if (transproxy == NULL)
558 return NULL;
559 return transproxy->impl();
560 }
561
562 TransportProxy* BaseSession::GetTransportProxy(
563 const std::string& content_name) {
564 TransportMap::iterator iter = transports_.find(content_name);
565 return (iter != transports_.end()) ? iter->second : NULL;
566 }
567
568 void BaseSession::DestroyTransportProxy(
569 const std::string& content_name) {
570 TransportMap::iterator iter = transports_.find(content_name);
571 if (iter != transports_.end()) {
572 delete iter->second;
573 transports_.erase(content_name);
574 }
575 }
576
577 Transport* BaseSession::CreateTransport(const std::string& content_name) {
578 Transport* transport = new DtlsTransport<P2PTransport>(
579 signaling_thread(), worker_thread(), content_name, port_allocator(),
580 certificate_);
581 transport->SetChannelReceivingTimeout(ice_receiving_timeout_);
582 return transport;
583 }
584
585 void BaseSession::SetState(State state) { 190 void BaseSession::SetState(State state) {
586 ASSERT(signaling_thread_->IsCurrent()); 191 ASSERT(signaling_thread_->IsCurrent());
587 if (state != state_) { 192 if (state != state_) {
588 LogState(state_, state); 193 LogState(state_, state);
589 state_ = state; 194 state_ = state;
590 SignalState(this, state_); 195 SignalState(this, state_);
591 signaling_thread_->Post(this, MSG_STATE); 196 signaling_thread_->Post(this, MSG_STATE);
592 } 197 }
593 } 198 }
594 199
595 void BaseSession::SetError(Error error, const std::string& error_desc) { 200 void BaseSession::SetError(Error error, const std::string& error_desc) {
596 ASSERT(signaling_thread_->IsCurrent()); 201 ASSERT(signaling_thread_->IsCurrent());
597 if (error != error_) { 202 if (error != error_) {
598 error_ = error; 203 error_ = error;
599 error_desc_ = error_desc; 204 error_desc_ = error_desc;
600 SignalError(this, error); 205 SignalError(this, error);
601 } 206 }
602 } 207 }
603 208
604 void BaseSession::OnSignalingReady() { 209 void BaseSession::SetIceConnectionReceivingTimeout(int timeout_ms) {
605 ASSERT(signaling_thread()->IsCurrent()); 210 transport_controller_->SetIceConnectionReceivingTimeout(timeout_ms);
606 for (TransportMap::iterator iter = transports_.begin();
607 iter != transports_.end(); ++iter) {
608 iter->second->OnSignalingReady();
609 }
610 } 211 }
611 212
612 // TODO(juberti): Since PushdownLocalTD now triggers the connection process to 213 void BaseSession::MaybeStartGathering() {
613 // start, remove this method once everyone calls PushdownLocalTD. 214 transport_controller_->MaybeStartGathering();
614 void BaseSession::SpeculativelyConnectAllTransportChannels() {
615 // Put all transports into the connecting state.
616 for (TransportMap::iterator iter = transports_.begin();
617 iter != transports_.end(); ++iter) {
618 iter->second->ConnectChannels();
619 }
620 }
621
622 bool BaseSession::OnRemoteCandidates(const std::string& content_name,
623 const Candidates& candidates,
624 std::string* error) {
625 // Give candidates to the appropriate transport, and tell that transport
626 // to start connecting, if it's not already doing so.
627 TransportProxy* transproxy = GetTransportProxy(content_name);
628 if (!transproxy) {
629 *error = "Unknown content name " + content_name;
630 return false;
631 }
632 if (!transproxy->OnRemoteCandidates(candidates, error)) {
633 return false;
634 }
635 // TODO(juberti): Remove this call once we can be sure that we always have
636 // a local transport description (which will trigger the connection).
637 transproxy->ConnectChannels();
638 return true;
639 }
640
641 bool BaseSession::MaybeEnableMuxingSupport() {
642 // We need both a local and remote description to decide if we should mux.
643 if ((state_ == STATE_SENTINITIATE ||
644 state_ == STATE_RECEIVEDINITIATE) &&
645 ((local_description_ == NULL) ||
646 (remote_description_ == NULL))) {
647 return false;
648 }
649
650 // In order to perform the multiplexing, we need all proxies to be in the
651 // negotiated state, i.e. to have implementations underneath.
652 // Ensure that this is the case, regardless of whether we are going to mux.
653 for (TransportMap::iterator iter = transports_.begin();
654 iter != transports_.end(); ++iter) {
655 ASSERT(iter->second->negotiated());
656 if (!iter->second->negotiated()) {
657 return false;
658 }
659 }
660
661 // If both sides agree to BUNDLE, mux all the specified contents onto the
662 // transport belonging to the first content name in the BUNDLE group.
663 // If the contents are already muxed, this will be a no-op.
664 // TODO(juberti): Should this check that local and remote have configured
665 // BUNDLE the same way?
666 bool candidates_allocated = IsCandidateAllocationDone();
667 const ContentGroup* local_bundle_group =
668 local_description_->GetGroupByName(GROUP_TYPE_BUNDLE);
669 const ContentGroup* remote_bundle_group =
670 remote_description_->GetGroupByName(GROUP_TYPE_BUNDLE);
671 if (local_bundle_group && remote_bundle_group) {
672 if (!BundleContentGroup(local_bundle_group)) {
673 LOG(LS_WARNING) << "Failed to set up BUNDLE";
674 return false;
675 }
676
677 // If we weren't done gathering before, we might be done now, as a result
678 // of enabling mux.
679 if (!candidates_allocated) {
680 MaybeCandidateAllocationDone();
681 }
682 } else {
683 LOG(LS_INFO) << "BUNDLE group missing from remote or local description.";
684 }
685 return true;
686 }
687
688 bool BaseSession::BundleContentGroup(const ContentGroup* bundle_group) {
689 const std::string* content_name = bundle_group->FirstContentName();
690 if (!content_name) {
691 LOG(LS_INFO) << "No content names specified in BUNDLE group.";
692 return true;
693 }
694
695 TransportProxy* selected_proxy = GetTransportProxy(*content_name);
696 if (!selected_proxy) {
697 LOG(LS_WARNING) << "No transport found for content \""
698 << *content_name << "\".";
699 return false;
700 }
701
702 for (TransportMap::iterator iter = transports_.begin();
703 iter != transports_.end(); ++iter) {
704 // If content is part of the mux group, then repoint its proxy at the
705 // transport object that we have chosen to mux onto. If the proxy
706 // is already pointing at the right object, it will be a no-op.
707 if (bundle_group->HasContentName(iter->first) &&
708 !iter->second->SetupMux(selected_proxy)) {
709 LOG(LS_WARNING) << "Failed to bundle " << iter->first << " to "
710 << *content_name;
711 return false;
712 }
713 LOG(LS_INFO) << "Bundling " << iter->first << " to " << *content_name;
714 }
715
716 return true;
717 }
718
719 void BaseSession::OnTransportCandidatesAllocationDone(Transport* transport) {
720 // TODO(juberti): This is a clunky way of processing the done signal. Instead,
721 // TransportProxy should receive the done signal directly, set its allocated
722 // flag internally, and then reissue the done signal to Session.
723 // Overall we should make TransportProxy receive *all* the signals from
724 // Transport, since this removes the need to manually iterate over all
725 // the transports, as is needed to make sure signals are handled properly
726 // when BUNDLEing.
727 // TODO(juberti): Per b/7998978, devs and QA are hitting this assert in ways
728 // that make it prohibitively difficult to run dbg builds. Disabled for now.
729 //ASSERT(!IsCandidateAllocationDone());
730 for (TransportMap::iterator iter = transports_.begin();
731 iter != transports_.end(); ++iter) {
732 if (iter->second->impl() == transport) {
733 iter->second->set_candidates_allocated(true);
734 }
735 }
736 MaybeCandidateAllocationDone();
737 }
738
739 bool BaseSession::IsCandidateAllocationDone() const {
740 for (TransportMap::const_iterator iter = transports_.begin();
741 iter != transports_.end(); ++iter) {
742 if (!iter->second->candidates_allocated()) {
743 LOG(LS_INFO) << "Candidate allocation not done for "
744 << iter->second->content_name();
745 return false;
746 }
747 }
748 return true;
749 }
750
751 void BaseSession::MaybeCandidateAllocationDone() {
752 if (IsCandidateAllocationDone()) {
753 LOG(LS_INFO) << "Candidate gathering is complete.";
754 OnCandidatesAllocationDone();
755 }
756 }
757
758 void BaseSession::OnRoleConflict() {
759 if (role_switch_) {
760 LOG(LS_WARNING) << "Repeat of role conflict signal from Transport.";
761 return;
762 }
763
764 role_switch_ = true;
765 for (TransportMap::iterator iter = transports_.begin();
766 iter != transports_.end(); ++iter) {
767 // Role will be reverse of initial role setting.
768 IceRole role = initiator_ ? ICEROLE_CONTROLLED : ICEROLE_CONTROLLING;
769 iter->second->SetIceRole(role);
770 }
771 } 215 }
772 216
773 void BaseSession::LogState(State old_state, State new_state) { 217 void BaseSession::LogState(State old_state, State new_state) {
774 LOG(LS_INFO) << "Session:" << id() 218 LOG(LS_INFO) << "Session:" << id()
775 << " Old state:" << StateToString(old_state) 219 << " Old state:" << StateToString(old_state)
776 << " New state:" << StateToString(new_state) 220 << " New state:" << StateToString(new_state);
777 << " Type:" << content_type();
778 } 221 }
779 222
780 // static 223 // static
781 bool BaseSession::GetTransportDescription(const SessionDescription* description, 224 bool BaseSession::GetTransportDescription(const SessionDescription* description,
782 const std::string& content_name, 225 const std::string& content_name,
783 TransportDescription* tdesc) { 226 TransportDescription* tdesc) {
784 if (!description || !tdesc) { 227 if (!description || !tdesc) {
785 return false; 228 return false;
786 } 229 }
787 const TransportInfo* transport_info = 230 const TransportInfo* transport_info =
(...skipping 21 matching lines...) Expand all
809 252
810 default: 253 default:
811 // Explicitly ignoring some states here. 254 // Explicitly ignoring some states here.
812 break; 255 break;
813 } 256 }
814 break; 257 break;
815 } 258 }
816 } 259 }
817 260
818 } // namespace cricket 261 } // namespace cricket
OLDNEW
« no previous file with comments | « webrtc/p2p/base/session.h ('k') | webrtc/p2p/base/session_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698