Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(4)

Side by Side Diff: webrtc/p2p/base/session.cc

Issue 1246913005: TransportController refactoring (Closed) Base URL: https://chromium.googlesource.com/external/webrtc.git@master
Patch Set: Set media engine on voice channel Created 5 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 /* 1 /*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 * 3 *
4 * Use of this source code is governed by a BSD-style license 4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source 5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found 6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may 7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree. 8 * be found in the AUTHORS file in the root of the source tree.
9 */ 9 */
10 10
11 #include "webrtc/p2p/base/session.h" 11 #include "webrtc/p2p/base/session.h"
12 12
13 #include "webrtc/p2p/base/dtlstransport.h"
14 #include "webrtc/p2p/base/p2ptransport.h"
15 #include "webrtc/p2p/base/transport.h"
16 #include "webrtc/p2p/base/transportchannelproxy.h"
17 #include "webrtc/p2p/base/transportinfo.h"
18 #include "webrtc/base/bind.h" 13 #include "webrtc/base/bind.h"
19 #include "webrtc/base/common.h" 14 #include "webrtc/base/common.h"
20 #include "webrtc/base/helpers.h" 15 #include "webrtc/base/helpers.h"
21 #include "webrtc/base/logging.h" 16 #include "webrtc/base/logging.h"
22 #include "webrtc/base/scoped_ptr.h" 17 #include "webrtc/base/scoped_ptr.h"
23 #include "webrtc/base/stringencode.h" 18 #include "webrtc/base/stringencode.h"
24 #include "webrtc/base/sslstreamadapter.h" 19 #include "webrtc/base/sslstreamadapter.h"
25 20 #include "webrtc/p2p/base/transport.h"
21 #include "webrtc/p2p/base/transportchannelproxy.h"
22 #include "webrtc/p2p/base/transportinfo.h"
23 #include "webrtc/p2p/base/transportcontroller.h"
26 #include "webrtc/p2p/base/constants.h" 24 #include "webrtc/p2p/base/constants.h"
27 25
28 namespace cricket { 26 namespace cricket {
29 27
30 using rtc::Bind; 28 using rtc::Bind;
31 29
32 TransportProxy::~TransportProxy() {
33 for (ChannelMap::iterator iter = channels_.begin();
34 iter != channels_.end(); ++iter) {
35 iter->second->SignalDestroyed(iter->second);
36 delete iter->second;
37 }
38 }
39
40 const std::string& TransportProxy::type() const {
41 return transport_->get()->type();
42 }
43
44 TransportChannel* TransportProxy::GetChannel(int component) {
45 ASSERT(rtc::Thread::Current() == worker_thread_);
46 return GetChannelProxy(component);
47 }
48
49 TransportChannel* TransportProxy::CreateChannel(int component) {
50 ASSERT(rtc::Thread::Current() == worker_thread_);
51 ASSERT(GetChannel(component) == NULL);
52 ASSERT(!transport_->get()->HasChannel(component));
53
54 // We always create a proxy in case we need to change out the transport later.
55 TransportChannelProxy* channel_proxy =
56 new TransportChannelProxy(content_name(), component);
57 channels_[component] = channel_proxy;
58
59 // If we're already negotiated, create an impl and hook it up to the proxy
60 // channel. If we're connecting, create an impl but don't hook it up yet.
61 if (negotiated_) {
62 CreateChannelImpl_w(component);
63 SetChannelImplFromTransport_w(channel_proxy, component);
64 } else if (connecting_) {
65 CreateChannelImpl_w(component);
66 }
67 return channel_proxy;
68 }
69
70 bool TransportProxy::HasChannel(int component) {
71 return transport_->get()->HasChannel(component);
72 }
73
74 void TransportProxy::DestroyChannel(int component) {
75 ASSERT(rtc::Thread::Current() == worker_thread_);
76 TransportChannelProxy* channel_proxy = GetChannelProxy(component);
77 if (channel_proxy) {
78 // If the state of TransportProxy is not NEGOTIATED then
79 // TransportChannelProxy and its impl are not connected. Both must
80 // be connected before deletion.
81 //
82 // However, if we haven't entered the connecting state then there
83 // is no implementation to hook up.
84 if (connecting_ && !negotiated_) {
85 SetChannelImplFromTransport_w(channel_proxy, component);
86 }
87
88 channels_.erase(component);
89 channel_proxy->SignalDestroyed(channel_proxy);
90 delete channel_proxy;
91 }
92 }
93
94 void TransportProxy::ConnectChannels() {
95 if (!connecting_) {
96 if (!negotiated_) {
97 for (auto& iter : channels_) {
98 CreateChannelImpl(iter.first);
99 }
100 }
101 connecting_ = true;
102 }
103 // TODO(juberti): Right now Transport::ConnectChannels doesn't work if we
104 // don't have any channels yet, so we need to allow this method to be called
105 // multiple times. Once we fix Transport, we can move this call inside the
106 // if (!connecting_) block.
107 transport_->get()->ConnectChannels();
108 }
109
110 void TransportProxy::CompleteNegotiation() {
111 if (!negotiated_) {
112 // Negotiating assumes connecting_ has happened and
113 // implementations exist. If not we need to create the
114 // implementations.
115 for (auto& iter : channels_) {
116 if (!connecting_) {
117 CreateChannelImpl(iter.first);
118 }
119 SetChannelImplFromTransport(iter.second, iter.first);
120 }
121 negotiated_ = true;
122 }
123 }
124
125 void TransportProxy::AddSentCandidates(const Candidates& candidates) {
126 for (Candidates::const_iterator cand = candidates.begin();
127 cand != candidates.end(); ++cand) {
128 sent_candidates_.push_back(*cand);
129 }
130 }
131
132 void TransportProxy::AddUnsentCandidates(const Candidates& candidates) {
133 for (Candidates::const_iterator cand = candidates.begin();
134 cand != candidates.end(); ++cand) {
135 unsent_candidates_.push_back(*cand);
136 }
137 }
138
139 TransportChannelProxy* TransportProxy::GetChannelProxy(int component) const {
140 ChannelMap::const_iterator iter = channels_.find(component);
141 return (iter != channels_.end()) ? iter->second : NULL;
142 }
143
144 void TransportProxy::CreateChannelImpl(int component) {
145 worker_thread_->Invoke<void>(Bind(
146 &TransportProxy::CreateChannelImpl_w, this, component));
147 }
148
149 void TransportProxy::CreateChannelImpl_w(int component) {
150 ASSERT(rtc::Thread::Current() == worker_thread_);
151 transport_->get()->CreateChannel(component);
152 }
153
154 void TransportProxy::SetChannelImplFromTransport(TransportChannelProxy* proxy,
155 int component) {
156 worker_thread_->Invoke<void>(Bind(
157 &TransportProxy::SetChannelImplFromTransport_w, this, proxy, component));
158 }
159
160 void TransportProxy::SetChannelImplFromTransport_w(TransportChannelProxy* proxy,
161 int component) {
162 ASSERT(rtc::Thread::Current() == worker_thread_);
163 TransportChannelImpl* impl = transport_->get()->GetChannel(component);
164 ASSERT(impl != NULL);
165 ReplaceChannelImpl_w(proxy, impl);
166 }
167
168 void TransportProxy::ReplaceChannelImpl(TransportChannelProxy* proxy,
169 TransportChannelImpl* impl) {
170 worker_thread_->Invoke<void>(Bind(
171 &TransportProxy::ReplaceChannelImpl_w, this, proxy, impl));
172 }
173
174 void TransportProxy::ReplaceChannelImpl_w(TransportChannelProxy* proxy,
175 TransportChannelImpl* impl) {
176 ASSERT(rtc::Thread::Current() == worker_thread_);
177 ASSERT(proxy != NULL);
178 proxy->SetImplementation(impl);
179 }
180
181 // This function muxes |this| onto |target| by repointing |this| at
182 // |target|'s transport and setting our TransportChannelProxies
183 // to point to |target|'s underlying implementations.
184 bool TransportProxy::SetupMux(TransportProxy* target) {
185 // Bail out if there's nothing to do.
186 if (transport_ == target->transport_) {
187 return true;
188 }
189
190 // Run through all channels and remove any non-rtp transport channels before
191 // setting target transport channels.
192 for (ChannelMap::const_iterator iter = channels_.begin();
193 iter != channels_.end(); ++iter) {
194 if (!target->transport_->get()->HasChannel(iter->first)) {
195 // Remove if channel doesn't exist in |transport_|.
196 ReplaceChannelImpl(iter->second, NULL);
197 } else {
198 // Replace the impl for all the TransportProxyChannels with the channels
199 // from |target|'s transport. Fail if there's not an exact match.
200 ReplaceChannelImpl(
201 iter->second, target->transport_->get()->CreateChannel(iter->first));
202 }
203 }
204
205 // Now replace our transport. Must happen afterwards because
206 // it deletes all impls as a side effect.
207 transport_ = target->transport_;
208 transport_->get()->SignalCandidatesReady.connect(
209 this, &TransportProxy::OnTransportCandidatesReady);
210 set_candidates_allocated(target->candidates_allocated());
211 return true;
212 }
213
214 void TransportProxy::SetIceRole(IceRole role) {
215 transport_->get()->SetIceRole(role);
216 }
217
218 bool TransportProxy::SetLocalTransportDescription(
219 const TransportDescription& description,
220 ContentAction action,
221 std::string* error_desc) {
222 // If this is an answer, finalize the negotiation.
223 if (action == CA_ANSWER) {
224 CompleteNegotiation();
225 }
226 bool result = transport_->get()->SetLocalTransportDescription(description,
227 action,
228 error_desc);
229 if (result)
230 local_description_set_ = true;
231 return result;
232 }
233
234 bool TransportProxy::SetRemoteTransportDescription(
235 const TransportDescription& description,
236 ContentAction action,
237 std::string* error_desc) {
238 // If this is an answer, finalize the negotiation.
239 if (action == CA_ANSWER) {
240 CompleteNegotiation();
241 }
242 bool result = transport_->get()->SetRemoteTransportDescription(description,
243 action,
244 error_desc);
245 if (result)
246 remote_description_set_ = true;
247 return result;
248 }
249
250 void TransportProxy::OnSignalingReady() {
251 // If we're starting a new allocation sequence, reset our state.
252 set_candidates_allocated(false);
253 transport_->get()->OnSignalingReady();
254 }
255
256 bool TransportProxy::OnRemoteCandidates(const Candidates& candidates,
257 std::string* error) {
258 // Ensure the transport is negotiated before handling candidates.
259 // TODO(juberti): Remove this once everybody calls SetLocalTD.
260 CompleteNegotiation();
261
262 // Ignore candidates for if the proxy content_name doesn't match the content
263 // name of the actual transport. This stops video candidates from being sent
264 // down to the audio transport when BUNDLE is enabled.
265 if (content_name_ != transport_->get()->content_name()) {
266 return true;
267 }
268
269 // Verify each candidate before passing down to transport layer.
270 for (Candidates::const_iterator cand = candidates.begin();
271 cand != candidates.end(); ++cand) {
272 if (!transport_->get()->VerifyCandidate(*cand, error))
273 return false;
274 if (!HasChannel(cand->component())) {
275 *error = "Candidate has unknown component: " + cand->ToString() +
276 " for content: " + content_name_;
277 return false;
278 }
279 }
280 transport_->get()->OnRemoteCandidates(candidates);
281 return true;
282 }
283
284 void TransportProxy::SetIdentity(
285 rtc::SSLIdentity* identity) {
286 transport_->get()->SetIdentity(identity);
287 }
288
289 std::string BaseSession::StateToString(State state) { 30 std::string BaseSession::StateToString(State state) {
290 switch (state) { 31 switch (state) {
291 case STATE_INIT: 32 case STATE_INIT:
292 return "STATE_INIT"; 33 return "STATE_INIT";
293 case STATE_SENTINITIATE: 34 case STATE_SENTINITIATE:
294 return "STATE_SENTINITIATE"; 35 return "STATE_SENTINITIATE";
295 case STATE_RECEIVEDINITIATE: 36 case STATE_RECEIVEDINITIATE:
296 return "STATE_RECEIVEDINITIATE"; 37 return "STATE_RECEIVEDINITIATE";
297 case STATE_SENTPRACCEPT: 38 case STATE_SENTPRACCEPT:
298 return "STATE_SENTPRACCEPT"; 39 return "STATE_SENTPRACCEPT";
(...skipping 24 matching lines...) Expand all
323 default: 64 default:
324 break; 65 break;
325 } 66 }
326 return "STATE_" + rtc::ToString(state); 67 return "STATE_" + rtc::ToString(state);
327 } 68 }
328 69
329 BaseSession::BaseSession(rtc::Thread* signaling_thread, 70 BaseSession::BaseSession(rtc::Thread* signaling_thread,
330 rtc::Thread* worker_thread, 71 rtc::Thread* worker_thread,
331 PortAllocator* port_allocator, 72 PortAllocator* port_allocator,
332 const std::string& sid, 73 const std::string& sid,
333 const std::string& content_type,
334 bool initiator) 74 bool initiator)
335 : state_(STATE_INIT), 75 : state_(STATE_INIT),
336 error_(ERROR_NONE), 76 error_(ERROR_NONE),
337 signaling_thread_(signaling_thread), 77 signaling_thread_(signaling_thread),
338 worker_thread_(worker_thread), 78 worker_thread_(worker_thread),
339 port_allocator_(port_allocator), 79 port_allocator_(port_allocator),
340 sid_(sid), 80 sid_(sid),
341 content_type_(content_type), 81 transport_controller_(new TransportController(signaling_thread,
342 transport_type_(NS_GINGLE_P2P), 82 worker_thread,
343 initiator_(initiator), 83 port_allocator)) {
344 identity_(NULL),
345 ssl_max_version_(rtc::SSL_PROTOCOL_DTLS_10),
346 ice_tiebreaker_(rtc::CreateRandomId64()),
347 role_switch_(false),
348 ice_receiving_timeout_(-1) {
349 ASSERT(signaling_thread->IsCurrent()); 84 ASSERT(signaling_thread->IsCurrent());
85 set_initiator(initiator);
350 } 86 }
351 87
352 BaseSession::~BaseSession() { 88 BaseSession::~BaseSession() {
353 ASSERT(signaling_thread()->IsCurrent()); 89 ASSERT(signaling_thread()->IsCurrent());
354 90
355 ASSERT(state_ != STATE_DEINIT); 91 ASSERT(state_ != STATE_DEINIT);
356 LogState(state_, STATE_DEINIT); 92 LogState(state_, STATE_DEINIT);
357 state_ = STATE_DEINIT; 93 state_ = STATE_DEINIT;
358 SignalState(this, state_); 94 SignalState(this, state_);
359
360 for (TransportMap::iterator iter = transports_.begin();
361 iter != transports_.end(); ++iter) {
362 delete iter->second;
363 }
364 } 95 }
365 96
366 const SessionDescription* BaseSession::local_description() const { 97 const SessionDescription* BaseSession::local_description() const {
367 // TODO(tommi): Assert on thread correctness. 98 // TODO(tommi): Assert on thread correctness.
368 return local_description_.get(); 99 return local_description_.get();
369 } 100 }
370 101
371 const SessionDescription* BaseSession::remote_description() const { 102 const SessionDescription* BaseSession::remote_description() const {
372 // TODO(tommi): Assert on thread correctness. 103 // TODO(tommi): Assert on thread correctness.
373 return remote_description_.get(); 104 return remote_description_.get();
374 } 105 }
375 106
376 SessionDescription* BaseSession::remote_description() { 107 SessionDescription* BaseSession::remote_description() {
377 // TODO(tommi): Assert on thread correctness. 108 // TODO(tommi): Assert on thread correctness.
378 return remote_description_.get(); 109 return remote_description_.get();
379 } 110 }
380 111
381 void BaseSession::set_local_description(const SessionDescription* sdesc) { 112 void BaseSession::set_local_description(const SessionDescription* sdesc) {
382 // TODO(tommi): Assert on thread correctness. 113 // TODO(tommi): Assert on thread correctness.
383 if (sdesc != local_description_.get()) 114 if (sdesc != local_description_.get())
384 local_description_.reset(sdesc); 115 local_description_.reset(sdesc);
385 } 116 }
386 117
387 void BaseSession::set_remote_description(SessionDescription* sdesc) { 118 void BaseSession::set_remote_description(SessionDescription* sdesc) {
388 // TODO(tommi): Assert on thread correctness. 119 // TODO(tommi): Assert on thread correctness.
389 if (sdesc != remote_description_) 120 if (sdesc != remote_description_)
390 remote_description_.reset(sdesc); 121 remote_description_.reset(sdesc);
391 } 122 }
392 123
124 void BaseSession::set_initiator(bool initiator) {
125 initiator_ = initiator;
126
127 IceRole ice_role = initiator ? ICEROLE_CONTROLLING : ICEROLE_CONTROLLED;
128 transport_controller_->SetIceRole(ice_role);
129 }
130
393 const SessionDescription* BaseSession::initiator_description() const { 131 const SessionDescription* BaseSession::initiator_description() const {
394 // TODO(tommi): Assert on thread correctness. 132 // TODO(tommi): Assert on thread correctness.
395 return initiator_ ? local_description_.get() : remote_description_.get(); 133 return initiator_ ? local_description_.get() : remote_description_.get();
396 } 134 }
397 135
398 bool BaseSession::SetIdentity(rtc::SSLIdentity* identity) {
399 if (identity_)
400 return false;
401 identity_ = identity;
402 for (TransportMap::iterator iter = transports_.begin();
403 iter != transports_.end(); ++iter) {
404 iter->second->SetIdentity(identity_);
405 }
406 return true;
407 }
408
409 bool BaseSession::SetSslMaxProtocolVersion(rtc::SSLProtocolVersion version) {
410 if (state_ != STATE_INIT) {
411 return false;
412 }
413
414 ssl_max_version_ = version;
415 return true;
416 }
417
418 bool BaseSession::PushdownTransportDescription(ContentSource source, 136 bool BaseSession::PushdownTransportDescription(ContentSource source,
419 ContentAction action, 137 ContentAction action,
420 std::string* error_desc) { 138 std::string* error_desc) {
139 ASSERT(signaling_thread()->IsCurrent());
140
421 if (source == CS_LOCAL) { 141 if (source == CS_LOCAL) {
422 return PushdownLocalTransportDescription(local_description(), 142 return PushdownLocalTransportDescription(local_description(),
423 action, 143 action,
424 error_desc); 144 error_desc);
425 } 145 }
426 return PushdownRemoteTransportDescription(remote_description(), 146 return PushdownRemoteTransportDescription(remote_description(),
427 action, 147 action,
428 error_desc); 148 error_desc);
429 } 149 }
430 150
431 bool BaseSession::PushdownLocalTransportDescription( 151 bool BaseSession::PushdownLocalTransportDescription(
432 const SessionDescription* sdesc, 152 const SessionDescription* sdesc,
433 ContentAction action, 153 ContentAction action,
434 std::string* error_desc) { 154 std::string* err) {
435 // Update the Transports with the right information, and trigger them to 155 ASSERT(signaling_thread()->IsCurrent());
436 // start connecting.
437 for (TransportMap::iterator iter = transports_.begin();
438 iter != transports_.end(); ++iter) {
439 // If no transport info was in this session description, ret == false
440 // and we just skip this one.
441 TransportDescription tdesc;
442 bool ret = GetTransportDescription(
443 sdesc, iter->second->content_name(), &tdesc);
444 if (ret) {
445 if (!iter->second->SetLocalTransportDescription(tdesc, action,
446 error_desc)) {
447 return false;
448 }
449 156
450 iter->second->ConnectChannels(); 157 if (!sdesc) {
158 return false;
159 }
160
161 for (const TransportInfo& tinfo : sdesc->transport_infos()) {
162 if (!transport_controller_->SetLocalTransportDescription(
163 tinfo.content_name, tinfo.description, action, err)) {
164 return false;
451 } 165 }
452 } 166 }
453 167
454 return true; 168 return true;
455 } 169 }
456 170
457 bool BaseSession::PushdownRemoteTransportDescription( 171 bool BaseSession::PushdownRemoteTransportDescription(
458 const SessionDescription* sdesc, 172 const SessionDescription* sdesc,
459 ContentAction action, 173 ContentAction action,
460 std::string* error_desc) { 174 std::string* err) {
461 // Update the Transports with the right information. 175 ASSERT(signaling_thread()->IsCurrent());
462 for (TransportMap::iterator iter = transports_.begin();
463 iter != transports_.end(); ++iter) {
464 TransportDescription tdesc;
465 176
466 // If no transport info was in this session description, ret == false 177 if (!sdesc) {
467 // and we just skip this one. 178 return false;
468 bool ret = GetTransportDescription( 179 }
469 sdesc, iter->second->content_name(), &tdesc); 180
470 if (ret) { 181 for (const TransportInfo& tinfo : sdesc->transport_infos()) {
471 if (!iter->second->SetRemoteTransportDescription(tdesc, action, 182 if (!transport_controller_->SetRemoteTransportDescription(
472 error_desc)) { 183 tinfo.content_name, tinfo.description, action, err)) {
473 return false; 184 return false;
474 }
475 } 185 }
476 } 186 }
477 187
478 return true; 188 return true;
479 } 189 }
480 190
481 void BaseSession::SetIceConnectionReceivingTimeout(int timeout_ms) {
482 ice_receiving_timeout_ = timeout_ms;
483 for (const auto& kv : transport_proxies()) {
484 Transport* transport = kv.second->impl();
485 if (transport) {
486 transport->SetChannelReceivingTimeout(timeout_ms);
487 }
488 }
489 }
490
491 TransportChannel* BaseSession::CreateChannel(const std::string& content_name,
492 int component) {
493 // We create the proxy "on demand" here because we need to support
494 // creating channels at any time, even before we send or receive
495 // initiate messages, which is before we create the transports.
496 TransportProxy* transproxy = GetOrCreateTransportProxy(content_name);
497 return transproxy->CreateChannel(component);
498 }
499
500 TransportChannel* BaseSession::GetChannel(const std::string& content_name,
501 int component) {
502 TransportProxy* transproxy = GetTransportProxy(content_name);
503 if (transproxy == NULL)
504 return NULL;
505
506 return transproxy->GetChannel(component);
507 }
508
509 void BaseSession::DestroyChannel(const std::string& content_name,
510 int component) {
511 TransportProxy* transproxy = GetTransportProxy(content_name);
512 ASSERT(transproxy != NULL);
513 transproxy->DestroyChannel(component);
514 }
515
516 TransportProxy* BaseSession::GetOrCreateTransportProxy(
517 const std::string& content_name) {
518 TransportProxy* transproxy = GetTransportProxy(content_name);
519 if (transproxy)
520 return transproxy;
521
522 Transport* transport = CreateTransport(content_name);
523 transport->SetIceRole(initiator_ ? ICEROLE_CONTROLLING : ICEROLE_CONTROLLED);
524 transport->SetIceTiebreaker(ice_tiebreaker_);
525 transport->SetSslMaxProtocolVersion(ssl_max_version_);
526 // TODO: Connect all the Transport signals to TransportProxy
527 // then to the BaseSession.
528 transport->SignalConnecting.connect(
529 this, &BaseSession::OnTransportConnecting);
530 transport->SignalWritableState.connect(
531 this, &BaseSession::OnTransportWritable);
532 transport->SignalReceivingState.connect(
533 this, &BaseSession::OnTransportReceiving);
534 transport->SignalRequestSignaling.connect(
535 this, &BaseSession::OnTransportRequestSignaling);
536 transport->SignalRouteChange.connect(
537 this, &BaseSession::OnTransportRouteChange);
538 transport->SignalCandidatesAllocationDone.connect(
539 this, &BaseSession::OnTransportCandidatesAllocationDone);
540 transport->SignalRoleConflict.connect(
541 this, &BaseSession::OnRoleConflict);
542 transport->SignalCompleted.connect(
543 this, &BaseSession::OnTransportCompleted);
544 transport->SignalFailed.connect(
545 this, &BaseSession::OnTransportFailed);
546
547 transproxy = new TransportProxy(worker_thread_, sid_, content_name,
548 new TransportWrapper(transport));
549 transproxy->SignalCandidatesReady.connect(
550 this, &BaseSession::OnTransportProxyCandidatesReady);
551 if (identity_)
552 transproxy->SetIdentity(identity_);
553 transports_[content_name] = transproxy;
554
555 return transproxy;
556 }
557
558 Transport* BaseSession::GetTransport(const std::string& content_name) {
559 TransportProxy* transproxy = GetTransportProxy(content_name);
560 if (transproxy == NULL)
561 return NULL;
562 return transproxy->impl();
563 }
564
565 TransportProxy* BaseSession::GetTransportProxy(
566 const std::string& content_name) {
567 TransportMap::iterator iter = transports_.find(content_name);
568 return (iter != transports_.end()) ? iter->second : NULL;
569 }
570
571 void BaseSession::DestroyTransportProxy(
572 const std::string& content_name) {
573 TransportMap::iterator iter = transports_.find(content_name);
574 if (iter != transports_.end()) {
575 delete iter->second;
576 transports_.erase(content_name);
577 }
578 }
579
580 Transport* BaseSession::CreateTransport(const std::string& content_name) {
581 ASSERT(transport_type_ == NS_GINGLE_P2P);
582 Transport* transport = new DtlsTransport<P2PTransport>(
583 signaling_thread(), worker_thread(), content_name, port_allocator(),
584 identity_);
585 transport->SetChannelReceivingTimeout(ice_receiving_timeout_);
586 return transport;
587 }
588
589 void BaseSession::SetState(State state) { 191 void BaseSession::SetState(State state) {
590 ASSERT(signaling_thread_->IsCurrent()); 192 ASSERT(signaling_thread_->IsCurrent());
591 if (state != state_) { 193 if (state != state_) {
592 LogState(state_, state); 194 LogState(state_, state);
593 state_ = state; 195 state_ = state;
594 SignalState(this, state_); 196 SignalState(this, state_);
595 signaling_thread_->Post(this, MSG_STATE); 197 signaling_thread_->Post(this, MSG_STATE);
596 } 198 }
597 } 199 }
598 200
599 void BaseSession::SetError(Error error, const std::string& error_desc) { 201 void BaseSession::SetError(Error error, const std::string& error_desc) {
600 ASSERT(signaling_thread_->IsCurrent()); 202 ASSERT(signaling_thread_->IsCurrent());
601 if (error != error_) { 203 if (error != error_) {
602 error_ = error; 204 error_ = error;
603 error_desc_ = error_desc; 205 error_desc_ = error_desc;
604 SignalError(this, error); 206 SignalError(this, error);
605 } 207 }
606 } 208 }
607 209
608 void BaseSession::OnSignalingReady() { 210 void BaseSession::SetIceConnectionReceivingTimeout(int timeout_ms) {
609 ASSERT(signaling_thread()->IsCurrent()); 211 transport_controller_->SetIceConnectionReceivingTimeout(timeout_ms);
610 for (TransportMap::iterator iter = transports_.begin();
611 iter != transports_.end(); ++iter) {
612 iter->second->OnSignalingReady();
613 }
614 }
615
616 // TODO(juberti): Since PushdownLocalTD now triggers the connection process to
617 // start, remove this method once everyone calls PushdownLocalTD.
618 void BaseSession::SpeculativelyConnectAllTransportChannels() {
619 // Put all transports into the connecting state.
620 for (TransportMap::iterator iter = transports_.begin();
621 iter != transports_.end(); ++iter) {
622 iter->second->ConnectChannels();
623 }
624 }
625
626 bool BaseSession::OnRemoteCandidates(const std::string& content_name,
627 const Candidates& candidates,
628 std::string* error) {
629 // Give candidates to the appropriate transport, and tell that transport
630 // to start connecting, if it's not already doing so.
631 TransportProxy* transproxy = GetTransportProxy(content_name);
632 if (!transproxy) {
633 *error = "Unknown content name " + content_name;
634 return false;
635 }
636 if (!transproxy->OnRemoteCandidates(candidates, error)) {
637 return false;
638 }
639 // TODO(juberti): Remove this call once we can be sure that we always have
640 // a local transport description (which will trigger the connection).
641 transproxy->ConnectChannels();
642 return true;
643 }
644
645 bool BaseSession::MaybeEnableMuxingSupport() {
646 // We need both a local and remote description to decide if we should mux.
647 if ((state_ == STATE_SENTINITIATE ||
648 state_ == STATE_RECEIVEDINITIATE) &&
649 ((local_description_ == NULL) ||
650 (remote_description_ == NULL))) {
651 return false;
652 }
653
654 // In order to perform the multiplexing, we need all proxies to be in the
655 // negotiated state, i.e. to have implementations underneath.
656 // Ensure that this is the case, regardless of whether we are going to mux.
657 for (TransportMap::iterator iter = transports_.begin();
658 iter != transports_.end(); ++iter) {
659 ASSERT(iter->second->negotiated());
660 if (!iter->second->negotiated()) {
661 return false;
662 }
663 }
664
665 // If both sides agree to BUNDLE, mux all the specified contents onto the
666 // transport belonging to the first content name in the BUNDLE group.
667 // If the contents are already muxed, this will be a no-op.
668 // TODO(juberti): Should this check that local and remote have configured
669 // BUNDLE the same way?
670 bool candidates_allocated = IsCandidateAllocationDone();
671 const ContentGroup* local_bundle_group =
672 local_description_->GetGroupByName(GROUP_TYPE_BUNDLE);
673 const ContentGroup* remote_bundle_group =
674 remote_description_->GetGroupByName(GROUP_TYPE_BUNDLE);
675 if (local_bundle_group && remote_bundle_group) {
676 if (!BundleContentGroup(local_bundle_group)) {
677 LOG(LS_WARNING) << "Failed to set up BUNDLE";
678 return false;
679 }
680
681 // If we weren't done gathering before, we might be done now, as a result
682 // of enabling mux.
683 if (!candidates_allocated) {
684 MaybeCandidateAllocationDone();
685 }
686 } else {
687 LOG(LS_INFO) << "BUNDLE group missing from remote or local description.";
688 }
689 return true;
690 }
691
692 bool BaseSession::BundleContentGroup(const ContentGroup* bundle_group) {
693 const std::string* content_name = bundle_group->FirstContentName();
694 if (!content_name) {
695 LOG(LS_INFO) << "No content names specified in BUNDLE group.";
696 return true;
697 }
698
699 TransportProxy* selected_proxy = GetTransportProxy(*content_name);
700 if (!selected_proxy) {
701 LOG(LS_WARNING) << "No transport found for content \""
702 << *content_name << "\".";
703 return false;
704 }
705
706 for (TransportMap::iterator iter = transports_.begin();
707 iter != transports_.end(); ++iter) {
708 // If content is part of the mux group, then repoint its proxy at the
709 // transport object that we have chosen to mux onto. If the proxy
710 // is already pointing at the right object, it will be a no-op.
711 if (bundle_group->HasContentName(iter->first) &&
712 !iter->second->SetupMux(selected_proxy)) {
713 LOG(LS_WARNING) << "Failed to bundle " << iter->first << " to "
714 << *content_name;
715 return false;
716 }
717 LOG(LS_INFO) << "Bundling " << iter->first << " to " << *content_name;
718 }
719
720 return true;
721 }
722
723 void BaseSession::OnTransportCandidatesAllocationDone(Transport* transport) {
724 // TODO(juberti): This is a clunky way of processing the done signal. Instead,
725 // TransportProxy should receive the done signal directly, set its allocated
726 // flag internally, and then reissue the done signal to Session.
727 // Overall we should make TransportProxy receive *all* the signals from
728 // Transport, since this removes the need to manually iterate over all
729 // the transports, as is needed to make sure signals are handled properly
730 // when BUNDLEing.
731 // TODO(juberti): Per b/7998978, devs and QA are hitting this assert in ways
732 // that make it prohibitively difficult to run dbg builds. Disabled for now.
733 //ASSERT(!IsCandidateAllocationDone());
734 for (TransportMap::iterator iter = transports_.begin();
735 iter != transports_.end(); ++iter) {
736 if (iter->second->impl() == transport) {
737 iter->second->set_candidates_allocated(true);
738 }
739 }
740 MaybeCandidateAllocationDone();
741 }
742
743 bool BaseSession::IsCandidateAllocationDone() const {
744 for (TransportMap::const_iterator iter = transports_.begin();
745 iter != transports_.end(); ++iter) {
746 if (!iter->second->candidates_allocated()) {
747 LOG(LS_INFO) << "Candidate allocation not done for "
748 << iter->second->content_name();
749 return false;
750 }
751 }
752 return true;
753 }
754
755 void BaseSession::MaybeCandidateAllocationDone() {
756 if (IsCandidateAllocationDone()) {
757 LOG(LS_INFO) << "Candidate gathering is complete.";
758 OnCandidatesAllocationDone();
759 }
760 }
761
762 void BaseSession::OnRoleConflict() {
763 if (role_switch_) {
764 LOG(LS_WARNING) << "Repeat of role conflict signal from Transport.";
765 return;
766 }
767
768 role_switch_ = true;
769 for (TransportMap::iterator iter = transports_.begin();
770 iter != transports_.end(); ++iter) {
771 // Role will be reverse of initial role setting.
772 IceRole role = initiator_ ? ICEROLE_CONTROLLED : ICEROLE_CONTROLLING;
773 iter->second->SetIceRole(role);
774 }
775 } 212 }
776 213
777 void BaseSession::LogState(State old_state, State new_state) { 214 void BaseSession::LogState(State old_state, State new_state) {
778 LOG(LS_INFO) << "Session:" << id() 215 LOG(LS_INFO) << "Session:" << id()
779 << " Old state:" << StateToString(old_state) 216 << " Old state:" << StateToString(old_state)
780 << " New state:" << StateToString(new_state) 217 << " New state:" << StateToString(new_state);
781 << " Type:" << content_type()
782 << " Transport:" << transport_type();
783 } 218 }
784 219
785 // static 220 // static
786 bool BaseSession::GetTransportDescription(const SessionDescription* description, 221 bool BaseSession::GetTransportDescription(const SessionDescription* description,
787 const std::string& content_name, 222 const std::string& content_name,
788 TransportDescription* tdesc) { 223 TransportDescription* tdesc) {
789 if (!description || !tdesc) { 224 if (!description || !tdesc) {
790 return false; 225 return false;
791 } 226 }
792 const TransportInfo* transport_info = 227 const TransportInfo* transport_info =
(...skipping 21 matching lines...) Expand all
814 249
815 default: 250 default:
816 // Explicitly ignoring some states here. 251 // Explicitly ignoring some states here.
817 break; 252 break;
818 } 253 }
819 break; 254 break;
820 } 255 }
821 } 256 }
822 257
823 } // namespace cricket 258 } // namespace cricket
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698