Index: webrtc/p2p/base/transport.cc |
diff --git a/webrtc/p2p/base/transport.cc b/webrtc/p2p/base/transport.cc |
index b7aba7540a5699e165994a143bbae9c5852ec5a8..d626ad3d65acf284a6022428cbfb43615a6d07e3 100644 |
--- a/webrtc/p2p/base/transport.cc |
+++ b/webrtc/p2p/base/transport.cc |
@@ -8,8 +8,6 @@ |
* be found in the AUTHORS file in the root of the source tree. |
*/ |
-#include <utility> // for std::pair |
- |
#include "webrtc/p2p/base/transport.h" |
#include "webrtc/p2p/base/candidate.h" |
@@ -23,6 +21,39 @@ |
namespace cricket { |
using rtc::Bind; |
+ |
+enum { |
+ MSG_ONSIGNALINGREADY = 1, |
+ MSG_ONREMOTECANDIDATE, |
+ MSG_WRITESTATE, |
+ MSG_REQUESTSIGNALING, |
+ MSG_CANDIDATEREADY, |
+ MSG_ROUTECHANGE, |
+ MSG_CONNECTING, |
+ MSG_CANDIDATEALLOCATIONCOMPLETE, |
+ MSG_ROLECONFLICT, |
+ MSG_COMPLETED, |
+ MSG_FAILED, |
+ MSG_RECEIVINGSTATE, |
+}; |
+ |
+struct ChannelParams : public rtc::MessageData { |
+ ChannelParams() : channel(NULL), candidate(NULL) {} |
+ explicit ChannelParams(int component) |
+ : component(component), channel(NULL), candidate(NULL) {} |
+ explicit ChannelParams(Candidate* candidate) |
+ : channel(NULL), candidate(candidate) { |
+ } |
+ |
+ ~ChannelParams() { |
+ delete candidate; |
+ } |
+ |
+ std::string name; |
+ int component; |
+ TransportChannelImpl* channel; |
+ Candidate* candidate; |
+}; |
static bool VerifyIceParams(const TransportDescription& desc) { |
// For legacy protocols. |
@@ -65,59 +96,58 @@ |
new_desc.ice_ufrag, new_desc.ice_pwd); |
} |
-Transport::Transport(const std::string& name, PortAllocator* allocator) |
- : name_(name), allocator_(allocator) {} |
+Transport::Transport(rtc::Thread* signaling_thread, |
+ rtc::Thread* worker_thread, |
+ const std::string& content_name, |
+ PortAllocator* allocator) |
+ : signaling_thread_(signaling_thread), |
+ worker_thread_(worker_thread), |
+ content_name_(content_name), |
+ allocator_(allocator), |
+ destroyed_(false), |
+ readable_(TRANSPORT_STATE_NONE), |
+ writable_(TRANSPORT_STATE_NONE), |
+ receiving_(TRANSPORT_STATE_NONE), |
+ was_writable_(false), |
+ connect_requested_(false), |
+ ice_role_(ICEROLE_UNKNOWN), |
+ tiebreaker_(0), |
+ remote_ice_mode_(ICEMODE_FULL), |
+ channel_receiving_timeout_(-1) { |
+} |
Transport::~Transport() { |
- ASSERT(channels_destroyed_); |
-} |
- |
-bool Transport::AllChannelsCompleted() const { |
- // We aren't completed until at least one channel is complete, so if there |
- // are no channels, we aren't complete yet. |
- if (channels_.empty()) { |
- LOG(LS_INFO) << name() << " transport is not complete" |
- << " because it has no TransportChannels"; |
- return false; |
- } |
- |
- // A Transport's ICE process is completed if all of its channels are writable, |
- // have finished allocating candidates, and have pruned all but one of their |
- // connections. |
- for (const auto& iter : channels_) { |
- const TransportChannelImpl* channel = iter.second.get(); |
- bool complete = |
- channel->writable() && |
- channel->GetState() == TransportChannelState::STATE_COMPLETED && |
- channel->GetIceRole() == ICEROLE_CONTROLLING && |
- channel->gathering_state() == kIceGatheringComplete; |
- if (!complete) { |
- LOG(LS_INFO) << name() << " transport is not complete" |
- << " because a channel is still incomplete."; |
- return false; |
- } |
- } |
- |
- return true; |
-} |
- |
-bool Transport::AnyChannelFailed() const { |
- for (const auto& iter : channels_) { |
- if (iter.second->GetState() == TransportChannelState::STATE_FAILED) { |
- return true; |
- } |
- } |
- return false; |
+ ASSERT(signaling_thread_->IsCurrent()); |
+ ASSERT(destroyed_); |
} |
void Transport::SetIceRole(IceRole role) { |
- ice_role_ = role; |
- for (auto& iter : channels_) { |
- iter.second->SetIceRole(ice_role_); |
- } |
+ worker_thread_->Invoke<void>(Bind(&Transport::SetIceRole_w, this, role)); |
+} |
+ |
+void Transport::SetCertificate( |
+ const rtc::scoped_refptr<rtc::RTCCertificate>& certificate) { |
+ worker_thread_->Invoke<void>(Bind(&Transport::SetCertificate_w, this, |
+ certificate)); |
+} |
+ |
+bool Transport::GetCertificate( |
+ rtc::scoped_refptr<rtc::RTCCertificate>* certificate) { |
+ // The identity is set on the worker thread, so for safety it must also be |
+ // acquired on the worker thread. |
+ return worker_thread_->Invoke<bool>( |
+ Bind(&Transport::GetCertificate_w, this, certificate)); |
} |
bool Transport::GetRemoteSSLCertificate(rtc::SSLCertificate** cert) { |
+ // Channels can be deleted on the worker thread, so for safety the remote |
+ // certificate is acquired on the worker thread. |
+ return worker_thread_->Invoke<bool>( |
+ Bind(&Transport::GetRemoteSSLCertificate_w, this, cert)); |
+} |
+ |
+bool Transport::GetRemoteSSLCertificate_w(rtc::SSLCertificate** cert) { |
+ ASSERT(worker_thread()->IsCurrent()); |
if (channels_.empty()) |
return false; |
@@ -126,6 +156,12 @@ |
} |
void Transport::SetChannelReceivingTimeout(int timeout_ms) { |
+ worker_thread_->Invoke<void>( |
+ Bind(&Transport::SetChannelReceivingTimeout_w, this, timeout_ms)); |
+} |
+ |
+void Transport::SetChannelReceivingTimeout_w(int timeout_ms) { |
+ ASSERT(worker_thread()->IsCurrent()); |
channel_receiving_timeout_ = timeout_ms; |
for (const auto& kv : channels_) { |
kv.second->SetReceivingTimeout(timeout_ms); |
@@ -136,73 +172,35 @@ |
const TransportDescription& description, |
ContentAction action, |
std::string* error_desc) { |
- bool ret = true; |
- |
- if (!VerifyIceParams(description)) { |
- return BadTransportDescription("Invalid ice-ufrag or ice-pwd length", |
- error_desc); |
- } |
- |
- if (local_description_ && |
- IceCredentialsChanged(*local_description_, description)) { |
- IceRole new_ice_role = |
- (action == CA_OFFER) ? ICEROLE_CONTROLLING : ICEROLE_CONTROLLED; |
- |
- // It must be called before ApplyLocalTransportDescription, which may |
- // trigger an ICE restart and depends on the new ICE role. |
- SetIceRole(new_ice_role); |
- } |
- |
- local_description_.reset(new TransportDescription(description)); |
- |
- for (auto& iter : channels_) { |
- ret &= ApplyLocalTransportDescription(iter.second.get(), error_desc); |
- } |
- if (!ret) { |
- return false; |
- } |
- |
- // If PRANSWER/ANSWER is set, we should decide transport protocol type. |
- if (action == CA_PRANSWER || action == CA_ANSWER) { |
- ret &= NegotiateTransportDescription(action, error_desc); |
- } |
- if (ret) { |
- local_description_set_ = true; |
- ConnectChannels(); |
- } |
- |
- return ret; |
+ return worker_thread_->Invoke<bool>(Bind( |
+ &Transport::SetLocalTransportDescription_w, this, |
+ description, action, error_desc)); |
} |
bool Transport::SetRemoteTransportDescription( |
const TransportDescription& description, |
ContentAction action, |
std::string* error_desc) { |
- bool ret = true; |
- |
- if (!VerifyIceParams(description)) { |
- return BadTransportDescription("Invalid ice-ufrag or ice-pwd length", |
- error_desc); |
- } |
- |
- remote_description_.reset(new TransportDescription(description)); |
- for (auto& iter : channels_) { |
- ret &= ApplyRemoteTransportDescription(iter.second.get(), error_desc); |
- } |
- |
- // If PRANSWER/ANSWER is set, we should decide transport protocol type. |
- if (action == CA_PRANSWER || action == CA_ANSWER) { |
- ret = NegotiateTransportDescription(CA_OFFER, error_desc); |
- } |
- if (ret) { |
- remote_description_set_ = true; |
- } |
- |
- return ret; |
+ return worker_thread_->Invoke<bool>(Bind( |
+ &Transport::SetRemoteTransportDescription_w, this, |
+ description, action, error_desc)); |
} |
TransportChannelImpl* Transport::CreateChannel(int component) { |
+ return worker_thread_->Invoke<TransportChannelImpl*>(Bind( |
+ &Transport::CreateChannel_w, this, component)); |
+} |
+ |
+TransportChannelImpl* Transport::CreateChannel_w(int component) { |
+ ASSERT(worker_thread()->IsCurrent()); |
TransportChannelImpl* impl; |
+ // TODO(tommi): We don't really need to grab the lock until the actual call |
+ // to insert() below and presumably hold it throughout initialization of |
+ // |impl| after the impl_exists check. Maybe we can factor that out to |
+ // a separate function and not grab the lock in this function. |
+ // Actually, we probably don't need to hold the lock while initializing |
+ // |impl| since we can just do the insert when that's done. |
+ rtc::CritScope cs(&crit_); |
// Create the entry if it does not exist. |
bool impl_exists = false; |
@@ -218,7 +216,7 @@ |
// Increase the ref count. |
iterator->second.AddRef(); |
- channels_destroyed_ = false; |
+ destroyed_ = false; |
if (impl_exists) { |
// If this is an existing channel, we should just return it without |
@@ -230,21 +228,23 @@ |
impl->SetIceRole(ice_role_); |
impl->SetIceTiebreaker(tiebreaker_); |
impl->SetReceivingTimeout(channel_receiving_timeout_); |
- // TODO(ronghuawu): Change CreateChannel to be able to return error since |
- // below Apply**Description calls can fail. |
+ // TODO(ronghuawu): Change CreateChannel_w to be able to return error since |
+ // below Apply**Description_w calls can fail. |
if (local_description_) |
- ApplyLocalTransportDescription(impl, NULL); |
+ ApplyLocalTransportDescription_w(impl, NULL); |
if (remote_description_) |
- ApplyRemoteTransportDescription(impl, NULL); |
+ ApplyRemoteTransportDescription_w(impl, NULL); |
if (local_description_ && remote_description_) |
- ApplyNegotiatedTransportDescription(impl, NULL); |
+ ApplyNegotiatedTransportDescription_w(impl, NULL); |
impl->SignalWritableState.connect(this, &Transport::OnChannelWritableState); |
impl->SignalReceivingState.connect(this, &Transport::OnChannelReceivingState); |
- impl->SignalGatheringState.connect(this, &Transport::OnChannelGatheringState); |
- impl->SignalCandidateGathered.connect(this, |
- &Transport::OnChannelCandidateGathered); |
+ impl->SignalRequestSignaling.connect( |
+ this, &Transport::OnChannelRequestSignaling); |
+ impl->SignalCandidateReady.connect(this, &Transport::OnChannelCandidateReady); |
impl->SignalRouteChange.connect(this, &Transport::OnChannelRouteChange); |
+ impl->SignalCandidatesAllocationDone.connect( |
+ this, &Transport::OnChannelCandidatesAllocationDone); |
impl->SignalRoleConflict.connect(this, &Transport::OnRoleConflict); |
impl->SignalConnectionRemoved.connect( |
this, &Transport::OnChannelConnectionRemoved); |
@@ -254,22 +254,36 @@ |
if (channels_.size() == 1) { |
// If this is the first channel, then indicate that we have started |
// connecting. |
- SignalConnecting(this); |
+ signaling_thread()->Post(this, MSG_CONNECTING, NULL); |
} |
} |
return impl; |
} |
TransportChannelImpl* Transport::GetChannel(int component) { |
+ // TODO(tommi,pthatcher): Since we're returning a pointer from the channels_ |
+ // map, shouldn't we assume that we're on the worker thread? (The pointer |
+ // will be used outside of the lock). |
+ // And if we're on the worker thread, which is the only thread that modifies |
+ // channels_, can we skip grabbing the lock? |
+ rtc::CritScope cs(&crit_); |
ChannelMap::iterator iter = channels_.find(component); |
return (iter != channels_.end()) ? iter->second.get() : NULL; |
} |
bool Transport::HasChannels() { |
+ rtc::CritScope cs(&crit_); |
return !channels_.empty(); |
} |
void Transport::DestroyChannel(int component) { |
+ worker_thread_->Invoke<void>(Bind( |
+ &Transport::DestroyChannel_w, this, component)); |
+} |
+ |
+void Transport::DestroyChannel_w(int component) { |
+ ASSERT(worker_thread()->IsCurrent()); |
+ |
ChannelMap::iterator iter = channels_.find(component); |
if (iter == channels_.end()) |
return; |
@@ -279,30 +293,34 @@ |
iter->second.DecRef(); |
if (!iter->second.ref()) { |
impl = iter->second.get(); |
+ rtc::CritScope cs(&crit_); |
channels_.erase(iter); |
} |
if (connect_requested_ && channels_.empty()) { |
// We're no longer attempting to connect. |
- SignalConnecting(this); |
+ signaling_thread()->Post(this, MSG_CONNECTING, NULL); |
} |
if (impl) { |
+ // Check in case the deleted channel was the only non-writable channel. |
+ OnChannelWritableState(impl); |
DestroyTransportChannel(impl); |
- // Need to update aggregate state after destroying a channel, |
- // for example if it was the only one that wasn't yet writable. |
- UpdateWritableState(); |
- UpdateReceivingState(); |
- UpdateGatheringState(); |
- MaybeSignalCompleted(); |
} |
} |
void Transport::ConnectChannels() { |
+ ASSERT(signaling_thread()->IsCurrent()); |
+ worker_thread_->Invoke<void>(Bind(&Transport::ConnectChannels_w, this)); |
+} |
+ |
+void Transport::ConnectChannels_w() { |
+ ASSERT(worker_thread()->IsCurrent()); |
if (connect_requested_ || channels_.empty()) |
return; |
connect_requested_ = true; |
+ signaling_thread()->Post(this, MSG_CANDIDATEREADY, NULL); |
if (!local_description_) { |
// TOOD(mallinath) : TransportDescription(TD) shouldn't be generated here. |
@@ -311,28 +329,38 @@ |
// Session. |
// Session must generate local TD before remote candidates pushed when |
// initiate request initiated by the remote. |
- LOG(LS_INFO) << "Transport::ConnectChannels: No local description has " |
+ LOG(LS_INFO) << "Transport::ConnectChannels_w: No local description has " |
<< "been set. Will generate one."; |
- TransportDescription desc( |
- std::vector<std::string>(), rtc::CreateRandomString(ICE_UFRAG_LENGTH), |
- rtc::CreateRandomString(ICE_PWD_LENGTH), ICEMODE_FULL, |
- CONNECTIONROLE_NONE, NULL, Candidates()); |
- SetLocalTransportDescription(desc, CA_OFFER, NULL); |
- } |
- |
- CallChannels(&TransportChannelImpl::Connect); |
- if (HasChannels()) { |
- SignalConnecting(this); |
- } |
-} |
- |
-void Transport::MaybeStartGathering() { |
- if (connect_requested_) { |
- CallChannels(&TransportChannelImpl::MaybeStartGathering); |
- } |
+ TransportDescription desc(std::vector<std::string>(), |
+ rtc::CreateRandomString(ICE_UFRAG_LENGTH), |
+ rtc::CreateRandomString(ICE_PWD_LENGTH), |
+ ICEMODE_FULL, CONNECTIONROLE_NONE, NULL, |
+ Candidates()); |
+ SetLocalTransportDescription_w(desc, CA_OFFER, NULL); |
+ } |
+ |
+ CallChannels_w(&TransportChannelImpl::Connect); |
+ if (!channels_.empty()) { |
+ signaling_thread()->Post(this, MSG_CONNECTING, NULL); |
+ } |
+} |
+ |
+void Transport::OnConnecting_s() { |
+ ASSERT(signaling_thread()->IsCurrent()); |
+ SignalConnecting(this); |
} |
void Transport::DestroyAllChannels() { |
+ ASSERT(signaling_thread()->IsCurrent()); |
+ worker_thread_->Invoke<void>(Bind(&Transport::DestroyAllChannels_w, this)); |
+ worker_thread()->Clear(this); |
+ signaling_thread()->Clear(this); |
+ destroyed_ = true; |
+} |
+ |
+void Transport::DestroyAllChannels_w() { |
+ ASSERT(worker_thread()->IsCurrent()); |
+ |
std::vector<TransportChannelImpl*> impls; |
for (auto& iter : channels_) { |
iter.second.DecRef(); |
@@ -340,15 +368,27 @@ |
impls.push_back(iter.second.get()); |
} |
- channels_.clear(); |
- |
- for (TransportChannelImpl* impl : impls) { |
- DestroyTransportChannel(impl); |
- } |
- channels_destroyed_ = true; |
-} |
- |
-void Transport::CallChannels(TransportChannelFunc func) { |
+ { |
+ rtc::CritScope cs(&crit_); |
+ channels_.clear(); |
+ } |
+ |
+ for (size_t i = 0; i < impls.size(); ++i) |
+ DestroyTransportChannel(impls[i]); |
+} |
+ |
+void Transport::OnSignalingReady() { |
+ ASSERT(signaling_thread()->IsCurrent()); |
+ if (destroyed_) return; |
+ |
+ worker_thread()->Post(this, MSG_ONSIGNALINGREADY, NULL); |
+ |
+ // Notify the subclass. |
+ OnTransportSignalingReady(); |
+} |
+ |
+void Transport::CallChannels_w(TransportChannelFunc func) { |
+ ASSERT(worker_thread()->IsCurrent()); |
for (const auto& iter : channels_) { |
((iter.second.get())->*func)(); |
} |
@@ -387,7 +427,14 @@ |
bool Transport::GetStats(TransportStats* stats) { |
- stats->transport_name = name(); |
+ ASSERT(signaling_thread()->IsCurrent()); |
+ return worker_thread_->Invoke<bool>(Bind( |
+ &Transport::GetStats_w, this, stats)); |
+} |
+ |
+bool Transport::GetStats_w(TransportStats* stats) { |
+ ASSERT(worker_thread()->IsCurrent()); |
+ stats->content_name = content_name(); |
stats->channel_stats.clear(); |
for (auto iter : channels_) { |
ChannelMapEntry& entry = iter.second; |
@@ -403,45 +450,82 @@ |
return true; |
} |
-bool Transport::AddRemoteCandidates(const std::vector<Candidate>& candidates, |
- std::string* error) { |
- ASSERT(!channels_destroyed_); |
- // Verify each candidate before passing down to transport layer. |
- for (const Candidate& cand : candidates) { |
- if (!VerifyCandidate(cand, error)) { |
- return false; |
- } |
- if (!HasChannel(cand.component())) { |
- *error = "Candidate has unknown component: " + cand.ToString() + |
- " for content: " + name(); |
- return false; |
- } |
- } |
- |
+bool Transport::GetSslRole(rtc::SSLRole* ssl_role) const { |
+ return worker_thread_->Invoke<bool>(Bind( |
+ &Transport::GetSslRole_w, this, ssl_role)); |
+} |
+ |
+bool Transport::SetSslMaxProtocolVersion(rtc::SSLProtocolVersion version) { |
+ return worker_thread_->Invoke<bool>(Bind( |
+ &Transport::SetSslMaxProtocolVersion_w, this, version)); |
+} |
+ |
+void Transport::OnRemoteCandidates(const std::vector<Candidate>& candidates) { |
for (std::vector<Candidate>::const_iterator iter = candidates.begin(); |
iter != candidates.end(); |
++iter) { |
- TransportChannelImpl* channel = GetChannel(iter->component()); |
- if (channel != NULL) { |
- channel->AddRemoteCandidate(*iter); |
- } |
- } |
- return true; |
+ OnRemoteCandidate(*iter); |
+ } |
+} |
+ |
+void Transport::OnRemoteCandidate(const Candidate& candidate) { |
+ ASSERT(signaling_thread()->IsCurrent()); |
+ if (destroyed_) return; |
+ |
+ if (!HasChannel(candidate.component())) { |
+ LOG(LS_WARNING) << "Ignoring candidate for unknown component " |
+ << candidate.component(); |
+ return; |
+ } |
+ |
+ ChannelParams* params = new ChannelParams(new Candidate(candidate)); |
+ worker_thread()->Post(this, MSG_ONREMOTECANDIDATE, params); |
+} |
+ |
+void Transport::OnRemoteCandidate_w(const Candidate& candidate) { |
+ ASSERT(worker_thread()->IsCurrent()); |
+ ChannelMap::iterator iter = channels_.find(candidate.component()); |
+ // It's ok for a channel to go away while this message is in transit. |
+ if (iter != channels_.end()) { |
+ iter->second->OnCandidate(candidate); |
+ } |
} |
void Transport::OnChannelWritableState(TransportChannel* channel) { |
- LOG(LS_INFO) << name() << " TransportChannel " << channel->component() |
- << " writability changed to " << channel->writable() |
- << ". Check if transport is complete."; |
- UpdateWritableState(); |
- MaybeSignalCompleted(); |
+ ASSERT(worker_thread()->IsCurrent()); |
+ signaling_thread()->Post(this, MSG_WRITESTATE, NULL); |
+ |
+ MaybeCompleted_w(); |
+} |
+ |
+void Transport::OnChannelWritableState_s() { |
+ ASSERT(signaling_thread()->IsCurrent()); |
+ TransportState writable = GetTransportState_s(TRANSPORT_WRITABLE_STATE); |
+ if (writable_ != writable) { |
+ was_writable_ = (writable_ == TRANSPORT_STATE_ALL); |
+ writable_ = writable; |
+ SignalWritableState(this); |
+ } |
} |
void Transport::OnChannelReceivingState(TransportChannel* channel) { |
- UpdateReceivingState(); |
-} |
- |
-TransportState Transport::GetTransportState(TransportStateType state_type) { |
+ ASSERT(worker_thread()->IsCurrent()); |
+ signaling_thread()->Post(this, MSG_RECEIVINGSTATE); |
+} |
+ |
+void Transport::OnChannelReceivingState_s() { |
+ ASSERT(signaling_thread()->IsCurrent()); |
+ TransportState receiving = GetTransportState_s(TRANSPORT_RECEIVING_STATE); |
+ if (receiving_ != receiving) { |
+ receiving_ = receiving; |
+ SignalReceivingState(this); |
+ } |
+} |
+ |
+TransportState Transport::GetTransportState_s(TransportStateType state_type) { |
+ ASSERT(signaling_thread()->IsCurrent()); |
+ |
+ rtc::CritScope cs(&crit_); |
bool any = false; |
bool all = !channels_.empty(); |
for (const auto iter : channels_) { |
@@ -469,44 +553,106 @@ |
return TRANSPORT_STATE_NONE; |
} |
-void Transport::OnChannelGatheringState(TransportChannelImpl* channel) { |
- ASSERT(channels_.find(channel->component()) != channels_.end()); |
- UpdateGatheringState(); |
- if (gathering_state_ == kIceGatheringComplete) { |
- // If UpdateGatheringState brought us to kIceGatheringComplete, check if |
- // our connection state is also "Completed". Otherwise, there's no point in |
- // checking (since it would only produce log messages). |
- MaybeSignalCompleted(); |
- } |
-} |
- |
-void Transport::OnChannelCandidateGathered(TransportChannelImpl* channel, |
- const Candidate& candidate) { |
+void Transport::OnChannelRequestSignaling(TransportChannelImpl* channel) { |
+ ASSERT(worker_thread()->IsCurrent()); |
+ // Resetting ICE state for the channel. |
+ ChannelMap::iterator iter = channels_.find(channel->component()); |
+ if (iter != channels_.end()) |
+ iter->second.set_candidates_allocated(false); |
+ signaling_thread()->Post(this, MSG_REQUESTSIGNALING, nullptr); |
+} |
+ |
+void Transport::OnChannelRequestSignaling_s() { |
+ ASSERT(signaling_thread()->IsCurrent()); |
+ LOG(LS_INFO) << "Transport: " << content_name_ << ", allocating candidates"; |
+ SignalRequestSignaling(this); |
+} |
+ |
+void Transport::OnChannelCandidateReady(TransportChannelImpl* channel, |
+ const Candidate& candidate) { |
// We should never signal peer-reflexive candidates. |
if (candidate.type() == PRFLX_PORT_TYPE) { |
ASSERT(false); |
return; |
} |
+ ASSERT(worker_thread()->IsCurrent()); |
+ rtc::CritScope cs(&crit_); |
+ ready_candidates_.push_back(candidate); |
+ |
+ // We hold any messages until the client lets us connect. |
+ if (connect_requested_) { |
+ signaling_thread()->Post( |
+ this, MSG_CANDIDATEREADY, NULL); |
+ } |
+} |
+ |
+void Transport::OnChannelCandidateReady_s() { |
+ ASSERT(signaling_thread()->IsCurrent()); |
ASSERT(connect_requested_); |
+ |
std::vector<Candidate> candidates; |
- candidates.push_back(candidate); |
- SignalCandidatesGathered(this, candidates); |
+ { |
+ rtc::CritScope cs(&crit_); |
+ candidates.swap(ready_candidates_); |
+ } |
+ |
+ // we do the deleting of Candidate* here to keep the new above and |
+ // delete below close to each other |
+ if (!candidates.empty()) { |
+ SignalCandidatesReady(this, candidates); |
+ } |
} |
void Transport::OnChannelRouteChange(TransportChannel* channel, |
const Candidate& remote_candidate) { |
+ ASSERT(worker_thread()->IsCurrent()); |
+ ChannelParams* params = new ChannelParams(new Candidate(remote_candidate)); |
+ params->channel = static_cast<cricket::TransportChannelImpl*>(channel); |
+ signaling_thread()->Post(this, MSG_ROUTECHANGE, params); |
+} |
+ |
+void Transport::OnChannelRouteChange_s(const TransportChannel* channel, |
+ const Candidate& remote_candidate) { |
+ ASSERT(signaling_thread()->IsCurrent()); |
SignalRouteChange(this, remote_candidate.component(), remote_candidate); |
} |
+void Transport::OnChannelCandidatesAllocationDone( |
+ TransportChannelImpl* channel) { |
+ ASSERT(worker_thread()->IsCurrent()); |
+ ChannelMap::iterator iter = channels_.find(channel->component()); |
+ ASSERT(iter != channels_.end()); |
+ LOG(LS_INFO) << "Transport: " << content_name_ << ", component " |
+ << channel->component() << " allocation complete"; |
+ |
+ iter->second.set_candidates_allocated(true); |
+ |
+ // If all channels belonging to this Transport got signal, then |
+ // forward this signal to upper layer. |
+ // Can this signal arrive before all transport channels are created? |
+ for (auto& iter : channels_) { |
+ if (!iter.second.candidates_allocated()) |
+ return; |
+ } |
+ signaling_thread_->Post(this, MSG_CANDIDATEALLOCATIONCOMPLETE); |
+ |
+ MaybeCompleted_w(); |
+} |
+ |
+void Transport::OnChannelCandidatesAllocationDone_s() { |
+ ASSERT(signaling_thread()->IsCurrent()); |
+ LOG(LS_INFO) << "Transport: " << content_name_ << " allocation complete"; |
+ SignalCandidatesAllocationDone(this); |
+} |
+ |
void Transport::OnRoleConflict(TransportChannelImpl* channel) { |
- SignalRoleConflict(); |
+ signaling_thread_->Post(this, MSG_ROLECONFLICT); |
} |
void Transport::OnChannelConnectionRemoved(TransportChannelImpl* channel) { |
- LOG(LS_INFO) << name() << " TransportChannel " << channel->component() |
- << " connection removed. Check if transport is complete."; |
- MaybeSignalCompleted(); |
+ ASSERT(worker_thread()->IsCurrent()); |
+ MaybeCompleted_w(); |
// Check if the state is now Failed. |
// Failed is only available in the Controlling ICE role. |
@@ -514,97 +660,158 @@ |
return; |
} |
- // Failed can only occur after candidate gathering has stopped. |
- if (channel->gathering_state() != kIceGatheringComplete) { |
+ ChannelMap::iterator iter = channels_.find(channel->component()); |
+ ASSERT(iter != channels_.end()); |
+ // Failed can only occur after candidate allocation has stopped. |
+ if (!iter->second.candidates_allocated()) { |
return; |
} |
if (channel->GetState() == TransportChannelState::STATE_FAILED) { |
// A Transport has failed if any of its channels have no remaining |
// connections. |
- SignalFailed(this); |
- } |
-} |
- |
-void Transport::MaybeSignalCompleted() { |
- if (AllChannelsCompleted()) { |
- LOG(LS_INFO) << name() << " transport is complete" |
- << " because all the channels are complete."; |
- SignalCompleted(this); |
- } |
- // TODO(deadbeef): Should we do anything if we previously were completed, |
- // but now are not (if, for example, a new remote candidate is added)? |
-} |
- |
-void Transport::UpdateGatheringState() { |
- IceGatheringState new_state = kIceGatheringNew; |
- bool any_gathering = false; |
- bool all_complete = !channels_.empty(); |
- for (const auto& kv : channels_) { |
- any_gathering = |
- any_gathering || kv.second->gathering_state() != kIceGatheringNew; |
- all_complete = |
- all_complete && kv.second->gathering_state() == kIceGatheringComplete; |
- } |
- if (all_complete) { |
- new_state = kIceGatheringComplete; |
- } else if (any_gathering) { |
- new_state = kIceGatheringGathering; |
- } |
- |
- if (gathering_state_ != new_state) { |
- gathering_state_ = new_state; |
- if (gathering_state_ == kIceGatheringGathering) { |
- LOG(LS_INFO) << "Transport: " << name_ << ", gathering candidates"; |
- } else if (gathering_state_ == kIceGatheringComplete) { |
- LOG(LS_INFO) << "Transport " << name() << " gathering complete."; |
+ signaling_thread_->Post(this, MSG_FAILED); |
+ } |
+} |
+ |
+void Transport::MaybeCompleted_w() { |
+ ASSERT(worker_thread()->IsCurrent()); |
+ |
+ // When there is no channel created yet, calling this function could fire an |
+ // IceConnectionCompleted event prematurely. |
+ if (channels_.empty()) { |
+ return; |
+ } |
+ |
+ // A Transport's ICE process is completed if all of its channels are writable, |
+ // have finished allocating candidates, and have pruned all but one of their |
+ // connections. |
+ for (const auto& iter : channels_) { |
+ const TransportChannelImpl* channel = iter.second.get(); |
+ if (!(channel->writable() && |
+ channel->GetState() == TransportChannelState::STATE_COMPLETED && |
+ channel->GetIceRole() == ICEROLE_CONTROLLING && |
+ iter.second.candidates_allocated())) { |
+ return; |
} |
- SignalGatheringState(this); |
- } |
-} |
- |
-void Transport::UpdateReceivingState() { |
- TransportState receiving = GetTransportState(TRANSPORT_RECEIVING_STATE); |
- if (receiving_ != receiving) { |
- receiving_ = receiving; |
- SignalReceivingState(this); |
- } |
-} |
- |
-void Transport::UpdateWritableState() { |
- TransportState writable = GetTransportState(TRANSPORT_WRITABLE_STATE); |
- LOG(LS_INFO) << name() << " transport writable state changed? " << writable_ |
- << " => " << writable; |
- if (writable_ != writable) { |
- was_writable_ = (writable_ == TRANSPORT_STATE_ALL); |
- writable_ = writable; |
- SignalWritableState(this); |
- } |
-} |
- |
-bool Transport::ApplyLocalTransportDescription(TransportChannelImpl* ch, |
- std::string* error_desc) { |
+ } |
+ |
+ signaling_thread_->Post(this, MSG_COMPLETED); |
+} |
+ |
+void Transport::SetIceRole_w(IceRole role) { |
+ ASSERT(worker_thread()->IsCurrent()); |
+ rtc::CritScope cs(&crit_); |
+ ice_role_ = role; |
+ for (auto& iter : channels_) { |
+ iter.second->SetIceRole(ice_role_); |
+ } |
+} |
+ |
+void Transport::SetRemoteIceMode_w(IceMode mode) { |
+ ASSERT(worker_thread()->IsCurrent()); |
+ remote_ice_mode_ = mode; |
+ // Shouldn't channels be created after this method executed? |
+ for (auto& iter : channels_) { |
+ iter.second->SetRemoteIceMode(remote_ice_mode_); |
+ } |
+} |
+ |
+bool Transport::SetLocalTransportDescription_w( |
+ const TransportDescription& desc, |
+ ContentAction action, |
+ std::string* error_desc) { |
+ ASSERT(worker_thread()->IsCurrent()); |
+ bool ret = true; |
+ |
+ if (!VerifyIceParams(desc)) { |
+ return BadTransportDescription("Invalid ice-ufrag or ice-pwd length", |
+ error_desc); |
+ } |
+ |
+ // TODO(tommi,pthatcher): I'm not sure why we need to grab this lock at this |
+ // point. |local_description_| seems to always be modified on the worker |
+ // thread, so we should be able to use it here without grabbing the lock. |
+ // However, we _might_ need it before the call to reset() below? |
+ // Raw access to |local_description_| is granted to derived transports outside |
+ // of locking (see local_description() in the header file). |
+ // The contract is that the derived implementations must be aware of when the |
+ // description might change and do appropriate synchronization. |
+ rtc::CritScope cs(&crit_); |
+ if (local_description_ && IceCredentialsChanged(*local_description_, desc)) { |
+ IceRole new_ice_role = (action == CA_OFFER) ? ICEROLE_CONTROLLING |
+ : ICEROLE_CONTROLLED; |
+ |
+ // It must be called before ApplyLocalTransportDescription_w, which may |
+ // trigger an ICE restart and depends on the new ICE role. |
+ SetIceRole_w(new_ice_role); |
+ } |
+ |
+ local_description_.reset(new TransportDescription(desc)); |
+ |
+ for (auto& iter : channels_) { |
+ ret &= ApplyLocalTransportDescription_w(iter.second.get(), error_desc); |
+ } |
+ if (!ret) |
+ return false; |
+ |
+ // If PRANSWER/ANSWER is set, we should decide transport protocol type. |
+ if (action == CA_PRANSWER || action == CA_ANSWER) { |
+ ret &= NegotiateTransportDescription_w(action, error_desc); |
+ } |
+ return ret; |
+} |
+ |
+bool Transport::SetRemoteTransportDescription_w( |
+ const TransportDescription& desc, |
+ ContentAction action, |
+ std::string* error_desc) { |
+ bool ret = true; |
+ |
+ if (!VerifyIceParams(desc)) { |
+ return BadTransportDescription("Invalid ice-ufrag or ice-pwd length", |
+ error_desc); |
+ } |
+ |
+ // TODO(tommi,pthatcher): See todo for local_description_ above. |
+ rtc::CritScope cs(&crit_); |
+ remote_description_.reset(new TransportDescription(desc)); |
+ for (auto& iter : channels_) { |
+ ret &= ApplyRemoteTransportDescription_w(iter.second.get(), error_desc); |
+ } |
+ |
+ // If PRANSWER/ANSWER is set, we should decide transport protocol type. |
+ if (action == CA_PRANSWER || action == CA_ANSWER) { |
+ ret = NegotiateTransportDescription_w(CA_OFFER, error_desc); |
+ } |
+ return ret; |
+} |
+ |
+bool Transport::ApplyLocalTransportDescription_w(TransportChannelImpl* ch, |
+ std::string* error_desc) { |
+ ASSERT(worker_thread()->IsCurrent()); |
ch->SetIceCredentials(local_description_->ice_ufrag, |
local_description_->ice_pwd); |
return true; |
} |
-bool Transport::ApplyRemoteTransportDescription(TransportChannelImpl* ch, |
- std::string* error_desc) { |
+bool Transport::ApplyRemoteTransportDescription_w(TransportChannelImpl* ch, |
+ std::string* error_desc) { |
ch->SetRemoteIceCredentials(remote_description_->ice_ufrag, |
remote_description_->ice_pwd); |
return true; |
} |
-bool Transport::ApplyNegotiatedTransportDescription( |
- TransportChannelImpl* channel, |
- std::string* error_desc) { |
+bool Transport::ApplyNegotiatedTransportDescription_w( |
+ TransportChannelImpl* channel, std::string* error_desc) { |
+ ASSERT(worker_thread()->IsCurrent()); |
channel->SetRemoteIceMode(remote_ice_mode_); |
return true; |
} |
-bool Transport::NegotiateTransportDescription(ContentAction local_role, |
- std::string* error_desc) { |
+bool Transport::NegotiateTransportDescription_w(ContentAction local_role, |
+ std::string* error_desc) { |
+ ASSERT(worker_thread()->IsCurrent()); |
// TODO(ekr@rtfm.com): This is ICE-specific stuff. Refactor into |
// P2PTransport. |
@@ -612,7 +819,7 @@ |
// ice_lite, this local end point should take CONTROLLING role. |
if (ice_role_ == ICEROLE_CONTROLLED && |
remote_description_->ice_mode == ICEMODE_LITE) { |
- SetIceRole(ICEROLE_CONTROLLING); |
+ SetIceRole_w(ICEROLE_CONTROLLING); |
} |
// Update remote ice_mode to all existing channels. |
@@ -624,10 +831,57 @@ |
// creation, we have the negotiation state saved until a new |
// negotiation happens. |
for (auto& iter : channels_) { |
- if (!ApplyNegotiatedTransportDescription(iter.second.get(), error_desc)) |
+ if (!ApplyNegotiatedTransportDescription_w(iter.second.get(), error_desc)) |
return false; |
} |
return true; |
} |
+void Transport::OnMessage(rtc::Message* msg) { |
+ switch (msg->message_id) { |
+ case MSG_ONSIGNALINGREADY: |
+ CallChannels_w(&TransportChannelImpl::OnSignalingReady); |
+ break; |
+ case MSG_ONREMOTECANDIDATE: { |
+ ChannelParams* params = static_cast<ChannelParams*>(msg->pdata); |
+ OnRemoteCandidate_w(*params->candidate); |
+ delete params; |
+ } |
+ break; |
+ case MSG_CONNECTING: |
+ OnConnecting_s(); |
+ break; |
+ case MSG_WRITESTATE: |
+ OnChannelWritableState_s(); |
+ break; |
+ case MSG_RECEIVINGSTATE: |
+ OnChannelReceivingState_s(); |
+ break; |
+ case MSG_REQUESTSIGNALING: |
+ OnChannelRequestSignaling_s(); |
+ break; |
+ case MSG_CANDIDATEREADY: |
+ OnChannelCandidateReady_s(); |
+ break; |
+ case MSG_ROUTECHANGE: { |
+ ChannelParams* params = static_cast<ChannelParams*>(msg->pdata); |
+ OnChannelRouteChange_s(params->channel, *params->candidate); |
+ delete params; |
+ } |
+ break; |
+ case MSG_CANDIDATEALLOCATIONCOMPLETE: |
+ OnChannelCandidatesAllocationDone_s(); |
+ break; |
+ case MSG_ROLECONFLICT: |
+ SignalRoleConflict(); |
+ break; |
+ case MSG_COMPLETED: |
+ SignalCompleted(this); |
+ break; |
+ case MSG_FAILED: |
+ SignalFailed(this); |
+ break; |
+ } |
+} |
+ |
} // namespace cricket |