Index: webrtc/p2p/base/transportcontroller.cc |
diff --git a/webrtc/p2p/base/transportcontroller.cc b/webrtc/p2p/base/transportcontroller.cc |
new file mode 100644 |
index 0000000000000000000000000000000000000000..e85296d484ae4871845ec522efa8b1d8e6d625ca |
--- /dev/null |
+++ b/webrtc/p2p/base/transportcontroller.cc |
@@ -0,0 +1,598 @@ |
+/* |
+ * Copyright 2015 The WebRTC Project Authors. All rights reserved. |
+ * |
+ * Use of this source code is governed by a BSD-style license |
+ * that can be found in the LICENSE file in the root of the source |
+ * tree. An additional intellectual property rights grant can be found |
+ * in the file PATENTS. All contributing project authors may |
+ * be found in the AUTHORS file in the root of the source tree. |
+ */ |
+ |
+#include "webrtc/p2p/base/transportcontroller.h" |
+ |
+#include "webrtc/base/bind.h" |
+#include "webrtc/base/thread.h" |
+#include "webrtc/p2p/base/dtlstransport.h" |
+#include "webrtc/p2p/base/p2ptransport.h" |
+ |
+namespace cricket { |
+ |
+enum { |
+ MSG_CONNECTIONSTATE, |
+ MSG_RECEIVING, |
+ MSG_CANDIDATESALLOCATIONSTARTED, |
+ MSG_CANDIDATESALLOCATIONDONE, |
+ MSG_CANDIDATESREADY, |
+}; |
+ |
+struct CandidatesData : public rtc::MessageData { |
+ CandidatesData(const std::string& transport_name, |
+ const Candidates& candidates) |
+ : transport_name(transport_name), |
+ candidates(candidates) { |
+ } |
+ |
+ std::string transport_name; |
+ Candidates candidates; |
+}; |
+ |
+TransportController::TransportController(rtc::Thread* signaling_thread, |
+ rtc::Thread* worker_thread, |
+ PortAllocator* port_allocator) |
+ : signaling_thread_(signaling_thread), |
+ worker_thread_(worker_thread), |
+ port_allocator_(port_allocator), |
+ ice_receiving_timeout_ms_(-1), |
+ ice_role_(ICEROLE_CONTROLLING), |
+ ssl_max_version_(rtc::SSL_PROTOCOL_DTLS_10), |
+ ice_role_switch_(false), |
+ ice_tiebreaker_(rtc::CreateRandomId64()), |
+ connection_state_(kConnecting), |
+ receiving_(false), |
+ candidate_gathering_state_(Transport::kGatheringNew) { |
+ ASSERT(signaling_thread_->IsCurrent()); |
+} |
+ |
+TransportController::~TransportController() { |
+ ASSERT(signaling_thread_->IsCurrent()); |
+ worker_thread_->Invoke<void>( |
+ rtc::Bind(&TransportController::DestroyAllTransports_w, this)); |
+ signaling_thread_->Clear(this); |
+} |
+ |
+void TransportController::SetIceConnectionReceivingTimeout(int timeout_ms) { |
+ ASSERT(signaling_thread_->IsCurrent()); |
+ worker_thread_->Invoke<void>( |
+ rtc::Bind(&TransportController::SetIceConnectionReceivingTimeout_w, this, |
+ timeout_ms)); |
+} |
+ |
+void TransportController::SetIceRole(IceRole ice_role) { |
+ ASSERT(signaling_thread_->IsCurrent()); |
+ worker_thread_->Invoke<void>( |
+ rtc::Bind(&TransportController::SetIceRole_w, this, ice_role)); |
+} |
+ |
+bool TransportController::SetIdentity(rtc::SSLIdentity* identity) { |
+ ASSERT(signaling_thread_->IsCurrent()); |
+ return worker_thread_->Invoke<bool>( |
+ rtc::Bind(&TransportController::SetIdentity_w, this, identity)); |
+} |
+ |
+bool TransportController::GetIdentity(const std::string& transport_name, |
+ rtc::SSLIdentity** identity) { |
+ ASSERT(signaling_thread_->IsCurrent()); |
+ return worker_thread_->Invoke<bool>( |
+ rtc::Bind(&TransportController::GetIdentity_w, this, transport_name, |
+ identity)); |
+} |
+ |
+bool TransportController::GetRemoteCertificate( |
+ const std::string& transport_name, |
+ rtc::SSLCertificate** cert) { |
+ ASSERT(signaling_thread_->IsCurrent()); |
+ return worker_thread_->Invoke<bool>( |
+ rtc::Bind(&TransportController::GetRemoteCertificate_w, this, |
+ transport_name, cert)); |
+} |
+ |
+bool TransportController::SetSslMaxProtocolVersion( |
+ rtc::SSLProtocolVersion version) { |
+ ASSERT(signaling_thread_->IsCurrent()); |
+ return worker_thread_->Invoke<bool>( |
+ rtc::Bind(&TransportController::SetSslMaxProtocolVersion_w, this, |
+ version)); |
+} |
+ |
+bool TransportController::SetLocalTransportDescription( |
+ const std::string& transport_name, |
+ const TransportDescription& tdesc, |
+ ContentAction action, |
+ std::string* err) { |
+ ASSERT(signaling_thread_->IsCurrent()); |
+ return worker_thread_->Invoke<bool>( |
+ rtc::Bind(&TransportController::SetLocalTransportDescription_w, this, |
+ transport_name, tdesc, action, err)); |
+} |
+ |
+bool TransportController::SetRemoteTransportDescription( |
+ const std::string& transport_name, |
+ const TransportDescription& tdesc, |
+ ContentAction action, |
+ std::string* err) { |
+ ASSERT(signaling_thread_->IsCurrent()); |
+ return worker_thread_->Invoke<bool>( |
+ rtc::Bind(&TransportController::SetRemoteTransportDescription_w, this, |
+ transport_name, tdesc, action, err)); |
+} |
+ |
+bool TransportController::AddRemoteCandidates( |
+ const std::string& transport_name, |
+ const Candidates& candidates, |
+ std::string* err) { |
+ ASSERT(signaling_thread_->IsCurrent()); |
+ return worker_thread_->Invoke<bool>( |
+ rtc::Bind(&TransportController::AddRemoteCandidates_w, this, |
+ transport_name, candidates, err)); |
+} |
+ |
+bool TransportController::ReadyForRemoteCandidates( |
+ const std::string& transport_name) { |
+ ASSERT(signaling_thread_->IsCurrent()); |
+ return worker_thread_->Invoke<bool>( |
+ rtc::Bind(&TransportController::ReadyForRemoteCandidates_w, this, |
+ transport_name)); |
+} |
+ |
+bool TransportController::GetSslRole(rtc::SSLRole* role) { |
+ ASSERT(signaling_thread_->IsCurrent()); |
+ return worker_thread_->Invoke<bool>( |
+ rtc::Bind(&TransportController::GetSslRole_w, this, role)); |
+} |
+ |
+bool TransportController::GetStats( |
+ const std::string& transport_name, |
+ TransportStats* stats) { |
+ ASSERT(signaling_thread_->IsCurrent()); |
+ return worker_thread_->Invoke<bool>( |
+ rtc::Bind(&TransportController::GetStats_w, this, |
+ transport_name, stats)); |
+} |
+ |
+TransportChannel* TransportController::CreateTransportChannel_w( |
+ const std::string& transport_name, int component) { |
+ ASSERT(worker_thread_->IsCurrent()); |
+ |
+ Transport* transport = GetOrCreateTransport_w(transport_name); |
+ return transport->CreateChannel(component); |
+} |
+ |
+void TransportController::DestroyTransportChannel_w( |
+ const std::string& transport_name, int component) { |
+ ASSERT(worker_thread_->IsCurrent()); |
+ |
+ Transport* transport = GetTransport_w(transport_name); |
+ ASSERT(transport != nullptr); |
+ transport->DestroyChannel(component); |
+ |
+ // Just as we create a Transport when its first channel is created, |
+ // we delete it when its last channel is deleted. |
+ if (!transport->HasChannels()) { |
+ DestroyTransport_w(transport_name); |
+ UpdateState_w(); |
+ CheckIfCandidatesAllocationDone_w(); |
+ } |
+} |
+ |
+void TransportController::OnMessage(rtc::Message* pmsg) { |
+ ASSERT(signaling_thread_->IsCurrent()); |
+ |
+ switch (pmsg->message_id) { |
+ case MSG_CONNECTIONSTATE: { |
+ rtc::TypedMessageData<ConnectionState>* data = |
+ static_cast<rtc::TypedMessageData<ConnectionState>*>(pmsg->pdata); |
+ SignalConnectionStateChanged(data->data()); |
+ delete data; |
+ break; |
+ } |
+ case MSG_RECEIVING: { |
+ rtc::TypedMessageData<bool>* data = |
+ static_cast<rtc::TypedMessageData<bool>*>(pmsg->pdata); |
+ SignalReceiving(data->data()); |
+ delete data; |
+ break; |
+ } |
+ case MSG_CANDIDATESALLOCATIONSTARTED: |
+ SignalCandidatesAllocationStarted(); |
+ break; |
+ case MSG_CANDIDATESALLOCATIONDONE: |
+ SignalCandidatesAllocationDone(); |
+ break; |
+ case MSG_CANDIDATESREADY: { |
+ CandidatesData* data = static_cast<CandidatesData*>(pmsg->pdata); |
+ SignalCandidatesReady(data->transport_name, data->candidates); |
+ delete data; |
+ break; |
+ } |
+ default: |
+ ASSERT(false); |
+ } |
+} |
+ |
+Transport* TransportController::CreateTransport_w( |
+ const std::string& transport_name) { |
+ ASSERT(worker_thread_->IsCurrent()); |
+ |
+ Transport* transport = new DtlsTransport<P2PTransport>( |
+ transport_name, port_allocator(), identity_.get()); |
+ return transport; |
+} |
+ |
+Transport* TransportController::GetOrCreateTransport_w( |
+ const std::string& transport_name) { |
+ ASSERT(worker_thread_->IsCurrent()); |
+ |
+ Transport* transport = GetTransport_w(transport_name); |
+ if (transport) |
+ return transport; |
+ |
+ transport = CreateTransport_w(transport_name); |
+ transport->SetChannelReceivingTimeout(ice_receiving_timeout_ms_); |
+ transport->SetIceRole(ice_role_); |
+ transport->SetIceTiebreaker(ice_tiebreaker_); |
+ transport->SetSslMaxProtocolVersion(ssl_max_version_); |
+ transport->SignalConnecting.connect( |
+ this, &TransportController::OnTransportConnecting_w); |
+ transport->SignalWritableState.connect( |
+ this, &TransportController::OnTransportWritableState_w); |
+ transport->SignalReceivingState.connect( |
+ this, &TransportController::OnTransportReceivingState_w); |
+ transport->SignalCandidatesAllocationStarted.connect( |
+ this, &TransportController::OnTransportCandidatesAllocationStarted_w); |
+ transport->SignalRouteChange.connect( |
+ this, &TransportController::OnTransportRouteChange_w); |
+ transport->SignalCandidatesAllocationDone.connect( |
+ this, &TransportController::OnTransportCandidatesAllocationDone_w); |
+ transport->SignalRoleConflict.connect( |
+ this, &TransportController::OnTransportRoleConflict_w); |
+ transport->SignalCompleted.connect( |
+ this, &TransportController::OnTransportCompleted_w); |
+ transport->SignalFailed.connect( |
+ this, &TransportController::OnTransportFailed_w); |
+ transport->SignalCandidatesReady.connect( |
+ this, &TransportController::OnTransportCandidatesReady_w); |
+ if (identity_) |
+ transport->SetIdentity(identity_.get()); |
+ transports_[transport_name] = transport; |
+ |
+ return transport; |
+} |
+ |
+Transport* TransportController::GetTransport_w( |
+ const std::string& transport_name) { |
+ ASSERT(worker_thread_->IsCurrent()); |
+ |
+ auto iter = transports_.find(transport_name); |
+ return (iter != transports_.end()) ? iter->second : nullptr; |
+} |
+ |
+void TransportController::DestroyTransport_w( |
+ const std::string& transport_name) { |
+ ASSERT(worker_thread_->IsCurrent()); |
+ |
+ auto iter = transports_.find(transport_name); |
+ if (iter != transports_.end()) { |
+ delete iter->second; |
+ transports_.erase(transport_name); |
+ } |
+} |
+ |
+void TransportController::DestroyAllTransports_w() { |
+ ASSERT(worker_thread_->IsCurrent()); |
+ |
+ for (const auto& kv : transports_) { |
+ delete kv.second; |
+ } |
+ transports_.clear(); |
+} |
+ |
+void TransportController::SetIceConnectionReceivingTimeout_w(int timeout_ms) { |
+ ASSERT(worker_thread_->IsCurrent()); |
+ ice_receiving_timeout_ms_ = timeout_ms; |
+ for (const auto& kv : transports_) { |
+ kv.second->SetChannelReceivingTimeout(timeout_ms); |
+ } |
+} |
+ |
+void TransportController::SetIceRole_w(IceRole ice_role) { |
+ ASSERT(worker_thread_->IsCurrent()); |
+ ice_role_ = ice_role; |
+ for (const auto& kv : transports_) { |
+ kv.second->SetIceRole(ice_role_); |
+ } |
+} |
+ |
+bool TransportController::SetIdentity_w(rtc::SSLIdentity* identity) { |
+ ASSERT(worker_thread_->IsCurrent()); |
+ |
+ if (identity_) |
+ return false; |
+ identity_.reset(identity); |
+ |
+ for (const auto& kv : transports_) { |
+ kv.second->SetIdentity(identity_.get()); |
+ } |
+ return true; |
+} |
+ |
+bool TransportController::GetIdentity_w(const std::string& transport_name, |
+ rtc::SSLIdentity** identity) { |
+ ASSERT(worker_thread_->IsCurrent()); |
+ |
+ Transport* t = GetTransport_w(transport_name); |
+ if (!t) { |
+ return false; |
+ } |
+ |
+ return t->GetIdentity(identity); |
+} |
+ |
+bool TransportController::GetRemoteCertificate_w( |
+ const std::string& transport_name, |
+ rtc::SSLCertificate** cert) { |
+ ASSERT(worker_thread_->IsCurrent()); |
+ |
+ Transport* t = GetTransport_w(transport_name); |
+ if (!t) { |
+ return false; |
+ } |
+ |
+ return t->GetRemoteCertificate(cert); |
+} |
+ |
+bool TransportController::SetSslMaxProtocolVersion_w( |
+ rtc::SSLProtocolVersion version) { |
+ ASSERT(worker_thread_->IsCurrent()); |
+ |
+ // Max SSL version can only be set before transports are created |
+ if (!transports_.empty()) { |
+ return false; |
+ } |
+ |
+ ssl_max_version_ = version; |
+ return true; |
+} |
+ |
+bool TransportController::SetLocalTransportDescription_w( |
+ const std::string& transport_name, |
+ const TransportDescription& tdesc, |
+ ContentAction action, |
+ std::string* err) { |
+ ASSERT(worker_thread()->IsCurrent()); |
+ |
+ Transport* transport = GetTransport_w(transport_name); |
+ if (!transport) { |
+ // If we didn't find a transport, that's not an error; |
+ // it could have been deleted as a result of bundling. |
+ return true; |
+ } |
+ |
+ if (!transport->SetLocalTransportDescription(tdesc, action, err)) { |
+ return false; |
+ } |
+ transport->ConnectChannels(); |
+ return true; |
+} |
+ |
+bool TransportController::SetRemoteTransportDescription_w( |
+ const std::string& transport_name, |
+ const TransportDescription& tdesc, |
+ ContentAction action, |
+ std::string* err) { |
+ ASSERT(worker_thread()->IsCurrent()); |
+ |
+ Transport* transport = GetTransport_w(transport_name); |
+ if (!transport) { |
+ // If we didn't find a transport, that's not an error; |
+ // it could have been deleted as a result of bundling. |
+ return true; |
+ } |
+ |
+ return transport->SetRemoteTransportDescription(tdesc, action, err); |
+} |
+ |
+bool TransportController::AddRemoteCandidates_w( |
+ const std::string& transport_name, |
+ const Candidates& candidates, |
+ std::string* err) { |
+ ASSERT(worker_thread()->IsCurrent()); |
+ |
+ Transport* transport = GetTransport_w(transport_name); |
+ if (!transport) { |
+ // If we didn't find a transport, that's not an error; |
+ // it could have been deleted as a result of bundling. |
+ return true; |
+ } |
+ |
+ return transport->AddRemoteCandidates(candidates, err); |
+} |
+ |
+bool TransportController::ReadyForRemoteCandidates_w( |
+ const std::string& transport_name) { |
+ ASSERT(worker_thread()->IsCurrent()); |
+ |
+ Transport* transport = GetTransport_w(transport_name); |
+ if (!transport) { |
+ return false; |
+ } |
+ return transport->local_description_set() && |
+ transport->remote_description_set(); |
+} |
+ |
+bool TransportController::GetSslRole_w(rtc::SSLRole* role) { |
+ ASSERT(worker_thread()->IsCurrent()); |
+ |
+ // TODO(mallinath) - Return role of each transport, as role may differ from |
+ // one another. |
+ // In current implementaion we just return the role of first transport in the |
+ // transport map. |
+ for (const auto& kv : transports_) { |
+ return kv.second->GetSslRole(role); |
+ } |
+ return false; |
+} |
+ |
+bool TransportController::GetStats_w( |
+ const std::string& transport_name, |
+ TransportStats* stats) { |
+ ASSERT(worker_thread()->IsCurrent()); |
+ |
+ Transport* transport = GetTransport_w(transport_name); |
+ if (!transport) { |
+ return false; |
+ } |
+ return transport->GetStats(stats); |
+} |
+ |
+void TransportController::OnTransportConnecting_w(Transport* transport) { |
+ ASSERT(worker_thread_->IsCurrent()); |
+ UpdateState_w(); |
+} |
+ |
+void TransportController::OnTransportWritableState_w(Transport* transport) { |
+ ASSERT(worker_thread_->IsCurrent()); |
+ UpdateState_w(); |
+} |
+ |
+void TransportController::OnTransportReceivingState_w(Transport* transport) { |
+ ASSERT(worker_thread_->IsCurrent()); |
+ UpdateState_w(); |
+} |
+ |
+void TransportController::OnTransportCandidatesAllocationStarted_w( |
+ Transport* transport) { |
+ ASSERT(worker_thread_->IsCurrent()); |
+ if (candidate_gathering_state_ != Transport::kGatheringGathering) { |
+ candidate_gathering_state_ = Transport::kGatheringGathering; |
+ signaling_thread_->Post(this, MSG_CANDIDATESALLOCATIONSTARTED); |
+ } |
+} |
+ |
+void TransportController::OnTransportRouteChange_w(Transport* transport, |
+ int component, |
+ const Candidate& candidate) { |
+ ASSERT(worker_thread_->IsCurrent()); |
+ // This signal is not currently used for anything |
+} |
+ |
+void TransportController::OnTransportCandidatesAllocationDone_w( |
+ Transport* transport) { |
+ ASSERT(worker_thread_->IsCurrent()); |
+ CheckIfCandidatesAllocationDone_w(); |
+} |
+ |
+void TransportController::OnTransportRoleConflict_w() { |
+ ASSERT(worker_thread_->IsCurrent()); |
+ |
+ if (ice_role_switch_) { |
+ LOG(LS_WARNING) << "Repeat of role conflic signal from Transport."; |
+ return; |
+ } |
+ |
+ ice_role_switch_ = true; |
+ IceRole reversed_role = (ice_role_ == ICEROLE_CONTROLLING) |
+ ? ICEROLE_CONTROLLED |
+ : ICEROLE_CONTROLLING; |
+ for (const auto& kv : transports_) { |
+ kv.second->SetIceRole(reversed_role); |
+ } |
+} |
+ |
+void TransportController::OnTransportCompleted_w(Transport* transport) { |
+ ASSERT(worker_thread_->IsCurrent()); |
+ UpdateState_w(); |
+} |
+ |
+void TransportController::OnTransportFailed_w(Transport* transport) { |
+ ASSERT(worker_thread_->IsCurrent()); |
+ UpdateState_w(); |
+} |
+ |
+void TransportController::OnTransportCandidatesReady_w(Transport* transport, |
+ const std::vector<Candidate>& candidates) { |
+ ASSERT(worker_thread_->IsCurrent()); |
+ CandidatesData* data = new CandidatesData(transport->content_name(), |
+ candidates); |
+ signaling_thread_->Post(this, MSG_CANDIDATESREADY, data); |
+} |
+ |
+void TransportController::UpdateState_w() { |
+ ASSERT(worker_thread_->IsCurrent()); |
+ |
+ ConnectionState new_state = kConnecting; |
+ bool receiving = false; |
+ // If we don't have ANY Transports, we shouldn't signal that we're completed |
+ if (!transports_.empty()) { |
+ bool failed = false; |
+ bool connected = true; |
+ bool completed = true; |
+ for (const auto& kv : transports_) { |
+ if (!kv.second->Completed()) { |
+ completed = false; |
+ } |
+ if (!kv.second->all_channels_writable()) { |
+ connected = false; |
+ } |
+ if (kv.second->any_channel_receiving()) { |
+ // The connection is considered receiving if at least one transport is |
+ // receiving on any channel. |
+ receiving = true; |
+ } |
+ } |
+ |
+ if (failed) { |
+ new_state = kFailed; |
+ } else if (completed) { |
+ new_state = kCompleted; |
+ } else if (connected) { |
+ new_state = kConnected; |
+ } |
+ } |
+ |
+ if (connection_state_ != new_state) { |
+ connection_state_ = new_state; |
+ signaling_thread_->Post( |
+ this, MSG_CONNECTIONSTATE, |
+ new rtc::TypedMessageData<ConnectionState>(new_state)); |
+ } |
+ |
+ if (receiving_ != receiving) { |
+ receiving_ = receiving; |
+ signaling_thread_->Post(this, MSG_RECEIVING, |
+ new rtc::TypedMessageData<bool>(receiving)); |
+ } |
+} |
+ |
+void TransportController::CheckIfCandidatesAllocationDone_w() { |
+ ASSERT(worker_thread_->IsCurrent()); |
+ |
+ // If already allocated, no point in checking. |
+ if (candidate_gathering_state_ == Transport::kGatheringDone) { |
+ return; |
+ } |
+ |
+ if (transports_.empty()) { |
+ return; |
+ } |
+ |
+ for (const auto& kv : transports_) { |
+ if (kv.second->candidate_gathering_state() != Transport::kGatheringDone) { |
+ return; |
+ } |
+ } |
+ |
+ candidate_gathering_state_ = Transport::kGatheringDone; |
+ signaling_thread_->Post(this, MSG_CANDIDATESALLOCATIONDONE); |
+} |
+ |
+} // namespace cricket |