| Index: webrtc/p2p/base/transport.cc
|
| diff --git a/webrtc/p2p/base/transport.cc b/webrtc/p2p/base/transport.cc
|
| index b7aba7540a5699e165994a143bbae9c5852ec5a8..d626ad3d65acf284a6022428cbfb43615a6d07e3 100644
|
| --- a/webrtc/p2p/base/transport.cc
|
| +++ b/webrtc/p2p/base/transport.cc
|
| @@ -8,8 +8,6 @@
|
| * be found in the AUTHORS file in the root of the source tree.
|
| */
|
|
|
| -#include <utility> // for std::pair
|
| -
|
| #include "webrtc/p2p/base/transport.h"
|
|
|
| #include "webrtc/p2p/base/candidate.h"
|
| @@ -23,6 +21,39 @@
|
| namespace cricket {
|
|
|
| using rtc::Bind;
|
| +
|
| +enum {
|
| + MSG_ONSIGNALINGREADY = 1,
|
| + MSG_ONREMOTECANDIDATE,
|
| + MSG_WRITESTATE,
|
| + MSG_REQUESTSIGNALING,
|
| + MSG_CANDIDATEREADY,
|
| + MSG_ROUTECHANGE,
|
| + MSG_CONNECTING,
|
| + MSG_CANDIDATEALLOCATIONCOMPLETE,
|
| + MSG_ROLECONFLICT,
|
| + MSG_COMPLETED,
|
| + MSG_FAILED,
|
| + MSG_RECEIVINGSTATE,
|
| +};
|
| +
|
| +struct ChannelParams : public rtc::MessageData {
|
| + ChannelParams() : channel(NULL), candidate(NULL) {}
|
| + explicit ChannelParams(int component)
|
| + : component(component), channel(NULL), candidate(NULL) {}
|
| + explicit ChannelParams(Candidate* candidate)
|
| + : channel(NULL), candidate(candidate) {
|
| + }
|
| +
|
| + ~ChannelParams() {
|
| + delete candidate;
|
| + }
|
| +
|
| + std::string name;
|
| + int component;
|
| + TransportChannelImpl* channel;
|
| + Candidate* candidate;
|
| +};
|
|
|
| static bool VerifyIceParams(const TransportDescription& desc) {
|
| // For legacy protocols.
|
| @@ -65,59 +96,58 @@
|
| new_desc.ice_ufrag, new_desc.ice_pwd);
|
| }
|
|
|
| -Transport::Transport(const std::string& name, PortAllocator* allocator)
|
| - : name_(name), allocator_(allocator) {}
|
| +Transport::Transport(rtc::Thread* signaling_thread,
|
| + rtc::Thread* worker_thread,
|
| + const std::string& content_name,
|
| + PortAllocator* allocator)
|
| + : signaling_thread_(signaling_thread),
|
| + worker_thread_(worker_thread),
|
| + content_name_(content_name),
|
| + allocator_(allocator),
|
| + destroyed_(false),
|
| + readable_(TRANSPORT_STATE_NONE),
|
| + writable_(TRANSPORT_STATE_NONE),
|
| + receiving_(TRANSPORT_STATE_NONE),
|
| + was_writable_(false),
|
| + connect_requested_(false),
|
| + ice_role_(ICEROLE_UNKNOWN),
|
| + tiebreaker_(0),
|
| + remote_ice_mode_(ICEMODE_FULL),
|
| + channel_receiving_timeout_(-1) {
|
| +}
|
|
|
| Transport::~Transport() {
|
| - ASSERT(channels_destroyed_);
|
| -}
|
| -
|
| -bool Transport::AllChannelsCompleted() const {
|
| - // We aren't completed until at least one channel is complete, so if there
|
| - // are no channels, we aren't complete yet.
|
| - if (channels_.empty()) {
|
| - LOG(LS_INFO) << name() << " transport is not complete"
|
| - << " because it has no TransportChannels";
|
| - return false;
|
| - }
|
| -
|
| - // A Transport's ICE process is completed if all of its channels are writable,
|
| - // have finished allocating candidates, and have pruned all but one of their
|
| - // connections.
|
| - for (const auto& iter : channels_) {
|
| - const TransportChannelImpl* channel = iter.second.get();
|
| - bool complete =
|
| - channel->writable() &&
|
| - channel->GetState() == TransportChannelState::STATE_COMPLETED &&
|
| - channel->GetIceRole() == ICEROLE_CONTROLLING &&
|
| - channel->gathering_state() == kIceGatheringComplete;
|
| - if (!complete) {
|
| - LOG(LS_INFO) << name() << " transport is not complete"
|
| - << " because a channel is still incomplete.";
|
| - return false;
|
| - }
|
| - }
|
| -
|
| - return true;
|
| -}
|
| -
|
| -bool Transport::AnyChannelFailed() const {
|
| - for (const auto& iter : channels_) {
|
| - if (iter.second->GetState() == TransportChannelState::STATE_FAILED) {
|
| - return true;
|
| - }
|
| - }
|
| - return false;
|
| + ASSERT(signaling_thread_->IsCurrent());
|
| + ASSERT(destroyed_);
|
| }
|
|
|
| void Transport::SetIceRole(IceRole role) {
|
| - ice_role_ = role;
|
| - for (auto& iter : channels_) {
|
| - iter.second->SetIceRole(ice_role_);
|
| - }
|
| + worker_thread_->Invoke<void>(Bind(&Transport::SetIceRole_w, this, role));
|
| +}
|
| +
|
| +void Transport::SetCertificate(
|
| + const rtc::scoped_refptr<rtc::RTCCertificate>& certificate) {
|
| + worker_thread_->Invoke<void>(Bind(&Transport::SetCertificate_w, this,
|
| + certificate));
|
| +}
|
| +
|
| +bool Transport::GetCertificate(
|
| + rtc::scoped_refptr<rtc::RTCCertificate>* certificate) {
|
| + // The identity is set on the worker thread, so for safety it must also be
|
| + // acquired on the worker thread.
|
| + return worker_thread_->Invoke<bool>(
|
| + Bind(&Transport::GetCertificate_w, this, certificate));
|
| }
|
|
|
| bool Transport::GetRemoteSSLCertificate(rtc::SSLCertificate** cert) {
|
| + // Channels can be deleted on the worker thread, so for safety the remote
|
| + // certificate is acquired on the worker thread.
|
| + return worker_thread_->Invoke<bool>(
|
| + Bind(&Transport::GetRemoteSSLCertificate_w, this, cert));
|
| +}
|
| +
|
| +bool Transport::GetRemoteSSLCertificate_w(rtc::SSLCertificate** cert) {
|
| + ASSERT(worker_thread()->IsCurrent());
|
| if (channels_.empty())
|
| return false;
|
|
|
| @@ -126,6 +156,12 @@
|
| }
|
|
|
| void Transport::SetChannelReceivingTimeout(int timeout_ms) {
|
| + worker_thread_->Invoke<void>(
|
| + Bind(&Transport::SetChannelReceivingTimeout_w, this, timeout_ms));
|
| +}
|
| +
|
| +void Transport::SetChannelReceivingTimeout_w(int timeout_ms) {
|
| + ASSERT(worker_thread()->IsCurrent());
|
| channel_receiving_timeout_ = timeout_ms;
|
| for (const auto& kv : channels_) {
|
| kv.second->SetReceivingTimeout(timeout_ms);
|
| @@ -136,73 +172,35 @@
|
| const TransportDescription& description,
|
| ContentAction action,
|
| std::string* error_desc) {
|
| - bool ret = true;
|
| -
|
| - if (!VerifyIceParams(description)) {
|
| - return BadTransportDescription("Invalid ice-ufrag or ice-pwd length",
|
| - error_desc);
|
| - }
|
| -
|
| - if (local_description_ &&
|
| - IceCredentialsChanged(*local_description_, description)) {
|
| - IceRole new_ice_role =
|
| - (action == CA_OFFER) ? ICEROLE_CONTROLLING : ICEROLE_CONTROLLED;
|
| -
|
| - // It must be called before ApplyLocalTransportDescription, which may
|
| - // trigger an ICE restart and depends on the new ICE role.
|
| - SetIceRole(new_ice_role);
|
| - }
|
| -
|
| - local_description_.reset(new TransportDescription(description));
|
| -
|
| - for (auto& iter : channels_) {
|
| - ret &= ApplyLocalTransportDescription(iter.second.get(), error_desc);
|
| - }
|
| - if (!ret) {
|
| - return false;
|
| - }
|
| -
|
| - // If PRANSWER/ANSWER is set, we should decide transport protocol type.
|
| - if (action == CA_PRANSWER || action == CA_ANSWER) {
|
| - ret &= NegotiateTransportDescription(action, error_desc);
|
| - }
|
| - if (ret) {
|
| - local_description_set_ = true;
|
| - ConnectChannels();
|
| - }
|
| -
|
| - return ret;
|
| + return worker_thread_->Invoke<bool>(Bind(
|
| + &Transport::SetLocalTransportDescription_w, this,
|
| + description, action, error_desc));
|
| }
|
|
|
| bool Transport::SetRemoteTransportDescription(
|
| const TransportDescription& description,
|
| ContentAction action,
|
| std::string* error_desc) {
|
| - bool ret = true;
|
| -
|
| - if (!VerifyIceParams(description)) {
|
| - return BadTransportDescription("Invalid ice-ufrag or ice-pwd length",
|
| - error_desc);
|
| - }
|
| -
|
| - remote_description_.reset(new TransportDescription(description));
|
| - for (auto& iter : channels_) {
|
| - ret &= ApplyRemoteTransportDescription(iter.second.get(), error_desc);
|
| - }
|
| -
|
| - // If PRANSWER/ANSWER is set, we should decide transport protocol type.
|
| - if (action == CA_PRANSWER || action == CA_ANSWER) {
|
| - ret = NegotiateTransportDescription(CA_OFFER, error_desc);
|
| - }
|
| - if (ret) {
|
| - remote_description_set_ = true;
|
| - }
|
| -
|
| - return ret;
|
| + return worker_thread_->Invoke<bool>(Bind(
|
| + &Transport::SetRemoteTransportDescription_w, this,
|
| + description, action, error_desc));
|
| }
|
|
|
| TransportChannelImpl* Transport::CreateChannel(int component) {
|
| + return worker_thread_->Invoke<TransportChannelImpl*>(Bind(
|
| + &Transport::CreateChannel_w, this, component));
|
| +}
|
| +
|
| +TransportChannelImpl* Transport::CreateChannel_w(int component) {
|
| + ASSERT(worker_thread()->IsCurrent());
|
| TransportChannelImpl* impl;
|
| + // TODO(tommi): We don't really need to grab the lock until the actual call
|
| + // to insert() below and presumably hold it throughout initialization of
|
| + // |impl| after the impl_exists check. Maybe we can factor that out to
|
| + // a separate function and not grab the lock in this function.
|
| + // Actually, we probably don't need to hold the lock while initializing
|
| + // |impl| since we can just do the insert when that's done.
|
| + rtc::CritScope cs(&crit_);
|
|
|
| // Create the entry if it does not exist.
|
| bool impl_exists = false;
|
| @@ -218,7 +216,7 @@
|
|
|
| // Increase the ref count.
|
| iterator->second.AddRef();
|
| - channels_destroyed_ = false;
|
| + destroyed_ = false;
|
|
|
| if (impl_exists) {
|
| // If this is an existing channel, we should just return it without
|
| @@ -230,21 +228,23 @@
|
| impl->SetIceRole(ice_role_);
|
| impl->SetIceTiebreaker(tiebreaker_);
|
| impl->SetReceivingTimeout(channel_receiving_timeout_);
|
| - // TODO(ronghuawu): Change CreateChannel to be able to return error since
|
| - // below Apply**Description calls can fail.
|
| + // TODO(ronghuawu): Change CreateChannel_w to be able to return error since
|
| + // below Apply**Description_w calls can fail.
|
| if (local_description_)
|
| - ApplyLocalTransportDescription(impl, NULL);
|
| + ApplyLocalTransportDescription_w(impl, NULL);
|
| if (remote_description_)
|
| - ApplyRemoteTransportDescription(impl, NULL);
|
| + ApplyRemoteTransportDescription_w(impl, NULL);
|
| if (local_description_ && remote_description_)
|
| - ApplyNegotiatedTransportDescription(impl, NULL);
|
| + ApplyNegotiatedTransportDescription_w(impl, NULL);
|
|
|
| impl->SignalWritableState.connect(this, &Transport::OnChannelWritableState);
|
| impl->SignalReceivingState.connect(this, &Transport::OnChannelReceivingState);
|
| - impl->SignalGatheringState.connect(this, &Transport::OnChannelGatheringState);
|
| - impl->SignalCandidateGathered.connect(this,
|
| - &Transport::OnChannelCandidateGathered);
|
| + impl->SignalRequestSignaling.connect(
|
| + this, &Transport::OnChannelRequestSignaling);
|
| + impl->SignalCandidateReady.connect(this, &Transport::OnChannelCandidateReady);
|
| impl->SignalRouteChange.connect(this, &Transport::OnChannelRouteChange);
|
| + impl->SignalCandidatesAllocationDone.connect(
|
| + this, &Transport::OnChannelCandidatesAllocationDone);
|
| impl->SignalRoleConflict.connect(this, &Transport::OnRoleConflict);
|
| impl->SignalConnectionRemoved.connect(
|
| this, &Transport::OnChannelConnectionRemoved);
|
| @@ -254,22 +254,36 @@
|
| if (channels_.size() == 1) {
|
| // If this is the first channel, then indicate that we have started
|
| // connecting.
|
| - SignalConnecting(this);
|
| + signaling_thread()->Post(this, MSG_CONNECTING, NULL);
|
| }
|
| }
|
| return impl;
|
| }
|
|
|
| TransportChannelImpl* Transport::GetChannel(int component) {
|
| + // TODO(tommi,pthatcher): Since we're returning a pointer from the channels_
|
| + // map, shouldn't we assume that we're on the worker thread? (The pointer
|
| + // will be used outside of the lock).
|
| + // And if we're on the worker thread, which is the only thread that modifies
|
| + // channels_, can we skip grabbing the lock?
|
| + rtc::CritScope cs(&crit_);
|
| ChannelMap::iterator iter = channels_.find(component);
|
| return (iter != channels_.end()) ? iter->second.get() : NULL;
|
| }
|
|
|
| bool Transport::HasChannels() {
|
| + rtc::CritScope cs(&crit_);
|
| return !channels_.empty();
|
| }
|
|
|
| void Transport::DestroyChannel(int component) {
|
| + worker_thread_->Invoke<void>(Bind(
|
| + &Transport::DestroyChannel_w, this, component));
|
| +}
|
| +
|
| +void Transport::DestroyChannel_w(int component) {
|
| + ASSERT(worker_thread()->IsCurrent());
|
| +
|
| ChannelMap::iterator iter = channels_.find(component);
|
| if (iter == channels_.end())
|
| return;
|
| @@ -279,30 +293,34 @@
|
| iter->second.DecRef();
|
| if (!iter->second.ref()) {
|
| impl = iter->second.get();
|
| + rtc::CritScope cs(&crit_);
|
| channels_.erase(iter);
|
| }
|
|
|
| if (connect_requested_ && channels_.empty()) {
|
| // We're no longer attempting to connect.
|
| - SignalConnecting(this);
|
| + signaling_thread()->Post(this, MSG_CONNECTING, NULL);
|
| }
|
|
|
| if (impl) {
|
| + // Check in case the deleted channel was the only non-writable channel.
|
| + OnChannelWritableState(impl);
|
| DestroyTransportChannel(impl);
|
| - // Need to update aggregate state after destroying a channel,
|
| - // for example if it was the only one that wasn't yet writable.
|
| - UpdateWritableState();
|
| - UpdateReceivingState();
|
| - UpdateGatheringState();
|
| - MaybeSignalCompleted();
|
| }
|
| }
|
|
|
| void Transport::ConnectChannels() {
|
| + ASSERT(signaling_thread()->IsCurrent());
|
| + worker_thread_->Invoke<void>(Bind(&Transport::ConnectChannels_w, this));
|
| +}
|
| +
|
| +void Transport::ConnectChannels_w() {
|
| + ASSERT(worker_thread()->IsCurrent());
|
| if (connect_requested_ || channels_.empty())
|
| return;
|
|
|
| connect_requested_ = true;
|
| + signaling_thread()->Post(this, MSG_CANDIDATEREADY, NULL);
|
|
|
| if (!local_description_) {
|
| // TOOD(mallinath) : TransportDescription(TD) shouldn't be generated here.
|
| @@ -311,28 +329,38 @@
|
| // Session.
|
| // Session must generate local TD before remote candidates pushed when
|
| // initiate request initiated by the remote.
|
| - LOG(LS_INFO) << "Transport::ConnectChannels: No local description has "
|
| + LOG(LS_INFO) << "Transport::ConnectChannels_w: No local description has "
|
| << "been set. Will generate one.";
|
| - TransportDescription desc(
|
| - std::vector<std::string>(), rtc::CreateRandomString(ICE_UFRAG_LENGTH),
|
| - rtc::CreateRandomString(ICE_PWD_LENGTH), ICEMODE_FULL,
|
| - CONNECTIONROLE_NONE, NULL, Candidates());
|
| - SetLocalTransportDescription(desc, CA_OFFER, NULL);
|
| - }
|
| -
|
| - CallChannels(&TransportChannelImpl::Connect);
|
| - if (HasChannels()) {
|
| - SignalConnecting(this);
|
| - }
|
| -}
|
| -
|
| -void Transport::MaybeStartGathering() {
|
| - if (connect_requested_) {
|
| - CallChannels(&TransportChannelImpl::MaybeStartGathering);
|
| - }
|
| + TransportDescription desc(std::vector<std::string>(),
|
| + rtc::CreateRandomString(ICE_UFRAG_LENGTH),
|
| + rtc::CreateRandomString(ICE_PWD_LENGTH),
|
| + ICEMODE_FULL, CONNECTIONROLE_NONE, NULL,
|
| + Candidates());
|
| + SetLocalTransportDescription_w(desc, CA_OFFER, NULL);
|
| + }
|
| +
|
| + CallChannels_w(&TransportChannelImpl::Connect);
|
| + if (!channels_.empty()) {
|
| + signaling_thread()->Post(this, MSG_CONNECTING, NULL);
|
| + }
|
| +}
|
| +
|
| +void Transport::OnConnecting_s() {
|
| + ASSERT(signaling_thread()->IsCurrent());
|
| + SignalConnecting(this);
|
| }
|
|
|
| void Transport::DestroyAllChannels() {
|
| + ASSERT(signaling_thread()->IsCurrent());
|
| + worker_thread_->Invoke<void>(Bind(&Transport::DestroyAllChannels_w, this));
|
| + worker_thread()->Clear(this);
|
| + signaling_thread()->Clear(this);
|
| + destroyed_ = true;
|
| +}
|
| +
|
| +void Transport::DestroyAllChannels_w() {
|
| + ASSERT(worker_thread()->IsCurrent());
|
| +
|
| std::vector<TransportChannelImpl*> impls;
|
| for (auto& iter : channels_) {
|
| iter.second.DecRef();
|
| @@ -340,15 +368,27 @@
|
| impls.push_back(iter.second.get());
|
| }
|
|
|
| - channels_.clear();
|
| -
|
| - for (TransportChannelImpl* impl : impls) {
|
| - DestroyTransportChannel(impl);
|
| - }
|
| - channels_destroyed_ = true;
|
| -}
|
| -
|
| -void Transport::CallChannels(TransportChannelFunc func) {
|
| + {
|
| + rtc::CritScope cs(&crit_);
|
| + channels_.clear();
|
| + }
|
| +
|
| + for (size_t i = 0; i < impls.size(); ++i)
|
| + DestroyTransportChannel(impls[i]);
|
| +}
|
| +
|
| +void Transport::OnSignalingReady() {
|
| + ASSERT(signaling_thread()->IsCurrent());
|
| + if (destroyed_) return;
|
| +
|
| + worker_thread()->Post(this, MSG_ONSIGNALINGREADY, NULL);
|
| +
|
| + // Notify the subclass.
|
| + OnTransportSignalingReady();
|
| +}
|
| +
|
| +void Transport::CallChannels_w(TransportChannelFunc func) {
|
| + ASSERT(worker_thread()->IsCurrent());
|
| for (const auto& iter : channels_) {
|
| ((iter.second.get())->*func)();
|
| }
|
| @@ -387,7 +427,14 @@
|
|
|
|
|
| bool Transport::GetStats(TransportStats* stats) {
|
| - stats->transport_name = name();
|
| + ASSERT(signaling_thread()->IsCurrent());
|
| + return worker_thread_->Invoke<bool>(Bind(
|
| + &Transport::GetStats_w, this, stats));
|
| +}
|
| +
|
| +bool Transport::GetStats_w(TransportStats* stats) {
|
| + ASSERT(worker_thread()->IsCurrent());
|
| + stats->content_name = content_name();
|
| stats->channel_stats.clear();
|
| for (auto iter : channels_) {
|
| ChannelMapEntry& entry = iter.second;
|
| @@ -403,45 +450,82 @@
|
| return true;
|
| }
|
|
|
| -bool Transport::AddRemoteCandidates(const std::vector<Candidate>& candidates,
|
| - std::string* error) {
|
| - ASSERT(!channels_destroyed_);
|
| - // Verify each candidate before passing down to transport layer.
|
| - for (const Candidate& cand : candidates) {
|
| - if (!VerifyCandidate(cand, error)) {
|
| - return false;
|
| - }
|
| - if (!HasChannel(cand.component())) {
|
| - *error = "Candidate has unknown component: " + cand.ToString() +
|
| - " for content: " + name();
|
| - return false;
|
| - }
|
| - }
|
| -
|
| +bool Transport::GetSslRole(rtc::SSLRole* ssl_role) const {
|
| + return worker_thread_->Invoke<bool>(Bind(
|
| + &Transport::GetSslRole_w, this, ssl_role));
|
| +}
|
| +
|
| +bool Transport::SetSslMaxProtocolVersion(rtc::SSLProtocolVersion version) {
|
| + return worker_thread_->Invoke<bool>(Bind(
|
| + &Transport::SetSslMaxProtocolVersion_w, this, version));
|
| +}
|
| +
|
| +void Transport::OnRemoteCandidates(const std::vector<Candidate>& candidates) {
|
| for (std::vector<Candidate>::const_iterator iter = candidates.begin();
|
| iter != candidates.end();
|
| ++iter) {
|
| - TransportChannelImpl* channel = GetChannel(iter->component());
|
| - if (channel != NULL) {
|
| - channel->AddRemoteCandidate(*iter);
|
| - }
|
| - }
|
| - return true;
|
| + OnRemoteCandidate(*iter);
|
| + }
|
| +}
|
| +
|
| +void Transport::OnRemoteCandidate(const Candidate& candidate) {
|
| + ASSERT(signaling_thread()->IsCurrent());
|
| + if (destroyed_) return;
|
| +
|
| + if (!HasChannel(candidate.component())) {
|
| + LOG(LS_WARNING) << "Ignoring candidate for unknown component "
|
| + << candidate.component();
|
| + return;
|
| + }
|
| +
|
| + ChannelParams* params = new ChannelParams(new Candidate(candidate));
|
| + worker_thread()->Post(this, MSG_ONREMOTECANDIDATE, params);
|
| +}
|
| +
|
| +void Transport::OnRemoteCandidate_w(const Candidate& candidate) {
|
| + ASSERT(worker_thread()->IsCurrent());
|
| + ChannelMap::iterator iter = channels_.find(candidate.component());
|
| + // It's ok for a channel to go away while this message is in transit.
|
| + if (iter != channels_.end()) {
|
| + iter->second->OnCandidate(candidate);
|
| + }
|
| }
|
|
|
| void Transport::OnChannelWritableState(TransportChannel* channel) {
|
| - LOG(LS_INFO) << name() << " TransportChannel " << channel->component()
|
| - << " writability changed to " << channel->writable()
|
| - << ". Check if transport is complete.";
|
| - UpdateWritableState();
|
| - MaybeSignalCompleted();
|
| + ASSERT(worker_thread()->IsCurrent());
|
| + signaling_thread()->Post(this, MSG_WRITESTATE, NULL);
|
| +
|
| + MaybeCompleted_w();
|
| +}
|
| +
|
| +void Transport::OnChannelWritableState_s() {
|
| + ASSERT(signaling_thread()->IsCurrent());
|
| + TransportState writable = GetTransportState_s(TRANSPORT_WRITABLE_STATE);
|
| + if (writable_ != writable) {
|
| + was_writable_ = (writable_ == TRANSPORT_STATE_ALL);
|
| + writable_ = writable;
|
| + SignalWritableState(this);
|
| + }
|
| }
|
|
|
| void Transport::OnChannelReceivingState(TransportChannel* channel) {
|
| - UpdateReceivingState();
|
| -}
|
| -
|
| -TransportState Transport::GetTransportState(TransportStateType state_type) {
|
| + ASSERT(worker_thread()->IsCurrent());
|
| + signaling_thread()->Post(this, MSG_RECEIVINGSTATE);
|
| +}
|
| +
|
| +void Transport::OnChannelReceivingState_s() {
|
| + ASSERT(signaling_thread()->IsCurrent());
|
| + TransportState receiving = GetTransportState_s(TRANSPORT_RECEIVING_STATE);
|
| + if (receiving_ != receiving) {
|
| + receiving_ = receiving;
|
| + SignalReceivingState(this);
|
| + }
|
| +}
|
| +
|
| +TransportState Transport::GetTransportState_s(TransportStateType state_type) {
|
| + ASSERT(signaling_thread()->IsCurrent());
|
| +
|
| + rtc::CritScope cs(&crit_);
|
| bool any = false;
|
| bool all = !channels_.empty();
|
| for (const auto iter : channels_) {
|
| @@ -469,44 +553,106 @@
|
| return TRANSPORT_STATE_NONE;
|
| }
|
|
|
| -void Transport::OnChannelGatheringState(TransportChannelImpl* channel) {
|
| - ASSERT(channels_.find(channel->component()) != channels_.end());
|
| - UpdateGatheringState();
|
| - if (gathering_state_ == kIceGatheringComplete) {
|
| - // If UpdateGatheringState brought us to kIceGatheringComplete, check if
|
| - // our connection state is also "Completed". Otherwise, there's no point in
|
| - // checking (since it would only produce log messages).
|
| - MaybeSignalCompleted();
|
| - }
|
| -}
|
| -
|
| -void Transport::OnChannelCandidateGathered(TransportChannelImpl* channel,
|
| - const Candidate& candidate) {
|
| +void Transport::OnChannelRequestSignaling(TransportChannelImpl* channel) {
|
| + ASSERT(worker_thread()->IsCurrent());
|
| + // Resetting ICE state for the channel.
|
| + ChannelMap::iterator iter = channels_.find(channel->component());
|
| + if (iter != channels_.end())
|
| + iter->second.set_candidates_allocated(false);
|
| + signaling_thread()->Post(this, MSG_REQUESTSIGNALING, nullptr);
|
| +}
|
| +
|
| +void Transport::OnChannelRequestSignaling_s() {
|
| + ASSERT(signaling_thread()->IsCurrent());
|
| + LOG(LS_INFO) << "Transport: " << content_name_ << ", allocating candidates";
|
| + SignalRequestSignaling(this);
|
| +}
|
| +
|
| +void Transport::OnChannelCandidateReady(TransportChannelImpl* channel,
|
| + const Candidate& candidate) {
|
| // We should never signal peer-reflexive candidates.
|
| if (candidate.type() == PRFLX_PORT_TYPE) {
|
| ASSERT(false);
|
| return;
|
| }
|
|
|
| + ASSERT(worker_thread()->IsCurrent());
|
| + rtc::CritScope cs(&crit_);
|
| + ready_candidates_.push_back(candidate);
|
| +
|
| + // We hold any messages until the client lets us connect.
|
| + if (connect_requested_) {
|
| + signaling_thread()->Post(
|
| + this, MSG_CANDIDATEREADY, NULL);
|
| + }
|
| +}
|
| +
|
| +void Transport::OnChannelCandidateReady_s() {
|
| + ASSERT(signaling_thread()->IsCurrent());
|
| ASSERT(connect_requested_);
|
| +
|
| std::vector<Candidate> candidates;
|
| - candidates.push_back(candidate);
|
| - SignalCandidatesGathered(this, candidates);
|
| + {
|
| + rtc::CritScope cs(&crit_);
|
| + candidates.swap(ready_candidates_);
|
| + }
|
| +
|
| + // we do the deleting of Candidate* here to keep the new above and
|
| + // delete below close to each other
|
| + if (!candidates.empty()) {
|
| + SignalCandidatesReady(this, candidates);
|
| + }
|
| }
|
|
|
| void Transport::OnChannelRouteChange(TransportChannel* channel,
|
| const Candidate& remote_candidate) {
|
| + ASSERT(worker_thread()->IsCurrent());
|
| + ChannelParams* params = new ChannelParams(new Candidate(remote_candidate));
|
| + params->channel = static_cast<cricket::TransportChannelImpl*>(channel);
|
| + signaling_thread()->Post(this, MSG_ROUTECHANGE, params);
|
| +}
|
| +
|
| +void Transport::OnChannelRouteChange_s(const TransportChannel* channel,
|
| + const Candidate& remote_candidate) {
|
| + ASSERT(signaling_thread()->IsCurrent());
|
| SignalRouteChange(this, remote_candidate.component(), remote_candidate);
|
| }
|
|
|
| +void Transport::OnChannelCandidatesAllocationDone(
|
| + TransportChannelImpl* channel) {
|
| + ASSERT(worker_thread()->IsCurrent());
|
| + ChannelMap::iterator iter = channels_.find(channel->component());
|
| + ASSERT(iter != channels_.end());
|
| + LOG(LS_INFO) << "Transport: " << content_name_ << ", component "
|
| + << channel->component() << " allocation complete";
|
| +
|
| + iter->second.set_candidates_allocated(true);
|
| +
|
| + // If all channels belonging to this Transport got signal, then
|
| + // forward this signal to upper layer.
|
| + // Can this signal arrive before all transport channels are created?
|
| + for (auto& iter : channels_) {
|
| + if (!iter.second.candidates_allocated())
|
| + return;
|
| + }
|
| + signaling_thread_->Post(this, MSG_CANDIDATEALLOCATIONCOMPLETE);
|
| +
|
| + MaybeCompleted_w();
|
| +}
|
| +
|
| +void Transport::OnChannelCandidatesAllocationDone_s() {
|
| + ASSERT(signaling_thread()->IsCurrent());
|
| + LOG(LS_INFO) << "Transport: " << content_name_ << " allocation complete";
|
| + SignalCandidatesAllocationDone(this);
|
| +}
|
| +
|
| void Transport::OnRoleConflict(TransportChannelImpl* channel) {
|
| - SignalRoleConflict();
|
| + signaling_thread_->Post(this, MSG_ROLECONFLICT);
|
| }
|
|
|
| void Transport::OnChannelConnectionRemoved(TransportChannelImpl* channel) {
|
| - LOG(LS_INFO) << name() << " TransportChannel " << channel->component()
|
| - << " connection removed. Check if transport is complete.";
|
| - MaybeSignalCompleted();
|
| + ASSERT(worker_thread()->IsCurrent());
|
| + MaybeCompleted_w();
|
|
|
| // Check if the state is now Failed.
|
| // Failed is only available in the Controlling ICE role.
|
| @@ -514,97 +660,158 @@
|
| return;
|
| }
|
|
|
| - // Failed can only occur after candidate gathering has stopped.
|
| - if (channel->gathering_state() != kIceGatheringComplete) {
|
| + ChannelMap::iterator iter = channels_.find(channel->component());
|
| + ASSERT(iter != channels_.end());
|
| + // Failed can only occur after candidate allocation has stopped.
|
| + if (!iter->second.candidates_allocated()) {
|
| return;
|
| }
|
|
|
| if (channel->GetState() == TransportChannelState::STATE_FAILED) {
|
| // A Transport has failed if any of its channels have no remaining
|
| // connections.
|
| - SignalFailed(this);
|
| - }
|
| -}
|
| -
|
| -void Transport::MaybeSignalCompleted() {
|
| - if (AllChannelsCompleted()) {
|
| - LOG(LS_INFO) << name() << " transport is complete"
|
| - << " because all the channels are complete.";
|
| - SignalCompleted(this);
|
| - }
|
| - // TODO(deadbeef): Should we do anything if we previously were completed,
|
| - // but now are not (if, for example, a new remote candidate is added)?
|
| -}
|
| -
|
| -void Transport::UpdateGatheringState() {
|
| - IceGatheringState new_state = kIceGatheringNew;
|
| - bool any_gathering = false;
|
| - bool all_complete = !channels_.empty();
|
| - for (const auto& kv : channels_) {
|
| - any_gathering =
|
| - any_gathering || kv.second->gathering_state() != kIceGatheringNew;
|
| - all_complete =
|
| - all_complete && kv.second->gathering_state() == kIceGatheringComplete;
|
| - }
|
| - if (all_complete) {
|
| - new_state = kIceGatheringComplete;
|
| - } else if (any_gathering) {
|
| - new_state = kIceGatheringGathering;
|
| - }
|
| -
|
| - if (gathering_state_ != new_state) {
|
| - gathering_state_ = new_state;
|
| - if (gathering_state_ == kIceGatheringGathering) {
|
| - LOG(LS_INFO) << "Transport: " << name_ << ", gathering candidates";
|
| - } else if (gathering_state_ == kIceGatheringComplete) {
|
| - LOG(LS_INFO) << "Transport " << name() << " gathering complete.";
|
| + signaling_thread_->Post(this, MSG_FAILED);
|
| + }
|
| +}
|
| +
|
| +void Transport::MaybeCompleted_w() {
|
| + ASSERT(worker_thread()->IsCurrent());
|
| +
|
| + // When there is no channel created yet, calling this function could fire an
|
| + // IceConnectionCompleted event prematurely.
|
| + if (channels_.empty()) {
|
| + return;
|
| + }
|
| +
|
| + // A Transport's ICE process is completed if all of its channels are writable,
|
| + // have finished allocating candidates, and have pruned all but one of their
|
| + // connections.
|
| + for (const auto& iter : channels_) {
|
| + const TransportChannelImpl* channel = iter.second.get();
|
| + if (!(channel->writable() &&
|
| + channel->GetState() == TransportChannelState::STATE_COMPLETED &&
|
| + channel->GetIceRole() == ICEROLE_CONTROLLING &&
|
| + iter.second.candidates_allocated())) {
|
| + return;
|
| }
|
| - SignalGatheringState(this);
|
| - }
|
| -}
|
| -
|
| -void Transport::UpdateReceivingState() {
|
| - TransportState receiving = GetTransportState(TRANSPORT_RECEIVING_STATE);
|
| - if (receiving_ != receiving) {
|
| - receiving_ = receiving;
|
| - SignalReceivingState(this);
|
| - }
|
| -}
|
| -
|
| -void Transport::UpdateWritableState() {
|
| - TransportState writable = GetTransportState(TRANSPORT_WRITABLE_STATE);
|
| - LOG(LS_INFO) << name() << " transport writable state changed? " << writable_
|
| - << " => " << writable;
|
| - if (writable_ != writable) {
|
| - was_writable_ = (writable_ == TRANSPORT_STATE_ALL);
|
| - writable_ = writable;
|
| - SignalWritableState(this);
|
| - }
|
| -}
|
| -
|
| -bool Transport::ApplyLocalTransportDescription(TransportChannelImpl* ch,
|
| - std::string* error_desc) {
|
| + }
|
| +
|
| + signaling_thread_->Post(this, MSG_COMPLETED);
|
| +}
|
| +
|
| +void Transport::SetIceRole_w(IceRole role) {
|
| + ASSERT(worker_thread()->IsCurrent());
|
| + rtc::CritScope cs(&crit_);
|
| + ice_role_ = role;
|
| + for (auto& iter : channels_) {
|
| + iter.second->SetIceRole(ice_role_);
|
| + }
|
| +}
|
| +
|
| +void Transport::SetRemoteIceMode_w(IceMode mode) {
|
| + ASSERT(worker_thread()->IsCurrent());
|
| + remote_ice_mode_ = mode;
|
| + // Shouldn't channels be created after this method executed?
|
| + for (auto& iter : channels_) {
|
| + iter.second->SetRemoteIceMode(remote_ice_mode_);
|
| + }
|
| +}
|
| +
|
| +bool Transport::SetLocalTransportDescription_w(
|
| + const TransportDescription& desc,
|
| + ContentAction action,
|
| + std::string* error_desc) {
|
| + ASSERT(worker_thread()->IsCurrent());
|
| + bool ret = true;
|
| +
|
| + if (!VerifyIceParams(desc)) {
|
| + return BadTransportDescription("Invalid ice-ufrag or ice-pwd length",
|
| + error_desc);
|
| + }
|
| +
|
| + // TODO(tommi,pthatcher): I'm not sure why we need to grab this lock at this
|
| + // point. |local_description_| seems to always be modified on the worker
|
| + // thread, so we should be able to use it here without grabbing the lock.
|
| + // However, we _might_ need it before the call to reset() below?
|
| + // Raw access to |local_description_| is granted to derived transports outside
|
| + // of locking (see local_description() in the header file).
|
| + // The contract is that the derived implementations must be aware of when the
|
| + // description might change and do appropriate synchronization.
|
| + rtc::CritScope cs(&crit_);
|
| + if (local_description_ && IceCredentialsChanged(*local_description_, desc)) {
|
| + IceRole new_ice_role = (action == CA_OFFER) ? ICEROLE_CONTROLLING
|
| + : ICEROLE_CONTROLLED;
|
| +
|
| + // It must be called before ApplyLocalTransportDescription_w, which may
|
| + // trigger an ICE restart and depends on the new ICE role.
|
| + SetIceRole_w(new_ice_role);
|
| + }
|
| +
|
| + local_description_.reset(new TransportDescription(desc));
|
| +
|
| + for (auto& iter : channels_) {
|
| + ret &= ApplyLocalTransportDescription_w(iter.second.get(), error_desc);
|
| + }
|
| + if (!ret)
|
| + return false;
|
| +
|
| + // If PRANSWER/ANSWER is set, we should decide transport protocol type.
|
| + if (action == CA_PRANSWER || action == CA_ANSWER) {
|
| + ret &= NegotiateTransportDescription_w(action, error_desc);
|
| + }
|
| + return ret;
|
| +}
|
| +
|
| +bool Transport::SetRemoteTransportDescription_w(
|
| + const TransportDescription& desc,
|
| + ContentAction action,
|
| + std::string* error_desc) {
|
| + bool ret = true;
|
| +
|
| + if (!VerifyIceParams(desc)) {
|
| + return BadTransportDescription("Invalid ice-ufrag or ice-pwd length",
|
| + error_desc);
|
| + }
|
| +
|
| + // TODO(tommi,pthatcher): See todo for local_description_ above.
|
| + rtc::CritScope cs(&crit_);
|
| + remote_description_.reset(new TransportDescription(desc));
|
| + for (auto& iter : channels_) {
|
| + ret &= ApplyRemoteTransportDescription_w(iter.second.get(), error_desc);
|
| + }
|
| +
|
| + // If PRANSWER/ANSWER is set, we should decide transport protocol type.
|
| + if (action == CA_PRANSWER || action == CA_ANSWER) {
|
| + ret = NegotiateTransportDescription_w(CA_OFFER, error_desc);
|
| + }
|
| + return ret;
|
| +}
|
| +
|
| +bool Transport::ApplyLocalTransportDescription_w(TransportChannelImpl* ch,
|
| + std::string* error_desc) {
|
| + ASSERT(worker_thread()->IsCurrent());
|
| ch->SetIceCredentials(local_description_->ice_ufrag,
|
| local_description_->ice_pwd);
|
| return true;
|
| }
|
|
|
| -bool Transport::ApplyRemoteTransportDescription(TransportChannelImpl* ch,
|
| - std::string* error_desc) {
|
| +bool Transport::ApplyRemoteTransportDescription_w(TransportChannelImpl* ch,
|
| + std::string* error_desc) {
|
| ch->SetRemoteIceCredentials(remote_description_->ice_ufrag,
|
| remote_description_->ice_pwd);
|
| return true;
|
| }
|
|
|
| -bool Transport::ApplyNegotiatedTransportDescription(
|
| - TransportChannelImpl* channel,
|
| - std::string* error_desc) {
|
| +bool Transport::ApplyNegotiatedTransportDescription_w(
|
| + TransportChannelImpl* channel, std::string* error_desc) {
|
| + ASSERT(worker_thread()->IsCurrent());
|
| channel->SetRemoteIceMode(remote_ice_mode_);
|
| return true;
|
| }
|
|
|
| -bool Transport::NegotiateTransportDescription(ContentAction local_role,
|
| - std::string* error_desc) {
|
| +bool Transport::NegotiateTransportDescription_w(ContentAction local_role,
|
| + std::string* error_desc) {
|
| + ASSERT(worker_thread()->IsCurrent());
|
| // TODO(ekr@rtfm.com): This is ICE-specific stuff. Refactor into
|
| // P2PTransport.
|
|
|
| @@ -612,7 +819,7 @@
|
| // ice_lite, this local end point should take CONTROLLING role.
|
| if (ice_role_ == ICEROLE_CONTROLLED &&
|
| remote_description_->ice_mode == ICEMODE_LITE) {
|
| - SetIceRole(ICEROLE_CONTROLLING);
|
| + SetIceRole_w(ICEROLE_CONTROLLING);
|
| }
|
|
|
| // Update remote ice_mode to all existing channels.
|
| @@ -624,10 +831,57 @@
|
| // creation, we have the negotiation state saved until a new
|
| // negotiation happens.
|
| for (auto& iter : channels_) {
|
| - if (!ApplyNegotiatedTransportDescription(iter.second.get(), error_desc))
|
| + if (!ApplyNegotiatedTransportDescription_w(iter.second.get(), error_desc))
|
| return false;
|
| }
|
| return true;
|
| }
|
|
|
| +void Transport::OnMessage(rtc::Message* msg) {
|
| + switch (msg->message_id) {
|
| + case MSG_ONSIGNALINGREADY:
|
| + CallChannels_w(&TransportChannelImpl::OnSignalingReady);
|
| + break;
|
| + case MSG_ONREMOTECANDIDATE: {
|
| + ChannelParams* params = static_cast<ChannelParams*>(msg->pdata);
|
| + OnRemoteCandidate_w(*params->candidate);
|
| + delete params;
|
| + }
|
| + break;
|
| + case MSG_CONNECTING:
|
| + OnConnecting_s();
|
| + break;
|
| + case MSG_WRITESTATE:
|
| + OnChannelWritableState_s();
|
| + break;
|
| + case MSG_RECEIVINGSTATE:
|
| + OnChannelReceivingState_s();
|
| + break;
|
| + case MSG_REQUESTSIGNALING:
|
| + OnChannelRequestSignaling_s();
|
| + break;
|
| + case MSG_CANDIDATEREADY:
|
| + OnChannelCandidateReady_s();
|
| + break;
|
| + case MSG_ROUTECHANGE: {
|
| + ChannelParams* params = static_cast<ChannelParams*>(msg->pdata);
|
| + OnChannelRouteChange_s(params->channel, *params->candidate);
|
| + delete params;
|
| + }
|
| + break;
|
| + case MSG_CANDIDATEALLOCATIONCOMPLETE:
|
| + OnChannelCandidatesAllocationDone_s();
|
| + break;
|
| + case MSG_ROLECONFLICT:
|
| + SignalRoleConflict();
|
| + break;
|
| + case MSG_COMPLETED:
|
| + SignalCompleted(this);
|
| + break;
|
| + case MSG_FAILED:
|
| + SignalFailed(this);
|
| + break;
|
| + }
|
| +}
|
| +
|
| } // namespace cricket
|
|
|