Index: webrtc/p2p/base/transport.cc |
diff --git a/webrtc/p2p/base/transport.cc b/webrtc/p2p/base/transport.cc |
index d626ad3d65acf284a6022428cbfb43615a6d07e3..b7aba7540a5699e165994a143bbae9c5852ec5a8 100644 |
--- a/webrtc/p2p/base/transport.cc |
+++ b/webrtc/p2p/base/transport.cc |
@@ -8,6 +8,8 @@ |
* be found in the AUTHORS file in the root of the source tree. |
*/ |
+#include <utility> // for std::pair |
+ |
#include "webrtc/p2p/base/transport.h" |
#include "webrtc/p2p/base/candidate.h" |
@@ -21,39 +23,6 @@ |
namespace cricket { |
using rtc::Bind; |
- |
-enum { |
- MSG_ONSIGNALINGREADY = 1, |
- MSG_ONREMOTECANDIDATE, |
- MSG_WRITESTATE, |
- MSG_REQUESTSIGNALING, |
- MSG_CANDIDATEREADY, |
- MSG_ROUTECHANGE, |
- MSG_CONNECTING, |
- MSG_CANDIDATEALLOCATIONCOMPLETE, |
- MSG_ROLECONFLICT, |
- MSG_COMPLETED, |
- MSG_FAILED, |
- MSG_RECEIVINGSTATE, |
-}; |
- |
-struct ChannelParams : public rtc::MessageData { |
- ChannelParams() : channel(NULL), candidate(NULL) {} |
- explicit ChannelParams(int component) |
- : component(component), channel(NULL), candidate(NULL) {} |
- explicit ChannelParams(Candidate* candidate) |
- : channel(NULL), candidate(candidate) { |
- } |
- |
- ~ChannelParams() { |
- delete candidate; |
- } |
- |
- std::string name; |
- int component; |
- TransportChannelImpl* channel; |
- Candidate* candidate; |
-}; |
static bool VerifyIceParams(const TransportDescription& desc) { |
// For legacy protocols. |
@@ -96,58 +65,59 @@ |
new_desc.ice_ufrag, new_desc.ice_pwd); |
} |
-Transport::Transport(rtc::Thread* signaling_thread, |
- rtc::Thread* worker_thread, |
- const std::string& content_name, |
- PortAllocator* allocator) |
- : signaling_thread_(signaling_thread), |
- worker_thread_(worker_thread), |
- content_name_(content_name), |
- allocator_(allocator), |
- destroyed_(false), |
- readable_(TRANSPORT_STATE_NONE), |
- writable_(TRANSPORT_STATE_NONE), |
- receiving_(TRANSPORT_STATE_NONE), |
- was_writable_(false), |
- connect_requested_(false), |
- ice_role_(ICEROLE_UNKNOWN), |
- tiebreaker_(0), |
- remote_ice_mode_(ICEMODE_FULL), |
- channel_receiving_timeout_(-1) { |
-} |
+Transport::Transport(const std::string& name, PortAllocator* allocator) |
+ : name_(name), allocator_(allocator) {} |
Transport::~Transport() { |
- ASSERT(signaling_thread_->IsCurrent()); |
- ASSERT(destroyed_); |
+ ASSERT(channels_destroyed_); |
+} |
+ |
+bool Transport::AllChannelsCompleted() const { |
+ // We aren't completed until at least one channel is complete, so if there |
+ // are no channels, we aren't complete yet. |
+ if (channels_.empty()) { |
+ LOG(LS_INFO) << name() << " transport is not complete" |
+ << " because it has no TransportChannels"; |
+ return false; |
+ } |
+ |
+ // A Transport's ICE process is completed if all of its channels are writable, |
+ // have finished allocating candidates, and have pruned all but one of their |
+ // connections. |
+ for (const auto& iter : channels_) { |
+ const TransportChannelImpl* channel = iter.second.get(); |
+ bool complete = |
+ channel->writable() && |
+ channel->GetState() == TransportChannelState::STATE_COMPLETED && |
+ channel->GetIceRole() == ICEROLE_CONTROLLING && |
+ channel->gathering_state() == kIceGatheringComplete; |
+ if (!complete) { |
+ LOG(LS_INFO) << name() << " transport is not complete" |
+ << " because a channel is still incomplete."; |
+ return false; |
+ } |
+ } |
+ |
+ return true; |
+} |
+ |
+bool Transport::AnyChannelFailed() const { |
+ for (const auto& iter : channels_) { |
+ if (iter.second->GetState() == TransportChannelState::STATE_FAILED) { |
+ return true; |
+ } |
+ } |
+ return false; |
} |
void Transport::SetIceRole(IceRole role) { |
- worker_thread_->Invoke<void>(Bind(&Transport::SetIceRole_w, this, role)); |
-} |
- |
-void Transport::SetCertificate( |
- const rtc::scoped_refptr<rtc::RTCCertificate>& certificate) { |
- worker_thread_->Invoke<void>(Bind(&Transport::SetCertificate_w, this, |
- certificate)); |
-} |
- |
-bool Transport::GetCertificate( |
- rtc::scoped_refptr<rtc::RTCCertificate>* certificate) { |
- // The identity is set on the worker thread, so for safety it must also be |
- // acquired on the worker thread. |
- return worker_thread_->Invoke<bool>( |
- Bind(&Transport::GetCertificate_w, this, certificate)); |
+ ice_role_ = role; |
+ for (auto& iter : channels_) { |
+ iter.second->SetIceRole(ice_role_); |
+ } |
} |
bool Transport::GetRemoteSSLCertificate(rtc::SSLCertificate** cert) { |
- // Channels can be deleted on the worker thread, so for safety the remote |
- // certificate is acquired on the worker thread. |
- return worker_thread_->Invoke<bool>( |
- Bind(&Transport::GetRemoteSSLCertificate_w, this, cert)); |
-} |
- |
-bool Transport::GetRemoteSSLCertificate_w(rtc::SSLCertificate** cert) { |
- ASSERT(worker_thread()->IsCurrent()); |
if (channels_.empty()) |
return false; |
@@ -156,12 +126,6 @@ |
} |
void Transport::SetChannelReceivingTimeout(int timeout_ms) { |
- worker_thread_->Invoke<void>( |
- Bind(&Transport::SetChannelReceivingTimeout_w, this, timeout_ms)); |
-} |
- |
-void Transport::SetChannelReceivingTimeout_w(int timeout_ms) { |
- ASSERT(worker_thread()->IsCurrent()); |
channel_receiving_timeout_ = timeout_ms; |
for (const auto& kv : channels_) { |
kv.second->SetReceivingTimeout(timeout_ms); |
@@ -172,35 +136,73 @@ |
const TransportDescription& description, |
ContentAction action, |
std::string* error_desc) { |
- return worker_thread_->Invoke<bool>(Bind( |
- &Transport::SetLocalTransportDescription_w, this, |
- description, action, error_desc)); |
+ bool ret = true; |
+ |
+ if (!VerifyIceParams(description)) { |
+ return BadTransportDescription("Invalid ice-ufrag or ice-pwd length", |
+ error_desc); |
+ } |
+ |
+ if (local_description_ && |
+ IceCredentialsChanged(*local_description_, description)) { |
+ IceRole new_ice_role = |
+ (action == CA_OFFER) ? ICEROLE_CONTROLLING : ICEROLE_CONTROLLED; |
+ |
+ // It must be called before ApplyLocalTransportDescription, which may |
+ // trigger an ICE restart and depends on the new ICE role. |
+ SetIceRole(new_ice_role); |
+ } |
+ |
+ local_description_.reset(new TransportDescription(description)); |
+ |
+ for (auto& iter : channels_) { |
+ ret &= ApplyLocalTransportDescription(iter.second.get(), error_desc); |
+ } |
+ if (!ret) { |
+ return false; |
+ } |
+ |
+ // If PRANSWER/ANSWER is set, we should decide transport protocol type. |
+ if (action == CA_PRANSWER || action == CA_ANSWER) { |
+ ret &= NegotiateTransportDescription(action, error_desc); |
+ } |
+ if (ret) { |
+ local_description_set_ = true; |
+ ConnectChannels(); |
+ } |
+ |
+ return ret; |
} |
bool Transport::SetRemoteTransportDescription( |
const TransportDescription& description, |
ContentAction action, |
std::string* error_desc) { |
- return worker_thread_->Invoke<bool>(Bind( |
- &Transport::SetRemoteTransportDescription_w, this, |
- description, action, error_desc)); |
+ bool ret = true; |
+ |
+ if (!VerifyIceParams(description)) { |
+ return BadTransportDescription("Invalid ice-ufrag or ice-pwd length", |
+ error_desc); |
+ } |
+ |
+ remote_description_.reset(new TransportDescription(description)); |
+ for (auto& iter : channels_) { |
+ ret &= ApplyRemoteTransportDescription(iter.second.get(), error_desc); |
+ } |
+ |
+ // If PRANSWER/ANSWER is set, we should decide transport protocol type. |
+ if (action == CA_PRANSWER || action == CA_ANSWER) { |
+ ret = NegotiateTransportDescription(CA_OFFER, error_desc); |
+ } |
+ if (ret) { |
+ remote_description_set_ = true; |
+ } |
+ |
+ return ret; |
} |
TransportChannelImpl* Transport::CreateChannel(int component) { |
- return worker_thread_->Invoke<TransportChannelImpl*>(Bind( |
- &Transport::CreateChannel_w, this, component)); |
-} |
- |
-TransportChannelImpl* Transport::CreateChannel_w(int component) { |
- ASSERT(worker_thread()->IsCurrent()); |
TransportChannelImpl* impl; |
- // TODO(tommi): We don't really need to grab the lock until the actual call |
- // to insert() below and presumably hold it throughout initialization of |
- // |impl| after the impl_exists check. Maybe we can factor that out to |
- // a separate function and not grab the lock in this function. |
- // Actually, we probably don't need to hold the lock while initializing |
- // |impl| since we can just do the insert when that's done. |
- rtc::CritScope cs(&crit_); |
// Create the entry if it does not exist. |
bool impl_exists = false; |
@@ -216,7 +218,7 @@ |
// Increase the ref count. |
iterator->second.AddRef(); |
- destroyed_ = false; |
+ channels_destroyed_ = false; |
if (impl_exists) { |
// If this is an existing channel, we should just return it without |
@@ -228,23 +230,21 @@ |
impl->SetIceRole(ice_role_); |
impl->SetIceTiebreaker(tiebreaker_); |
impl->SetReceivingTimeout(channel_receiving_timeout_); |
- // TODO(ronghuawu): Change CreateChannel_w to be able to return error since |
- // below Apply**Description_w calls can fail. |
+ // TODO(ronghuawu): Change CreateChannel to be able to return error since |
+ // below Apply**Description calls can fail. |
if (local_description_) |
- ApplyLocalTransportDescription_w(impl, NULL); |
+ ApplyLocalTransportDescription(impl, NULL); |
if (remote_description_) |
- ApplyRemoteTransportDescription_w(impl, NULL); |
+ ApplyRemoteTransportDescription(impl, NULL); |
if (local_description_ && remote_description_) |
- ApplyNegotiatedTransportDescription_w(impl, NULL); |
+ ApplyNegotiatedTransportDescription(impl, NULL); |
impl->SignalWritableState.connect(this, &Transport::OnChannelWritableState); |
impl->SignalReceivingState.connect(this, &Transport::OnChannelReceivingState); |
- impl->SignalRequestSignaling.connect( |
- this, &Transport::OnChannelRequestSignaling); |
- impl->SignalCandidateReady.connect(this, &Transport::OnChannelCandidateReady); |
+ impl->SignalGatheringState.connect(this, &Transport::OnChannelGatheringState); |
+ impl->SignalCandidateGathered.connect(this, |
+ &Transport::OnChannelCandidateGathered); |
impl->SignalRouteChange.connect(this, &Transport::OnChannelRouteChange); |
- impl->SignalCandidatesAllocationDone.connect( |
- this, &Transport::OnChannelCandidatesAllocationDone); |
impl->SignalRoleConflict.connect(this, &Transport::OnRoleConflict); |
impl->SignalConnectionRemoved.connect( |
this, &Transport::OnChannelConnectionRemoved); |
@@ -254,36 +254,22 @@ |
if (channels_.size() == 1) { |
// If this is the first channel, then indicate that we have started |
// connecting. |
- signaling_thread()->Post(this, MSG_CONNECTING, NULL); |
+ SignalConnecting(this); |
} |
} |
return impl; |
} |
TransportChannelImpl* Transport::GetChannel(int component) { |
- // TODO(tommi,pthatcher): Since we're returning a pointer from the channels_ |
- // map, shouldn't we assume that we're on the worker thread? (The pointer |
- // will be used outside of the lock). |
- // And if we're on the worker thread, which is the only thread that modifies |
- // channels_, can we skip grabbing the lock? |
- rtc::CritScope cs(&crit_); |
ChannelMap::iterator iter = channels_.find(component); |
return (iter != channels_.end()) ? iter->second.get() : NULL; |
} |
bool Transport::HasChannels() { |
- rtc::CritScope cs(&crit_); |
return !channels_.empty(); |
} |
void Transport::DestroyChannel(int component) { |
- worker_thread_->Invoke<void>(Bind( |
- &Transport::DestroyChannel_w, this, component)); |
-} |
- |
-void Transport::DestroyChannel_w(int component) { |
- ASSERT(worker_thread()->IsCurrent()); |
- |
ChannelMap::iterator iter = channels_.find(component); |
if (iter == channels_.end()) |
return; |
@@ -293,34 +279,30 @@ |
iter->second.DecRef(); |
if (!iter->second.ref()) { |
impl = iter->second.get(); |
- rtc::CritScope cs(&crit_); |
channels_.erase(iter); |
} |
if (connect_requested_ && channels_.empty()) { |
// We're no longer attempting to connect. |
- signaling_thread()->Post(this, MSG_CONNECTING, NULL); |
+ SignalConnecting(this); |
} |
if (impl) { |
- // Check in case the deleted channel was the only non-writable channel. |
- OnChannelWritableState(impl); |
DestroyTransportChannel(impl); |
+ // Need to update aggregate state after destroying a channel, |
+ // for example if it was the only one that wasn't yet writable. |
+ UpdateWritableState(); |
+ UpdateReceivingState(); |
+ UpdateGatheringState(); |
+ MaybeSignalCompleted(); |
} |
} |
void Transport::ConnectChannels() { |
- ASSERT(signaling_thread()->IsCurrent()); |
- worker_thread_->Invoke<void>(Bind(&Transport::ConnectChannels_w, this)); |
-} |
- |
-void Transport::ConnectChannels_w() { |
- ASSERT(worker_thread()->IsCurrent()); |
if (connect_requested_ || channels_.empty()) |
return; |
connect_requested_ = true; |
- signaling_thread()->Post(this, MSG_CANDIDATEREADY, NULL); |
if (!local_description_) { |
// TOOD(mallinath) : TransportDescription(TD) shouldn't be generated here. |
@@ -329,38 +311,28 @@ |
// Session. |
// Session must generate local TD before remote candidates pushed when |
// initiate request initiated by the remote. |
- LOG(LS_INFO) << "Transport::ConnectChannels_w: No local description has " |
+ LOG(LS_INFO) << "Transport::ConnectChannels: No local description has " |
<< "been set. Will generate one."; |
- TransportDescription desc(std::vector<std::string>(), |
- rtc::CreateRandomString(ICE_UFRAG_LENGTH), |
- rtc::CreateRandomString(ICE_PWD_LENGTH), |
- ICEMODE_FULL, CONNECTIONROLE_NONE, NULL, |
- Candidates()); |
- SetLocalTransportDescription_w(desc, CA_OFFER, NULL); |
- } |
- |
- CallChannels_w(&TransportChannelImpl::Connect); |
- if (!channels_.empty()) { |
- signaling_thread()->Post(this, MSG_CONNECTING, NULL); |
- } |
-} |
- |
-void Transport::OnConnecting_s() { |
- ASSERT(signaling_thread()->IsCurrent()); |
- SignalConnecting(this); |
+ TransportDescription desc( |
+ std::vector<std::string>(), rtc::CreateRandomString(ICE_UFRAG_LENGTH), |
+ rtc::CreateRandomString(ICE_PWD_LENGTH), ICEMODE_FULL, |
+ CONNECTIONROLE_NONE, NULL, Candidates()); |
+ SetLocalTransportDescription(desc, CA_OFFER, NULL); |
+ } |
+ |
+ CallChannels(&TransportChannelImpl::Connect); |
+ if (HasChannels()) { |
+ SignalConnecting(this); |
+ } |
+} |
+ |
+void Transport::MaybeStartGathering() { |
+ if (connect_requested_) { |
+ CallChannels(&TransportChannelImpl::MaybeStartGathering); |
+ } |
} |
void Transport::DestroyAllChannels() { |
- ASSERT(signaling_thread()->IsCurrent()); |
- worker_thread_->Invoke<void>(Bind(&Transport::DestroyAllChannels_w, this)); |
- worker_thread()->Clear(this); |
- signaling_thread()->Clear(this); |
- destroyed_ = true; |
-} |
- |
-void Transport::DestroyAllChannels_w() { |
- ASSERT(worker_thread()->IsCurrent()); |
- |
std::vector<TransportChannelImpl*> impls; |
for (auto& iter : channels_) { |
iter.second.DecRef(); |
@@ -368,27 +340,15 @@ |
impls.push_back(iter.second.get()); |
} |
- { |
- rtc::CritScope cs(&crit_); |
- channels_.clear(); |
- } |
- |
- for (size_t i = 0; i < impls.size(); ++i) |
- DestroyTransportChannel(impls[i]); |
-} |
- |
-void Transport::OnSignalingReady() { |
- ASSERT(signaling_thread()->IsCurrent()); |
- if (destroyed_) return; |
- |
- worker_thread()->Post(this, MSG_ONSIGNALINGREADY, NULL); |
- |
- // Notify the subclass. |
- OnTransportSignalingReady(); |
-} |
- |
-void Transport::CallChannels_w(TransportChannelFunc func) { |
- ASSERT(worker_thread()->IsCurrent()); |
+ channels_.clear(); |
+ |
+ for (TransportChannelImpl* impl : impls) { |
+ DestroyTransportChannel(impl); |
+ } |
+ channels_destroyed_ = true; |
+} |
+ |
+void Transport::CallChannels(TransportChannelFunc func) { |
for (const auto& iter : channels_) { |
((iter.second.get())->*func)(); |
} |
@@ -427,14 +387,7 @@ |
bool Transport::GetStats(TransportStats* stats) { |
- ASSERT(signaling_thread()->IsCurrent()); |
- return worker_thread_->Invoke<bool>(Bind( |
- &Transport::GetStats_w, this, stats)); |
-} |
- |
-bool Transport::GetStats_w(TransportStats* stats) { |
- ASSERT(worker_thread()->IsCurrent()); |
- stats->content_name = content_name(); |
+ stats->transport_name = name(); |
stats->channel_stats.clear(); |
for (auto iter : channels_) { |
ChannelMapEntry& entry = iter.second; |
@@ -450,82 +403,45 @@ |
return true; |
} |
-bool Transport::GetSslRole(rtc::SSLRole* ssl_role) const { |
- return worker_thread_->Invoke<bool>(Bind( |
- &Transport::GetSslRole_w, this, ssl_role)); |
-} |
- |
-bool Transport::SetSslMaxProtocolVersion(rtc::SSLProtocolVersion version) { |
- return worker_thread_->Invoke<bool>(Bind( |
- &Transport::SetSslMaxProtocolVersion_w, this, version)); |
-} |
- |
-void Transport::OnRemoteCandidates(const std::vector<Candidate>& candidates) { |
+bool Transport::AddRemoteCandidates(const std::vector<Candidate>& candidates, |
+ std::string* error) { |
+ ASSERT(!channels_destroyed_); |
+ // Verify each candidate before passing down to transport layer. |
+ for (const Candidate& cand : candidates) { |
+ if (!VerifyCandidate(cand, error)) { |
+ return false; |
+ } |
+ if (!HasChannel(cand.component())) { |
+ *error = "Candidate has unknown component: " + cand.ToString() + |
+ " for content: " + name(); |
+ return false; |
+ } |
+ } |
+ |
for (std::vector<Candidate>::const_iterator iter = candidates.begin(); |
iter != candidates.end(); |
++iter) { |
- OnRemoteCandidate(*iter); |
- } |
-} |
- |
-void Transport::OnRemoteCandidate(const Candidate& candidate) { |
- ASSERT(signaling_thread()->IsCurrent()); |
- if (destroyed_) return; |
- |
- if (!HasChannel(candidate.component())) { |
- LOG(LS_WARNING) << "Ignoring candidate for unknown component " |
- << candidate.component(); |
- return; |
- } |
- |
- ChannelParams* params = new ChannelParams(new Candidate(candidate)); |
- worker_thread()->Post(this, MSG_ONREMOTECANDIDATE, params); |
-} |
- |
-void Transport::OnRemoteCandidate_w(const Candidate& candidate) { |
- ASSERT(worker_thread()->IsCurrent()); |
- ChannelMap::iterator iter = channels_.find(candidate.component()); |
- // It's ok for a channel to go away while this message is in transit. |
- if (iter != channels_.end()) { |
- iter->second->OnCandidate(candidate); |
- } |
+ TransportChannelImpl* channel = GetChannel(iter->component()); |
+ if (channel != NULL) { |
+ channel->AddRemoteCandidate(*iter); |
+ } |
+ } |
+ return true; |
} |
void Transport::OnChannelWritableState(TransportChannel* channel) { |
- ASSERT(worker_thread()->IsCurrent()); |
- signaling_thread()->Post(this, MSG_WRITESTATE, NULL); |
- |
- MaybeCompleted_w(); |
-} |
- |
-void Transport::OnChannelWritableState_s() { |
- ASSERT(signaling_thread()->IsCurrent()); |
- TransportState writable = GetTransportState_s(TRANSPORT_WRITABLE_STATE); |
- if (writable_ != writable) { |
- was_writable_ = (writable_ == TRANSPORT_STATE_ALL); |
- writable_ = writable; |
- SignalWritableState(this); |
- } |
+ LOG(LS_INFO) << name() << " TransportChannel " << channel->component() |
+ << " writability changed to " << channel->writable() |
+ << ". Check if transport is complete."; |
+ UpdateWritableState(); |
+ MaybeSignalCompleted(); |
} |
void Transport::OnChannelReceivingState(TransportChannel* channel) { |
- ASSERT(worker_thread()->IsCurrent()); |
- signaling_thread()->Post(this, MSG_RECEIVINGSTATE); |
-} |
- |
-void Transport::OnChannelReceivingState_s() { |
- ASSERT(signaling_thread()->IsCurrent()); |
- TransportState receiving = GetTransportState_s(TRANSPORT_RECEIVING_STATE); |
- if (receiving_ != receiving) { |
- receiving_ = receiving; |
- SignalReceivingState(this); |
- } |
-} |
- |
-TransportState Transport::GetTransportState_s(TransportStateType state_type) { |
- ASSERT(signaling_thread()->IsCurrent()); |
- |
- rtc::CritScope cs(&crit_); |
+ UpdateReceivingState(); |
+} |
+ |
+TransportState Transport::GetTransportState(TransportStateType state_type) { |
bool any = false; |
bool all = !channels_.empty(); |
for (const auto iter : channels_) { |
@@ -553,106 +469,44 @@ |
return TRANSPORT_STATE_NONE; |
} |
-void Transport::OnChannelRequestSignaling(TransportChannelImpl* channel) { |
- ASSERT(worker_thread()->IsCurrent()); |
- // Resetting ICE state for the channel. |
- ChannelMap::iterator iter = channels_.find(channel->component()); |
- if (iter != channels_.end()) |
- iter->second.set_candidates_allocated(false); |
- signaling_thread()->Post(this, MSG_REQUESTSIGNALING, nullptr); |
-} |
- |
-void Transport::OnChannelRequestSignaling_s() { |
- ASSERT(signaling_thread()->IsCurrent()); |
- LOG(LS_INFO) << "Transport: " << content_name_ << ", allocating candidates"; |
- SignalRequestSignaling(this); |
-} |
- |
-void Transport::OnChannelCandidateReady(TransportChannelImpl* channel, |
- const Candidate& candidate) { |
+void Transport::OnChannelGatheringState(TransportChannelImpl* channel) { |
+ ASSERT(channels_.find(channel->component()) != channels_.end()); |
+ UpdateGatheringState(); |
+ if (gathering_state_ == kIceGatheringComplete) { |
+ // If UpdateGatheringState brought us to kIceGatheringComplete, check if |
+ // our connection state is also "Completed". Otherwise, there's no point in |
+ // checking (since it would only produce log messages). |
+ MaybeSignalCompleted(); |
+ } |
+} |
+ |
+void Transport::OnChannelCandidateGathered(TransportChannelImpl* channel, |
+ const Candidate& candidate) { |
// We should never signal peer-reflexive candidates. |
if (candidate.type() == PRFLX_PORT_TYPE) { |
ASSERT(false); |
return; |
} |
- ASSERT(worker_thread()->IsCurrent()); |
- rtc::CritScope cs(&crit_); |
- ready_candidates_.push_back(candidate); |
- |
- // We hold any messages until the client lets us connect. |
- if (connect_requested_) { |
- signaling_thread()->Post( |
- this, MSG_CANDIDATEREADY, NULL); |
- } |
-} |
- |
-void Transport::OnChannelCandidateReady_s() { |
- ASSERT(signaling_thread()->IsCurrent()); |
ASSERT(connect_requested_); |
- |
std::vector<Candidate> candidates; |
- { |
- rtc::CritScope cs(&crit_); |
- candidates.swap(ready_candidates_); |
- } |
- |
- // we do the deleting of Candidate* here to keep the new above and |
- // delete below close to each other |
- if (!candidates.empty()) { |
- SignalCandidatesReady(this, candidates); |
- } |
+ candidates.push_back(candidate); |
+ SignalCandidatesGathered(this, candidates); |
} |
void Transport::OnChannelRouteChange(TransportChannel* channel, |
const Candidate& remote_candidate) { |
- ASSERT(worker_thread()->IsCurrent()); |
- ChannelParams* params = new ChannelParams(new Candidate(remote_candidate)); |
- params->channel = static_cast<cricket::TransportChannelImpl*>(channel); |
- signaling_thread()->Post(this, MSG_ROUTECHANGE, params); |
-} |
- |
-void Transport::OnChannelRouteChange_s(const TransportChannel* channel, |
- const Candidate& remote_candidate) { |
- ASSERT(signaling_thread()->IsCurrent()); |
SignalRouteChange(this, remote_candidate.component(), remote_candidate); |
} |
-void Transport::OnChannelCandidatesAllocationDone( |
- TransportChannelImpl* channel) { |
- ASSERT(worker_thread()->IsCurrent()); |
- ChannelMap::iterator iter = channels_.find(channel->component()); |
- ASSERT(iter != channels_.end()); |
- LOG(LS_INFO) << "Transport: " << content_name_ << ", component " |
- << channel->component() << " allocation complete"; |
- |
- iter->second.set_candidates_allocated(true); |
- |
- // If all channels belonging to this Transport got signal, then |
- // forward this signal to upper layer. |
- // Can this signal arrive before all transport channels are created? |
- for (auto& iter : channels_) { |
- if (!iter.second.candidates_allocated()) |
- return; |
- } |
- signaling_thread_->Post(this, MSG_CANDIDATEALLOCATIONCOMPLETE); |
- |
- MaybeCompleted_w(); |
-} |
- |
-void Transport::OnChannelCandidatesAllocationDone_s() { |
- ASSERT(signaling_thread()->IsCurrent()); |
- LOG(LS_INFO) << "Transport: " << content_name_ << " allocation complete"; |
- SignalCandidatesAllocationDone(this); |
-} |
- |
void Transport::OnRoleConflict(TransportChannelImpl* channel) { |
- signaling_thread_->Post(this, MSG_ROLECONFLICT); |
+ SignalRoleConflict(); |
} |
void Transport::OnChannelConnectionRemoved(TransportChannelImpl* channel) { |
- ASSERT(worker_thread()->IsCurrent()); |
- MaybeCompleted_w(); |
+ LOG(LS_INFO) << name() << " TransportChannel " << channel->component() |
+ << " connection removed. Check if transport is complete."; |
+ MaybeSignalCompleted(); |
// Check if the state is now Failed. |
// Failed is only available in the Controlling ICE role. |
@@ -660,158 +514,97 @@ |
return; |
} |
- ChannelMap::iterator iter = channels_.find(channel->component()); |
- ASSERT(iter != channels_.end()); |
- // Failed can only occur after candidate allocation has stopped. |
- if (!iter->second.candidates_allocated()) { |
+ // Failed can only occur after candidate gathering has stopped. |
+ if (channel->gathering_state() != kIceGatheringComplete) { |
return; |
} |
if (channel->GetState() == TransportChannelState::STATE_FAILED) { |
// A Transport has failed if any of its channels have no remaining |
// connections. |
- signaling_thread_->Post(this, MSG_FAILED); |
- } |
-} |
- |
-void Transport::MaybeCompleted_w() { |
- ASSERT(worker_thread()->IsCurrent()); |
- |
- // When there is no channel created yet, calling this function could fire an |
- // IceConnectionCompleted event prematurely. |
- if (channels_.empty()) { |
- return; |
- } |
- |
- // A Transport's ICE process is completed if all of its channels are writable, |
- // have finished allocating candidates, and have pruned all but one of their |
- // connections. |
- for (const auto& iter : channels_) { |
- const TransportChannelImpl* channel = iter.second.get(); |
- if (!(channel->writable() && |
- channel->GetState() == TransportChannelState::STATE_COMPLETED && |
- channel->GetIceRole() == ICEROLE_CONTROLLING && |
- iter.second.candidates_allocated())) { |
- return; |
- } |
- } |
- |
- signaling_thread_->Post(this, MSG_COMPLETED); |
-} |
- |
-void Transport::SetIceRole_w(IceRole role) { |
- ASSERT(worker_thread()->IsCurrent()); |
- rtc::CritScope cs(&crit_); |
- ice_role_ = role; |
- for (auto& iter : channels_) { |
- iter.second->SetIceRole(ice_role_); |
- } |
-} |
- |
-void Transport::SetRemoteIceMode_w(IceMode mode) { |
- ASSERT(worker_thread()->IsCurrent()); |
- remote_ice_mode_ = mode; |
- // Shouldn't channels be created after this method executed? |
- for (auto& iter : channels_) { |
- iter.second->SetRemoteIceMode(remote_ice_mode_); |
- } |
-} |
- |
-bool Transport::SetLocalTransportDescription_w( |
- const TransportDescription& desc, |
- ContentAction action, |
- std::string* error_desc) { |
- ASSERT(worker_thread()->IsCurrent()); |
- bool ret = true; |
- |
- if (!VerifyIceParams(desc)) { |
- return BadTransportDescription("Invalid ice-ufrag or ice-pwd length", |
- error_desc); |
- } |
- |
- // TODO(tommi,pthatcher): I'm not sure why we need to grab this lock at this |
- // point. |local_description_| seems to always be modified on the worker |
- // thread, so we should be able to use it here without grabbing the lock. |
- // However, we _might_ need it before the call to reset() below? |
- // Raw access to |local_description_| is granted to derived transports outside |
- // of locking (see local_description() in the header file). |
- // The contract is that the derived implementations must be aware of when the |
- // description might change and do appropriate synchronization. |
- rtc::CritScope cs(&crit_); |
- if (local_description_ && IceCredentialsChanged(*local_description_, desc)) { |
- IceRole new_ice_role = (action == CA_OFFER) ? ICEROLE_CONTROLLING |
- : ICEROLE_CONTROLLED; |
- |
- // It must be called before ApplyLocalTransportDescription_w, which may |
- // trigger an ICE restart and depends on the new ICE role. |
- SetIceRole_w(new_ice_role); |
- } |
- |
- local_description_.reset(new TransportDescription(desc)); |
- |
- for (auto& iter : channels_) { |
- ret &= ApplyLocalTransportDescription_w(iter.second.get(), error_desc); |
- } |
- if (!ret) |
- return false; |
- |
- // If PRANSWER/ANSWER is set, we should decide transport protocol type. |
- if (action == CA_PRANSWER || action == CA_ANSWER) { |
- ret &= NegotiateTransportDescription_w(action, error_desc); |
- } |
- return ret; |
-} |
- |
-bool Transport::SetRemoteTransportDescription_w( |
- const TransportDescription& desc, |
- ContentAction action, |
- std::string* error_desc) { |
- bool ret = true; |
- |
- if (!VerifyIceParams(desc)) { |
- return BadTransportDescription("Invalid ice-ufrag or ice-pwd length", |
- error_desc); |
- } |
- |
- // TODO(tommi,pthatcher): See todo for local_description_ above. |
- rtc::CritScope cs(&crit_); |
- remote_description_.reset(new TransportDescription(desc)); |
- for (auto& iter : channels_) { |
- ret &= ApplyRemoteTransportDescription_w(iter.second.get(), error_desc); |
- } |
- |
- // If PRANSWER/ANSWER is set, we should decide transport protocol type. |
- if (action == CA_PRANSWER || action == CA_ANSWER) { |
- ret = NegotiateTransportDescription_w(CA_OFFER, error_desc); |
- } |
- return ret; |
-} |
- |
-bool Transport::ApplyLocalTransportDescription_w(TransportChannelImpl* ch, |
- std::string* error_desc) { |
- ASSERT(worker_thread()->IsCurrent()); |
+ SignalFailed(this); |
+ } |
+} |
+ |
+void Transport::MaybeSignalCompleted() { |
+ if (AllChannelsCompleted()) { |
+ LOG(LS_INFO) << name() << " transport is complete" |
+ << " because all the channels are complete."; |
+ SignalCompleted(this); |
+ } |
+ // TODO(deadbeef): Should we do anything if we previously were completed, |
+ // but now are not (if, for example, a new remote candidate is added)? |
+} |
+ |
+void Transport::UpdateGatheringState() { |
+ IceGatheringState new_state = kIceGatheringNew; |
+ bool any_gathering = false; |
+ bool all_complete = !channels_.empty(); |
+ for (const auto& kv : channels_) { |
+ any_gathering = |
+ any_gathering || kv.second->gathering_state() != kIceGatheringNew; |
+ all_complete = |
+ all_complete && kv.second->gathering_state() == kIceGatheringComplete; |
+ } |
+ if (all_complete) { |
+ new_state = kIceGatheringComplete; |
+ } else if (any_gathering) { |
+ new_state = kIceGatheringGathering; |
+ } |
+ |
+ if (gathering_state_ != new_state) { |
+ gathering_state_ = new_state; |
+ if (gathering_state_ == kIceGatheringGathering) { |
+ LOG(LS_INFO) << "Transport: " << name_ << ", gathering candidates"; |
+ } else if (gathering_state_ == kIceGatheringComplete) { |
+ LOG(LS_INFO) << "Transport " << name() << " gathering complete."; |
+ } |
+ SignalGatheringState(this); |
+ } |
+} |
+ |
+void Transport::UpdateReceivingState() { |
+ TransportState receiving = GetTransportState(TRANSPORT_RECEIVING_STATE); |
+ if (receiving_ != receiving) { |
+ receiving_ = receiving; |
+ SignalReceivingState(this); |
+ } |
+} |
+ |
+void Transport::UpdateWritableState() { |
+ TransportState writable = GetTransportState(TRANSPORT_WRITABLE_STATE); |
+ LOG(LS_INFO) << name() << " transport writable state changed? " << writable_ |
+ << " => " << writable; |
+ if (writable_ != writable) { |
+ was_writable_ = (writable_ == TRANSPORT_STATE_ALL); |
+ writable_ = writable; |
+ SignalWritableState(this); |
+ } |
+} |
+ |
+bool Transport::ApplyLocalTransportDescription(TransportChannelImpl* ch, |
+ std::string* error_desc) { |
ch->SetIceCredentials(local_description_->ice_ufrag, |
local_description_->ice_pwd); |
return true; |
} |
-bool Transport::ApplyRemoteTransportDescription_w(TransportChannelImpl* ch, |
- std::string* error_desc) { |
+bool Transport::ApplyRemoteTransportDescription(TransportChannelImpl* ch, |
+ std::string* error_desc) { |
ch->SetRemoteIceCredentials(remote_description_->ice_ufrag, |
remote_description_->ice_pwd); |
return true; |
} |
-bool Transport::ApplyNegotiatedTransportDescription_w( |
- TransportChannelImpl* channel, std::string* error_desc) { |
- ASSERT(worker_thread()->IsCurrent()); |
+bool Transport::ApplyNegotiatedTransportDescription( |
+ TransportChannelImpl* channel, |
+ std::string* error_desc) { |
channel->SetRemoteIceMode(remote_ice_mode_); |
return true; |
} |
-bool Transport::NegotiateTransportDescription_w(ContentAction local_role, |
- std::string* error_desc) { |
- ASSERT(worker_thread()->IsCurrent()); |
+bool Transport::NegotiateTransportDescription(ContentAction local_role, |
+ std::string* error_desc) { |
// TODO(ekr@rtfm.com): This is ICE-specific stuff. Refactor into |
// P2PTransport. |
@@ -819,7 +612,7 @@ |
// ice_lite, this local end point should take CONTROLLING role. |
if (ice_role_ == ICEROLE_CONTROLLED && |
remote_description_->ice_mode == ICEMODE_LITE) { |
- SetIceRole_w(ICEROLE_CONTROLLING); |
+ SetIceRole(ICEROLE_CONTROLLING); |
} |
// Update remote ice_mode to all existing channels. |
@@ -831,57 +624,10 @@ |
// creation, we have the negotiation state saved until a new |
// negotiation happens. |
for (auto& iter : channels_) { |
- if (!ApplyNegotiatedTransportDescription_w(iter.second.get(), error_desc)) |
+ if (!ApplyNegotiatedTransportDescription(iter.second.get(), error_desc)) |
return false; |
} |
return true; |
} |
-void Transport::OnMessage(rtc::Message* msg) { |
- switch (msg->message_id) { |
- case MSG_ONSIGNALINGREADY: |
- CallChannels_w(&TransportChannelImpl::OnSignalingReady); |
- break; |
- case MSG_ONREMOTECANDIDATE: { |
- ChannelParams* params = static_cast<ChannelParams*>(msg->pdata); |
- OnRemoteCandidate_w(*params->candidate); |
- delete params; |
- } |
- break; |
- case MSG_CONNECTING: |
- OnConnecting_s(); |
- break; |
- case MSG_WRITESTATE: |
- OnChannelWritableState_s(); |
- break; |
- case MSG_RECEIVINGSTATE: |
- OnChannelReceivingState_s(); |
- break; |
- case MSG_REQUESTSIGNALING: |
- OnChannelRequestSignaling_s(); |
- break; |
- case MSG_CANDIDATEREADY: |
- OnChannelCandidateReady_s(); |
- break; |
- case MSG_ROUTECHANGE: { |
- ChannelParams* params = static_cast<ChannelParams*>(msg->pdata); |
- OnChannelRouteChange_s(params->channel, *params->candidate); |
- delete params; |
- } |
- break; |
- case MSG_CANDIDATEALLOCATIONCOMPLETE: |
- OnChannelCandidatesAllocationDone_s(); |
- break; |
- case MSG_ROLECONFLICT: |
- SignalRoleConflict(); |
- break; |
- case MSG_COMPLETED: |
- SignalCompleted(this); |
- break; |
- case MSG_FAILED: |
- SignalFailed(this); |
- break; |
- } |
-} |
- |
} // namespace cricket |