| Index: webrtc/p2p/base/transport.cc
|
| diff --git a/webrtc/p2p/base/transport.cc b/webrtc/p2p/base/transport.cc
|
| index d626ad3d65acf284a6022428cbfb43615a6d07e3..c96ddc724532eb90cadff2990f9423f6c5846ced 100644
|
| --- a/webrtc/p2p/base/transport.cc
|
| +++ b/webrtc/p2p/base/transport.cc
|
| @@ -8,6 +8,8 @@
|
| * be found in the AUTHORS file in the root of the source tree.
|
| */
|
|
|
| +#include <utility> // for std::pair
|
| +
|
| #include "webrtc/p2p/base/transport.h"
|
|
|
| #include "webrtc/p2p/base/candidate.h"
|
| @@ -22,39 +24,6 @@ namespace cricket {
|
|
|
| using rtc::Bind;
|
|
|
| -enum {
|
| - MSG_ONSIGNALINGREADY = 1,
|
| - MSG_ONREMOTECANDIDATE,
|
| - MSG_WRITESTATE,
|
| - MSG_REQUESTSIGNALING,
|
| - MSG_CANDIDATEREADY,
|
| - MSG_ROUTECHANGE,
|
| - MSG_CONNECTING,
|
| - MSG_CANDIDATEALLOCATIONCOMPLETE,
|
| - MSG_ROLECONFLICT,
|
| - MSG_COMPLETED,
|
| - MSG_FAILED,
|
| - MSG_RECEIVINGSTATE,
|
| -};
|
| -
|
| -struct ChannelParams : public rtc::MessageData {
|
| - ChannelParams() : channel(NULL), candidate(NULL) {}
|
| - explicit ChannelParams(int component)
|
| - : component(component), channel(NULL), candidate(NULL) {}
|
| - explicit ChannelParams(Candidate* candidate)
|
| - : channel(NULL), candidate(candidate) {
|
| - }
|
| -
|
| - ~ChannelParams() {
|
| - delete candidate;
|
| - }
|
| -
|
| - std::string name;
|
| - int component;
|
| - TransportChannelImpl* channel;
|
| - Candidate* candidate;
|
| -};
|
| -
|
| static bool VerifyIceParams(const TransportDescription& desc) {
|
| // For legacy protocols.
|
| if (desc.ice_ufrag.empty() && desc.ice_pwd.empty())
|
| @@ -96,58 +65,59 @@ static bool IceCredentialsChanged(const TransportDescription& old_desc,
|
| new_desc.ice_ufrag, new_desc.ice_pwd);
|
| }
|
|
|
| -Transport::Transport(rtc::Thread* signaling_thread,
|
| - rtc::Thread* worker_thread,
|
| - const std::string& content_name,
|
| - PortAllocator* allocator)
|
| - : signaling_thread_(signaling_thread),
|
| - worker_thread_(worker_thread),
|
| - content_name_(content_name),
|
| - allocator_(allocator),
|
| - destroyed_(false),
|
| - readable_(TRANSPORT_STATE_NONE),
|
| - writable_(TRANSPORT_STATE_NONE),
|
| - receiving_(TRANSPORT_STATE_NONE),
|
| - was_writable_(false),
|
| - connect_requested_(false),
|
| - ice_role_(ICEROLE_UNKNOWN),
|
| - tiebreaker_(0),
|
| - remote_ice_mode_(ICEMODE_FULL),
|
| - channel_receiving_timeout_(-1) {
|
| -}
|
| +Transport::Transport(const std::string& name, PortAllocator* allocator)
|
| + : name_(name), allocator_(allocator) {}
|
|
|
| Transport::~Transport() {
|
| - ASSERT(signaling_thread_->IsCurrent());
|
| - ASSERT(destroyed_);
|
| + ASSERT(channels_destroyed_);
|
| }
|
|
|
| -void Transport::SetIceRole(IceRole role) {
|
| - worker_thread_->Invoke<void>(Bind(&Transport::SetIceRole_w, this, role));
|
| -}
|
| +bool Transport::AllChannelsCompleted() const {
|
| + // We aren't completed until at least one channel is complete, so if there
|
| + // are no channels, we aren't complete yet.
|
| + if (channels_.empty()) {
|
| + LOG(LS_INFO) << name() << " transport is not complete"
|
| + << " because it has no TransportChannels";
|
| + return false;
|
| + }
|
|
|
| -void Transport::SetCertificate(
|
| - const rtc::scoped_refptr<rtc::RTCCertificate>& certificate) {
|
| - worker_thread_->Invoke<void>(Bind(&Transport::SetCertificate_w, this,
|
| - certificate));
|
| + // A Transport's ICE process is completed if all of its channels are writable,
|
| + // have finished allocating candidates, and have pruned all but one of their
|
| + // connections.
|
| + for (const auto& iter : channels_) {
|
| + const TransportChannelImpl* channel = iter.second.get();
|
| + bool complete =
|
| + channel->writable() &&
|
| + channel->GetState() == TransportChannelState::STATE_COMPLETED &&
|
| + channel->GetIceRole() == ICEROLE_CONTROLLING &&
|
| + channel->gathering_state() == kIceGatheringComplete;
|
| + if (!complete) {
|
| + LOG(LS_INFO) << name() << " transport is not complete"
|
| + << " because a channel is still incomplete.";
|
| + return false;
|
| + }
|
| + }
|
| +
|
| + return true;
|
| }
|
|
|
| -bool Transport::GetCertificate(
|
| - rtc::scoped_refptr<rtc::RTCCertificate>* certificate) {
|
| - // The identity is set on the worker thread, so for safety it must also be
|
| - // acquired on the worker thread.
|
| - return worker_thread_->Invoke<bool>(
|
| - Bind(&Transport::GetCertificate_w, this, certificate));
|
| +bool Transport::AnyChannelFailed() const {
|
| + for (const auto& iter : channels_) {
|
| + if (iter.second->GetState() == TransportChannelState::STATE_FAILED) {
|
| + return true;
|
| + }
|
| + }
|
| + return false;
|
| }
|
|
|
| -bool Transport::GetRemoteSSLCertificate(rtc::SSLCertificate** cert) {
|
| - // Channels can be deleted on the worker thread, so for safety the remote
|
| - // certificate is acquired on the worker thread.
|
| - return worker_thread_->Invoke<bool>(
|
| - Bind(&Transport::GetRemoteSSLCertificate_w, this, cert));
|
| +void Transport::SetIceRole(IceRole role) {
|
| + ice_role_ = role;
|
| + for (auto& iter : channels_) {
|
| + iter.second->SetIceRole(ice_role_);
|
| + }
|
| }
|
|
|
| -bool Transport::GetRemoteSSLCertificate_w(rtc::SSLCertificate** cert) {
|
| - ASSERT(worker_thread()->IsCurrent());
|
| +bool Transport::GetRemoteSSLCertificate(rtc::SSLCertificate** cert) {
|
| if (channels_.empty())
|
| return false;
|
|
|
| @@ -156,12 +126,6 @@ bool Transport::GetRemoteSSLCertificate_w(rtc::SSLCertificate** cert) {
|
| }
|
|
|
| void Transport::SetChannelReceivingTimeout(int timeout_ms) {
|
| - worker_thread_->Invoke<void>(
|
| - Bind(&Transport::SetChannelReceivingTimeout_w, this, timeout_ms));
|
| -}
|
| -
|
| -void Transport::SetChannelReceivingTimeout_w(int timeout_ms) {
|
| - ASSERT(worker_thread()->IsCurrent());
|
| channel_receiving_timeout_ = timeout_ms;
|
| for (const auto& kv : channels_) {
|
| kv.second->SetReceivingTimeout(timeout_ms);
|
| @@ -172,35 +136,74 @@ bool Transport::SetLocalTransportDescription(
|
| const TransportDescription& description,
|
| ContentAction action,
|
| std::string* error_desc) {
|
| - return worker_thread_->Invoke<bool>(Bind(
|
| - &Transport::SetLocalTransportDescription_w, this,
|
| - description, action, error_desc));
|
| + bool ret = true;
|
| +
|
| + if (!VerifyIceParams(description)) {
|
| + return BadTransportDescription("Invalid ice-ufrag or ice-pwd length",
|
| + error_desc);
|
| + }
|
| +
|
| + if (local_description_ &&
|
| + IceCredentialsChanged(*local_description_, description)) {
|
| + IceRole new_ice_role =
|
| + (action == CA_OFFER) ? ICEROLE_CONTROLLING : ICEROLE_CONTROLLED;
|
| +
|
| + // It must be called before ApplyLocalTransportDescription, which may
|
| + // trigger an ICE restart and depends on the new ICE role.
|
| + SetIceRole(new_ice_role);
|
| + }
|
| +
|
| + local_description_.reset(new TransportDescription(description));
|
| +
|
| + for (auto& iter : channels_) {
|
| + ret &= ApplyLocalTransportDescription(iter.second.get(), error_desc);
|
| + }
|
| + if (!ret) {
|
| + return false;
|
| + }
|
| +
|
| + // If PRANSWER/ANSWER is set, we should decide transport protocol type.
|
| + if (action == CA_PRANSWER || action == CA_ANSWER) {
|
| + ret &= NegotiateTransportDescription(action, error_desc);
|
| + }
|
| + if (ret) {
|
| + local_description_set_ = true;
|
| + // This kicks off candidate gathering.
|
| + ConnectChannels();
|
| + }
|
| +
|
| + return ret;
|
| }
|
|
|
| bool Transport::SetRemoteTransportDescription(
|
| const TransportDescription& description,
|
| ContentAction action,
|
| std::string* error_desc) {
|
| - return worker_thread_->Invoke<bool>(Bind(
|
| - &Transport::SetRemoteTransportDescription_w, this,
|
| - description, action, error_desc));
|
| -}
|
| + bool ret = true;
|
|
|
| -TransportChannelImpl* Transport::CreateChannel(int component) {
|
| - return worker_thread_->Invoke<TransportChannelImpl*>(Bind(
|
| - &Transport::CreateChannel_w, this, component));
|
| + if (!VerifyIceParams(description)) {
|
| + return BadTransportDescription("Invalid ice-ufrag or ice-pwd length",
|
| + error_desc);
|
| + }
|
| +
|
| + remote_description_.reset(new TransportDescription(description));
|
| + for (auto& iter : channels_) {
|
| + ret &= ApplyRemoteTransportDescription(iter.second.get(), error_desc);
|
| + }
|
| +
|
| + // If PRANSWER/ANSWER is set, we should decide transport protocol type.
|
| + if (action == CA_PRANSWER || action == CA_ANSWER) {
|
| + ret = NegotiateTransportDescription(CA_OFFER, error_desc);
|
| + }
|
| + if (ret) {
|
| + remote_description_set_ = true;
|
| + }
|
| +
|
| + return ret;
|
| }
|
|
|
| -TransportChannelImpl* Transport::CreateChannel_w(int component) {
|
| - ASSERT(worker_thread()->IsCurrent());
|
| +TransportChannelImpl* Transport::CreateChannel(int component) {
|
| TransportChannelImpl* impl;
|
| - // TODO(tommi): We don't really need to grab the lock until the actual call
|
| - // to insert() below and presumably hold it throughout initialization of
|
| - // |impl| after the impl_exists check. Maybe we can factor that out to
|
| - // a separate function and not grab the lock in this function.
|
| - // Actually, we probably don't need to hold the lock while initializing
|
| - // |impl| since we can just do the insert when that's done.
|
| - rtc::CritScope cs(&crit_);
|
|
|
| // Create the entry if it does not exist.
|
| bool impl_exists = false;
|
| @@ -216,7 +219,7 @@ TransportChannelImpl* Transport::CreateChannel_w(int component) {
|
|
|
| // Increase the ref count.
|
| iterator->second.AddRef();
|
| - destroyed_ = false;
|
| + channels_destroyed_ = false;
|
|
|
| if (impl_exists) {
|
| // If this is an existing channel, we should just return it without
|
| @@ -228,23 +231,21 @@ TransportChannelImpl* Transport::CreateChannel_w(int component) {
|
| impl->SetIceRole(ice_role_);
|
| impl->SetIceTiebreaker(tiebreaker_);
|
| impl->SetReceivingTimeout(channel_receiving_timeout_);
|
| - // TODO(ronghuawu): Change CreateChannel_w to be able to return error since
|
| - // below Apply**Description_w calls can fail.
|
| + // TODO(ronghuawu): Change CreateChannel to be able to return error since
|
| + // below Apply**Description calls can fail.
|
| if (local_description_)
|
| - ApplyLocalTransportDescription_w(impl, NULL);
|
| + ApplyLocalTransportDescription(impl, NULL);
|
| if (remote_description_)
|
| - ApplyRemoteTransportDescription_w(impl, NULL);
|
| + ApplyRemoteTransportDescription(impl, NULL);
|
| if (local_description_ && remote_description_)
|
| - ApplyNegotiatedTransportDescription_w(impl, NULL);
|
| + ApplyNegotiatedTransportDescription(impl, NULL);
|
|
|
| impl->SignalWritableState.connect(this, &Transport::OnChannelWritableState);
|
| impl->SignalReceivingState.connect(this, &Transport::OnChannelReceivingState);
|
| - impl->SignalRequestSignaling.connect(
|
| - this, &Transport::OnChannelRequestSignaling);
|
| - impl->SignalCandidateReady.connect(this, &Transport::OnChannelCandidateReady);
|
| + impl->SignalGatheringState.connect(this, &Transport::OnChannelGatheringState);
|
| + impl->SignalCandidateGathered.connect(this,
|
| + &Transport::OnChannelCandidateGathered);
|
| impl->SignalRouteChange.connect(this, &Transport::OnChannelRouteChange);
|
| - impl->SignalCandidatesAllocationDone.connect(
|
| - this, &Transport::OnChannelCandidatesAllocationDone);
|
| impl->SignalRoleConflict.connect(this, &Transport::OnRoleConflict);
|
| impl->SignalConnectionRemoved.connect(
|
| this, &Transport::OnChannelConnectionRemoved);
|
| @@ -254,36 +255,22 @@ TransportChannelImpl* Transport::CreateChannel_w(int component) {
|
| if (channels_.size() == 1) {
|
| // If this is the first channel, then indicate that we have started
|
| // connecting.
|
| - signaling_thread()->Post(this, MSG_CONNECTING, NULL);
|
| + SignalConnecting(this);
|
| }
|
| }
|
| return impl;
|
| }
|
|
|
| TransportChannelImpl* Transport::GetChannel(int component) {
|
| - // TODO(tommi,pthatcher): Since we're returning a pointer from the channels_
|
| - // map, shouldn't we assume that we're on the worker thread? (The pointer
|
| - // will be used outside of the lock).
|
| - // And if we're on the worker thread, which is the only thread that modifies
|
| - // channels_, can we skip grabbing the lock?
|
| - rtc::CritScope cs(&crit_);
|
| ChannelMap::iterator iter = channels_.find(component);
|
| return (iter != channels_.end()) ? iter->second.get() : NULL;
|
| }
|
|
|
| bool Transport::HasChannels() {
|
| - rtc::CritScope cs(&crit_);
|
| return !channels_.empty();
|
| }
|
|
|
| void Transport::DestroyChannel(int component) {
|
| - worker_thread_->Invoke<void>(Bind(
|
| - &Transport::DestroyChannel_w, this, component));
|
| -}
|
| -
|
| -void Transport::DestroyChannel_w(int component) {
|
| - ASSERT(worker_thread()->IsCurrent());
|
| -
|
| ChannelMap::iterator iter = channels_.find(component);
|
| if (iter == channels_.end())
|
| return;
|
| @@ -293,34 +280,35 @@ void Transport::DestroyChannel_w(int component) {
|
| iter->second.DecRef();
|
| if (!iter->second.ref()) {
|
| impl = iter->second.get();
|
| - rtc::CritScope cs(&crit_);
|
| channels_.erase(iter);
|
| }
|
|
|
| if (connect_requested_ && channels_.empty()) {
|
| // We're no longer attempting to connect.
|
| - signaling_thread()->Post(this, MSG_CONNECTING, NULL);
|
| + SignalConnecting(this);
|
| }
|
|
|
| if (impl) {
|
| - // Check in case the deleted channel was the only non-writable channel.
|
| - OnChannelWritableState(impl);
|
| DestroyTransportChannel(impl);
|
| + // Need to update aggregate state after destroying a channel,
|
| + // for example if it was the only one that wasn't yet writable.
|
| + UpdateWritableState();
|
| + UpdateReceivingState();
|
| + UpdateGatheringState();
|
| + MaybeSignalCompleted();
|
| }
|
| }
|
|
|
| void Transport::ConnectChannels() {
|
| - ASSERT(signaling_thread()->IsCurrent());
|
| - worker_thread_->Invoke<void>(Bind(&Transport::ConnectChannels_w, this));
|
| -}
|
| -
|
| -void Transport::ConnectChannels_w() {
|
| - ASSERT(worker_thread()->IsCurrent());
|
| if (connect_requested_ || channels_.empty())
|
| return;
|
|
|
| connect_requested_ = true;
|
| - signaling_thread()->Post(this, MSG_CANDIDATEREADY, NULL);
|
| +
|
| + if (!ready_candidates_.empty()) {
|
| + SignalCandidatesGathered(this, ready_candidates_);
|
| + ready_candidates_.clear();
|
| + }
|
|
|
| if (!local_description_) {
|
| // TOOD(mallinath) : TransportDescription(TD) shouldn't be generated here.
|
| @@ -329,38 +317,22 @@ void Transport::ConnectChannels_w() {
|
| // Session.
|
| // Session must generate local TD before remote candidates pushed when
|
| // initiate request initiated by the remote.
|
| - LOG(LS_INFO) << "Transport::ConnectChannels_w: No local description has "
|
| + LOG(LS_INFO) << "Transport::ConnectChannels: No local description has "
|
| << "been set. Will generate one.";
|
| - TransportDescription desc(std::vector<std::string>(),
|
| - rtc::CreateRandomString(ICE_UFRAG_LENGTH),
|
| - rtc::CreateRandomString(ICE_PWD_LENGTH),
|
| - ICEMODE_FULL, CONNECTIONROLE_NONE, NULL,
|
| - Candidates());
|
| - SetLocalTransportDescription_w(desc, CA_OFFER, NULL);
|
| + TransportDescription desc(
|
| + std::vector<std::string>(), rtc::CreateRandomString(ICE_UFRAG_LENGTH),
|
| + rtc::CreateRandomString(ICE_PWD_LENGTH), ICEMODE_FULL,
|
| + CONNECTIONROLE_NONE, NULL, Candidates());
|
| + SetLocalTransportDescription(desc, CA_OFFER, NULL);
|
| }
|
|
|
| - CallChannels_w(&TransportChannelImpl::Connect);
|
| - if (!channels_.empty()) {
|
| - signaling_thread()->Post(this, MSG_CONNECTING, NULL);
|
| + CallChannels(&TransportChannelImpl::Connect);
|
| + if (HasChannels()) {
|
| + SignalConnecting(this);
|
| }
|
| }
|
|
|
| -void Transport::OnConnecting_s() {
|
| - ASSERT(signaling_thread()->IsCurrent());
|
| - SignalConnecting(this);
|
| -}
|
| -
|
| void Transport::DestroyAllChannels() {
|
| - ASSERT(signaling_thread()->IsCurrent());
|
| - worker_thread_->Invoke<void>(Bind(&Transport::DestroyAllChannels_w, this));
|
| - worker_thread()->Clear(this);
|
| - signaling_thread()->Clear(this);
|
| - destroyed_ = true;
|
| -}
|
| -
|
| -void Transport::DestroyAllChannels_w() {
|
| - ASSERT(worker_thread()->IsCurrent());
|
| -
|
| std::vector<TransportChannelImpl*> impls;
|
| for (auto& iter : channels_) {
|
| iter.second.DecRef();
|
| @@ -368,27 +340,15 @@ void Transport::DestroyAllChannels_w() {
|
| impls.push_back(iter.second.get());
|
| }
|
|
|
| - {
|
| - rtc::CritScope cs(&crit_);
|
| - channels_.clear();
|
| - }
|
| -
|
| - for (size_t i = 0; i < impls.size(); ++i)
|
| - DestroyTransportChannel(impls[i]);
|
| -}
|
| + channels_.clear();
|
|
|
| -void Transport::OnSignalingReady() {
|
| - ASSERT(signaling_thread()->IsCurrent());
|
| - if (destroyed_) return;
|
| -
|
| - worker_thread()->Post(this, MSG_ONSIGNALINGREADY, NULL);
|
| -
|
| - // Notify the subclass.
|
| - OnTransportSignalingReady();
|
| + for (TransportChannelImpl* impl : impls) {
|
| + DestroyTransportChannel(impl);
|
| + }
|
| + channels_destroyed_ = true;
|
| }
|
|
|
| -void Transport::CallChannels_w(TransportChannelFunc func) {
|
| - ASSERT(worker_thread()->IsCurrent());
|
| +void Transport::CallChannels(TransportChannelFunc func) {
|
| for (const auto& iter : channels_) {
|
| ((iter.second.get())->*func)();
|
| }
|
| @@ -427,14 +387,7 @@ bool Transport::VerifyCandidate(const Candidate& cand, std::string* error) {
|
|
|
|
|
| bool Transport::GetStats(TransportStats* stats) {
|
| - ASSERT(signaling_thread()->IsCurrent());
|
| - return worker_thread_->Invoke<bool>(Bind(
|
| - &Transport::GetStats_w, this, stats));
|
| -}
|
| -
|
| -bool Transport::GetStats_w(TransportStats* stats) {
|
| - ASSERT(worker_thread()->IsCurrent());
|
| - stats->content_name = content_name();
|
| + stats->transport_name = name();
|
| stats->channel_stats.clear();
|
| for (auto iter : channels_) {
|
| ChannelMapEntry& entry = iter.second;
|
| @@ -450,82 +403,45 @@ bool Transport::GetStats_w(TransportStats* stats) {
|
| return true;
|
| }
|
|
|
| -bool Transport::GetSslRole(rtc::SSLRole* ssl_role) const {
|
| - return worker_thread_->Invoke<bool>(Bind(
|
| - &Transport::GetSslRole_w, this, ssl_role));
|
| -}
|
| -
|
| -bool Transport::SetSslMaxProtocolVersion(rtc::SSLProtocolVersion version) {
|
| - return worker_thread_->Invoke<bool>(Bind(
|
| - &Transport::SetSslMaxProtocolVersion_w, this, version));
|
| -}
|
| +bool Transport::AddRemoteCandidates(const std::vector<Candidate>& candidates,
|
| + std::string* error) {
|
| + ASSERT(!channels_destroyed_);
|
| + // Verify each candidate before passing down to transport layer.
|
| + for (const Candidate& cand : candidates) {
|
| + if (!VerifyCandidate(cand, error)) {
|
| + return false;
|
| + }
|
| + if (!HasChannel(cand.component())) {
|
| + *error = "Candidate has unknown component: " + cand.ToString() +
|
| + " for content: " + name();
|
| + return false;
|
| + }
|
| + }
|
|
|
| -void Transport::OnRemoteCandidates(const std::vector<Candidate>& candidates) {
|
| for (std::vector<Candidate>::const_iterator iter = candidates.begin();
|
| iter != candidates.end();
|
| ++iter) {
|
| - OnRemoteCandidate(*iter);
|
| - }
|
| -}
|
| -
|
| -void Transport::OnRemoteCandidate(const Candidate& candidate) {
|
| - ASSERT(signaling_thread()->IsCurrent());
|
| - if (destroyed_) return;
|
| -
|
| - if (!HasChannel(candidate.component())) {
|
| - LOG(LS_WARNING) << "Ignoring candidate for unknown component "
|
| - << candidate.component();
|
| - return;
|
| - }
|
| -
|
| - ChannelParams* params = new ChannelParams(new Candidate(candidate));
|
| - worker_thread()->Post(this, MSG_ONREMOTECANDIDATE, params);
|
| -}
|
| -
|
| -void Transport::OnRemoteCandidate_w(const Candidate& candidate) {
|
| - ASSERT(worker_thread()->IsCurrent());
|
| - ChannelMap::iterator iter = channels_.find(candidate.component());
|
| - // It's ok for a channel to go away while this message is in transit.
|
| - if (iter != channels_.end()) {
|
| - iter->second->OnCandidate(candidate);
|
| + TransportChannelImpl* channel = GetChannel(iter->component());
|
| + if (channel != NULL) {
|
| + channel->AddRemoteCandidate(*iter);
|
| + }
|
| }
|
| + return true;
|
| }
|
|
|
| void Transport::OnChannelWritableState(TransportChannel* channel) {
|
| - ASSERT(worker_thread()->IsCurrent());
|
| - signaling_thread()->Post(this, MSG_WRITESTATE, NULL);
|
| -
|
| - MaybeCompleted_w();
|
| -}
|
| -
|
| -void Transport::OnChannelWritableState_s() {
|
| - ASSERT(signaling_thread()->IsCurrent());
|
| - TransportState writable = GetTransportState_s(TRANSPORT_WRITABLE_STATE);
|
| - if (writable_ != writable) {
|
| - was_writable_ = (writable_ == TRANSPORT_STATE_ALL);
|
| - writable_ = writable;
|
| - SignalWritableState(this);
|
| - }
|
| + LOG(LS_INFO) << name() << " TransportChannel " << channel->component()
|
| + << " writability changed to " << channel->writable()
|
| + << ". Check if transport is complete.";
|
| + UpdateWritableState();
|
| + MaybeSignalCompleted();
|
| }
|
|
|
| void Transport::OnChannelReceivingState(TransportChannel* channel) {
|
| - ASSERT(worker_thread()->IsCurrent());
|
| - signaling_thread()->Post(this, MSG_RECEIVINGSTATE);
|
| + UpdateReceivingState();
|
| }
|
|
|
| -void Transport::OnChannelReceivingState_s() {
|
| - ASSERT(signaling_thread()->IsCurrent());
|
| - TransportState receiving = GetTransportState_s(TRANSPORT_RECEIVING_STATE);
|
| - if (receiving_ != receiving) {
|
| - receiving_ = receiving;
|
| - SignalReceivingState(this);
|
| - }
|
| -}
|
| -
|
| -TransportState Transport::GetTransportState_s(TransportStateType state_type) {
|
| - ASSERT(signaling_thread()->IsCurrent());
|
| -
|
| - rtc::CritScope cs(&crit_);
|
| +TransportState Transport::GetTransportState(TransportStateType state_type) {
|
| bool any = false;
|
| bool all = !channels_.empty();
|
| for (const auto iter : channels_) {
|
| @@ -553,106 +469,48 @@ TransportState Transport::GetTransportState_s(TransportStateType state_type) {
|
| return TRANSPORT_STATE_NONE;
|
| }
|
|
|
| -void Transport::OnChannelRequestSignaling(TransportChannelImpl* channel) {
|
| - ASSERT(worker_thread()->IsCurrent());
|
| - // Resetting ICE state for the channel.
|
| - ChannelMap::iterator iter = channels_.find(channel->component());
|
| - if (iter != channels_.end())
|
| - iter->second.set_candidates_allocated(false);
|
| - signaling_thread()->Post(this, MSG_REQUESTSIGNALING, nullptr);
|
| -}
|
| -
|
| -void Transport::OnChannelRequestSignaling_s() {
|
| - ASSERT(signaling_thread()->IsCurrent());
|
| - LOG(LS_INFO) << "Transport: " << content_name_ << ", allocating candidates";
|
| - SignalRequestSignaling(this);
|
| +void Transport::OnChannelGatheringState(TransportChannelImpl* channel) {
|
| + ASSERT(channels_.find(channel->component()) != channels_.end());
|
| + UpdateGatheringState();
|
| + if (gathering_state_ == kIceGatheringComplete) {
|
| + // If UpdateGatheringState brought us to kIceGatheringComplete, check if
|
| + // our connection state is also "Completed". Otherwise, there's no point in
|
| + // checking (since it would only produce log messages).
|
| + MaybeSignalCompleted();
|
| + }
|
| }
|
|
|
| -void Transport::OnChannelCandidateReady(TransportChannelImpl* channel,
|
| - const Candidate& candidate) {
|
| +void Transport::OnChannelCandidateGathered(TransportChannelImpl* channel,
|
| + const Candidate& candidate) {
|
| // We should never signal peer-reflexive candidates.
|
| if (candidate.type() == PRFLX_PORT_TYPE) {
|
| ASSERT(false);
|
| return;
|
| }
|
|
|
| - ASSERT(worker_thread()->IsCurrent());
|
| - rtc::CritScope cs(&crit_);
|
| - ready_candidates_.push_back(candidate);
|
| -
|
| - // We hold any messages until the client lets us connect.
|
| if (connect_requested_) {
|
| - signaling_thread()->Post(
|
| - this, MSG_CANDIDATEREADY, NULL);
|
| - }
|
| -}
|
| -
|
| -void Transport::OnChannelCandidateReady_s() {
|
| - ASSERT(signaling_thread()->IsCurrent());
|
| - ASSERT(connect_requested_);
|
| -
|
| - std::vector<Candidate> candidates;
|
| - {
|
| - rtc::CritScope cs(&crit_);
|
| - candidates.swap(ready_candidates_);
|
| - }
|
| -
|
| - // we do the deleting of Candidate* here to keep the new above and
|
| - // delete below close to each other
|
| - if (!candidates.empty()) {
|
| - SignalCandidatesReady(this, candidates);
|
| + std::vector<Candidate> candidates;
|
| + candidates.push_back(candidate);
|
| + SignalCandidatesGathered(this, candidates);
|
| + } else {
|
| + // We hold any candidates until the client lets us connect.
|
| + ready_candidates_.push_back(candidate);
|
| }
|
| }
|
|
|
| void Transport::OnChannelRouteChange(TransportChannel* channel,
|
| const Candidate& remote_candidate) {
|
| - ASSERT(worker_thread()->IsCurrent());
|
| - ChannelParams* params = new ChannelParams(new Candidate(remote_candidate));
|
| - params->channel = static_cast<cricket::TransportChannelImpl*>(channel);
|
| - signaling_thread()->Post(this, MSG_ROUTECHANGE, params);
|
| -}
|
| -
|
| -void Transport::OnChannelRouteChange_s(const TransportChannel* channel,
|
| - const Candidate& remote_candidate) {
|
| - ASSERT(signaling_thread()->IsCurrent());
|
| SignalRouteChange(this, remote_candidate.component(), remote_candidate);
|
| }
|
|
|
| -void Transport::OnChannelCandidatesAllocationDone(
|
| - TransportChannelImpl* channel) {
|
| - ASSERT(worker_thread()->IsCurrent());
|
| - ChannelMap::iterator iter = channels_.find(channel->component());
|
| - ASSERT(iter != channels_.end());
|
| - LOG(LS_INFO) << "Transport: " << content_name_ << ", component "
|
| - << channel->component() << " allocation complete";
|
| -
|
| - iter->second.set_candidates_allocated(true);
|
| -
|
| - // If all channels belonging to this Transport got signal, then
|
| - // forward this signal to upper layer.
|
| - // Can this signal arrive before all transport channels are created?
|
| - for (auto& iter : channels_) {
|
| - if (!iter.second.candidates_allocated())
|
| - return;
|
| - }
|
| - signaling_thread_->Post(this, MSG_CANDIDATEALLOCATIONCOMPLETE);
|
| -
|
| - MaybeCompleted_w();
|
| -}
|
| -
|
| -void Transport::OnChannelCandidatesAllocationDone_s() {
|
| - ASSERT(signaling_thread()->IsCurrent());
|
| - LOG(LS_INFO) << "Transport: " << content_name_ << " allocation complete";
|
| - SignalCandidatesAllocationDone(this);
|
| -}
|
| -
|
| void Transport::OnRoleConflict(TransportChannelImpl* channel) {
|
| - signaling_thread_->Post(this, MSG_ROLECONFLICT);
|
| + SignalRoleConflict();
|
| }
|
|
|
| void Transport::OnChannelConnectionRemoved(TransportChannelImpl* channel) {
|
| - ASSERT(worker_thread()->IsCurrent());
|
| - MaybeCompleted_w();
|
| + LOG(LS_INFO) << name() << " TransportChannel " << channel->component()
|
| + << " connection removed. Check if transport is complete.";
|
| + MaybeSignalCompleted();
|
|
|
| // Check if the state is now Failed.
|
| // Failed is only available in the Controlling ICE role.
|
| @@ -660,158 +518,97 @@ void Transport::OnChannelConnectionRemoved(TransportChannelImpl* channel) {
|
| return;
|
| }
|
|
|
| - ChannelMap::iterator iter = channels_.find(channel->component());
|
| - ASSERT(iter != channels_.end());
|
| - // Failed can only occur after candidate allocation has stopped.
|
| - if (!iter->second.candidates_allocated()) {
|
| + // Failed can only occur after candidate gathering has stopped.
|
| + if (channel->gathering_state() != kIceGatheringComplete) {
|
| return;
|
| }
|
|
|
| if (channel->GetState() == TransportChannelState::STATE_FAILED) {
|
| // A Transport has failed if any of its channels have no remaining
|
| // connections.
|
| - signaling_thread_->Post(this, MSG_FAILED);
|
| + SignalFailed(this);
|
| }
|
| }
|
|
|
| -void Transport::MaybeCompleted_w() {
|
| - ASSERT(worker_thread()->IsCurrent());
|
| -
|
| - // When there is no channel created yet, calling this function could fire an
|
| - // IceConnectionCompleted event prematurely.
|
| - if (channels_.empty()) {
|
| - return;
|
| +void Transport::MaybeSignalCompleted() {
|
| + if (AllChannelsCompleted()) {
|
| + LOG(LS_INFO) << name() << " transport is complete"
|
| + << " because all the channels are complete.";
|
| + SignalCompleted(this);
|
| }
|
| -
|
| - // A Transport's ICE process is completed if all of its channels are writable,
|
| - // have finished allocating candidates, and have pruned all but one of their
|
| - // connections.
|
| - for (const auto& iter : channels_) {
|
| - const TransportChannelImpl* channel = iter.second.get();
|
| - if (!(channel->writable() &&
|
| - channel->GetState() == TransportChannelState::STATE_COMPLETED &&
|
| - channel->GetIceRole() == ICEROLE_CONTROLLING &&
|
| - iter.second.candidates_allocated())) {
|
| - return;
|
| - }
|
| - }
|
| -
|
| - signaling_thread_->Post(this, MSG_COMPLETED);
|
| + // TODO(deadbeef): Should we do anything if we previously were completed,
|
| + // but now are not (if, for example, a new remote candidate is added)?
|
| }
|
|
|
| -void Transport::SetIceRole_w(IceRole role) {
|
| - ASSERT(worker_thread()->IsCurrent());
|
| - rtc::CritScope cs(&crit_);
|
| - ice_role_ = role;
|
| - for (auto& iter : channels_) {
|
| - iter.second->SetIceRole(ice_role_);
|
| - }
|
| -}
|
| -
|
| -void Transport::SetRemoteIceMode_w(IceMode mode) {
|
| - ASSERT(worker_thread()->IsCurrent());
|
| - remote_ice_mode_ = mode;
|
| - // Shouldn't channels be created after this method executed?
|
| - for (auto& iter : channels_) {
|
| - iter.second->SetRemoteIceMode(remote_ice_mode_);
|
| +void Transport::UpdateGatheringState() {
|
| + IceGatheringState new_state = kIceGatheringNew;
|
| + bool any_gathering = false;
|
| + bool all_complete = !channels_.empty();
|
| + for (const auto& kv : channels_) {
|
| + any_gathering =
|
| + any_gathering || kv.second->gathering_state() != kIceGatheringNew;
|
| + all_complete =
|
| + all_complete && kv.second->gathering_state() == kIceGatheringComplete;
|
| + }
|
| + if (all_complete) {
|
| + new_state = kIceGatheringComplete;
|
| + } else if (any_gathering) {
|
| + new_state = kIceGatheringGathering;
|
| + }
|
| +
|
| + if (gathering_state_ != new_state) {
|
| + gathering_state_ = new_state;
|
| + if (gathering_state_ == kIceGatheringGathering) {
|
| + LOG(LS_INFO) << "Transport: " << name_ << ", gathering candidates";
|
| + } else if (gathering_state_ == kIceGatheringComplete) {
|
| + LOG(LS_INFO) << "Transport " << name() << " gathering complete.";
|
| + }
|
| + SignalGatheringState(this);
|
| }
|
| }
|
|
|
| -bool Transport::SetLocalTransportDescription_w(
|
| - const TransportDescription& desc,
|
| - ContentAction action,
|
| - std::string* error_desc) {
|
| - ASSERT(worker_thread()->IsCurrent());
|
| - bool ret = true;
|
| -
|
| - if (!VerifyIceParams(desc)) {
|
| - return BadTransportDescription("Invalid ice-ufrag or ice-pwd length",
|
| - error_desc);
|
| - }
|
| -
|
| - // TODO(tommi,pthatcher): I'm not sure why we need to grab this lock at this
|
| - // point. |local_description_| seems to always be modified on the worker
|
| - // thread, so we should be able to use it here without grabbing the lock.
|
| - // However, we _might_ need it before the call to reset() below?
|
| - // Raw access to |local_description_| is granted to derived transports outside
|
| - // of locking (see local_description() in the header file).
|
| - // The contract is that the derived implementations must be aware of when the
|
| - // description might change and do appropriate synchronization.
|
| - rtc::CritScope cs(&crit_);
|
| - if (local_description_ && IceCredentialsChanged(*local_description_, desc)) {
|
| - IceRole new_ice_role = (action == CA_OFFER) ? ICEROLE_CONTROLLING
|
| - : ICEROLE_CONTROLLED;
|
| -
|
| - // It must be called before ApplyLocalTransportDescription_w, which may
|
| - // trigger an ICE restart and depends on the new ICE role.
|
| - SetIceRole_w(new_ice_role);
|
| - }
|
| -
|
| - local_description_.reset(new TransportDescription(desc));
|
| -
|
| - for (auto& iter : channels_) {
|
| - ret &= ApplyLocalTransportDescription_w(iter.second.get(), error_desc);
|
| - }
|
| - if (!ret)
|
| - return false;
|
| -
|
| - // If PRANSWER/ANSWER is set, we should decide transport protocol type.
|
| - if (action == CA_PRANSWER || action == CA_ANSWER) {
|
| - ret &= NegotiateTransportDescription_w(action, error_desc);
|
| +void Transport::UpdateReceivingState() {
|
| + TransportState receiving = GetTransportState(TRANSPORT_RECEIVING_STATE);
|
| + if (receiving_ != receiving) {
|
| + receiving_ = receiving;
|
| + SignalReceivingState(this);
|
| }
|
| - return ret;
|
| }
|
|
|
| -bool Transport::SetRemoteTransportDescription_w(
|
| - const TransportDescription& desc,
|
| - ContentAction action,
|
| - std::string* error_desc) {
|
| - bool ret = true;
|
| -
|
| - if (!VerifyIceParams(desc)) {
|
| - return BadTransportDescription("Invalid ice-ufrag or ice-pwd length",
|
| - error_desc);
|
| - }
|
| -
|
| - // TODO(tommi,pthatcher): See todo for local_description_ above.
|
| - rtc::CritScope cs(&crit_);
|
| - remote_description_.reset(new TransportDescription(desc));
|
| - for (auto& iter : channels_) {
|
| - ret &= ApplyRemoteTransportDescription_w(iter.second.get(), error_desc);
|
| - }
|
| -
|
| - // If PRANSWER/ANSWER is set, we should decide transport protocol type.
|
| - if (action == CA_PRANSWER || action == CA_ANSWER) {
|
| - ret = NegotiateTransportDescription_w(CA_OFFER, error_desc);
|
| +void Transport::UpdateWritableState() {
|
| + TransportState writable = GetTransportState(TRANSPORT_WRITABLE_STATE);
|
| + LOG(LS_INFO) << name() << " transport writable state changed? " << writable_
|
| + << " => " << writable;
|
| + if (writable_ != writable) {
|
| + was_writable_ = (writable_ == TRANSPORT_STATE_ALL);
|
| + writable_ = writable;
|
| + SignalWritableState(this);
|
| }
|
| - return ret;
|
| }
|
|
|
| -bool Transport::ApplyLocalTransportDescription_w(TransportChannelImpl* ch,
|
| - std::string* error_desc) {
|
| - ASSERT(worker_thread()->IsCurrent());
|
| +bool Transport::ApplyLocalTransportDescription(TransportChannelImpl* ch,
|
| + std::string* error_desc) {
|
| ch->SetIceCredentials(local_description_->ice_ufrag,
|
| local_description_->ice_pwd);
|
| return true;
|
| }
|
|
|
| -bool Transport::ApplyRemoteTransportDescription_w(TransportChannelImpl* ch,
|
| - std::string* error_desc) {
|
| +bool Transport::ApplyRemoteTransportDescription(TransportChannelImpl* ch,
|
| + std::string* error_desc) {
|
| ch->SetRemoteIceCredentials(remote_description_->ice_ufrag,
|
| remote_description_->ice_pwd);
|
| return true;
|
| }
|
|
|
| -bool Transport::ApplyNegotiatedTransportDescription_w(
|
| - TransportChannelImpl* channel, std::string* error_desc) {
|
| - ASSERT(worker_thread()->IsCurrent());
|
| +bool Transport::ApplyNegotiatedTransportDescription(
|
| + TransportChannelImpl* channel,
|
| + std::string* error_desc) {
|
| channel->SetRemoteIceMode(remote_ice_mode_);
|
| return true;
|
| }
|
|
|
| -bool Transport::NegotiateTransportDescription_w(ContentAction local_role,
|
| - std::string* error_desc) {
|
| - ASSERT(worker_thread()->IsCurrent());
|
| +bool Transport::NegotiateTransportDescription(ContentAction local_role,
|
| + std::string* error_desc) {
|
| // TODO(ekr@rtfm.com): This is ICE-specific stuff. Refactor into
|
| // P2PTransport.
|
|
|
| @@ -819,7 +616,7 @@ bool Transport::NegotiateTransportDescription_w(ContentAction local_role,
|
| // ice_lite, this local end point should take CONTROLLING role.
|
| if (ice_role_ == ICEROLE_CONTROLLED &&
|
| remote_description_->ice_mode == ICEMODE_LITE) {
|
| - SetIceRole_w(ICEROLE_CONTROLLING);
|
| + SetIceRole(ICEROLE_CONTROLLING);
|
| }
|
|
|
| // Update remote ice_mode to all existing channels.
|
| @@ -831,57 +628,10 @@ bool Transport::NegotiateTransportDescription_w(ContentAction local_role,
|
| // creation, we have the negotiation state saved until a new
|
| // negotiation happens.
|
| for (auto& iter : channels_) {
|
| - if (!ApplyNegotiatedTransportDescription_w(iter.second.get(), error_desc))
|
| + if (!ApplyNegotiatedTransportDescription(iter.second.get(), error_desc))
|
| return false;
|
| }
|
| return true;
|
| }
|
|
|
| -void Transport::OnMessage(rtc::Message* msg) {
|
| - switch (msg->message_id) {
|
| - case MSG_ONSIGNALINGREADY:
|
| - CallChannels_w(&TransportChannelImpl::OnSignalingReady);
|
| - break;
|
| - case MSG_ONREMOTECANDIDATE: {
|
| - ChannelParams* params = static_cast<ChannelParams*>(msg->pdata);
|
| - OnRemoteCandidate_w(*params->candidate);
|
| - delete params;
|
| - }
|
| - break;
|
| - case MSG_CONNECTING:
|
| - OnConnecting_s();
|
| - break;
|
| - case MSG_WRITESTATE:
|
| - OnChannelWritableState_s();
|
| - break;
|
| - case MSG_RECEIVINGSTATE:
|
| - OnChannelReceivingState_s();
|
| - break;
|
| - case MSG_REQUESTSIGNALING:
|
| - OnChannelRequestSignaling_s();
|
| - break;
|
| - case MSG_CANDIDATEREADY:
|
| - OnChannelCandidateReady_s();
|
| - break;
|
| - case MSG_ROUTECHANGE: {
|
| - ChannelParams* params = static_cast<ChannelParams*>(msg->pdata);
|
| - OnChannelRouteChange_s(params->channel, *params->candidate);
|
| - delete params;
|
| - }
|
| - break;
|
| - case MSG_CANDIDATEALLOCATIONCOMPLETE:
|
| - OnChannelCandidatesAllocationDone_s();
|
| - break;
|
| - case MSG_ROLECONFLICT:
|
| - SignalRoleConflict();
|
| - break;
|
| - case MSG_COMPLETED:
|
| - SignalCompleted(this);
|
| - break;
|
| - case MSG_FAILED:
|
| - SignalFailed(this);
|
| - break;
|
| - }
|
| -}
|
| -
|
| } // namespace cricket
|
|
|