Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(656)

Unified Diff: webrtc/p2p/base/transport.cc

Issue 1350523003: TransportController refactoring. (Closed) Base URL: https://chromium.googlesource.com/external/webrtc.git@master
Patch Set: Fixing Mac test. Created 5 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « webrtc/p2p/base/transport.h ('k') | webrtc/p2p/base/transport_unittest.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: webrtc/p2p/base/transport.cc
diff --git a/webrtc/p2p/base/transport.cc b/webrtc/p2p/base/transport.cc
index d626ad3d65acf284a6022428cbfb43615a6d07e3..b7aba7540a5699e165994a143bbae9c5852ec5a8 100644
--- a/webrtc/p2p/base/transport.cc
+++ b/webrtc/p2p/base/transport.cc
@@ -8,6 +8,8 @@
* be found in the AUTHORS file in the root of the source tree.
*/
+#include <utility> // for std::pair
+
#include "webrtc/p2p/base/transport.h"
#include "webrtc/p2p/base/candidate.h"
@@ -22,39 +24,6 @@ namespace cricket {
using rtc::Bind;
-enum {
- MSG_ONSIGNALINGREADY = 1,
- MSG_ONREMOTECANDIDATE,
- MSG_WRITESTATE,
- MSG_REQUESTSIGNALING,
- MSG_CANDIDATEREADY,
- MSG_ROUTECHANGE,
- MSG_CONNECTING,
- MSG_CANDIDATEALLOCATIONCOMPLETE,
- MSG_ROLECONFLICT,
- MSG_COMPLETED,
- MSG_FAILED,
- MSG_RECEIVINGSTATE,
-};
-
-struct ChannelParams : public rtc::MessageData {
- ChannelParams() : channel(NULL), candidate(NULL) {}
- explicit ChannelParams(int component)
- : component(component), channel(NULL), candidate(NULL) {}
- explicit ChannelParams(Candidate* candidate)
- : channel(NULL), candidate(candidate) {
- }
-
- ~ChannelParams() {
- delete candidate;
- }
-
- std::string name;
- int component;
- TransportChannelImpl* channel;
- Candidate* candidate;
-};
-
static bool VerifyIceParams(const TransportDescription& desc) {
// For legacy protocols.
if (desc.ice_ufrag.empty() && desc.ice_pwd.empty())
@@ -96,58 +65,59 @@ static bool IceCredentialsChanged(const TransportDescription& old_desc,
new_desc.ice_ufrag, new_desc.ice_pwd);
}
-Transport::Transport(rtc::Thread* signaling_thread,
- rtc::Thread* worker_thread,
- const std::string& content_name,
- PortAllocator* allocator)
- : signaling_thread_(signaling_thread),
- worker_thread_(worker_thread),
- content_name_(content_name),
- allocator_(allocator),
- destroyed_(false),
- readable_(TRANSPORT_STATE_NONE),
- writable_(TRANSPORT_STATE_NONE),
- receiving_(TRANSPORT_STATE_NONE),
- was_writable_(false),
- connect_requested_(false),
- ice_role_(ICEROLE_UNKNOWN),
- tiebreaker_(0),
- remote_ice_mode_(ICEMODE_FULL),
- channel_receiving_timeout_(-1) {
-}
+Transport::Transport(const std::string& name, PortAllocator* allocator)
+ : name_(name), allocator_(allocator) {}
Transport::~Transport() {
- ASSERT(signaling_thread_->IsCurrent());
- ASSERT(destroyed_);
+ ASSERT(channels_destroyed_);
}
-void Transport::SetIceRole(IceRole role) {
- worker_thread_->Invoke<void>(Bind(&Transport::SetIceRole_w, this, role));
-}
+bool Transport::AllChannelsCompleted() const {
+ // We aren't completed until at least one channel is complete, so if there
+ // are no channels, we aren't complete yet.
+ if (channels_.empty()) {
+ LOG(LS_INFO) << name() << " transport is not complete"
+ << " because it has no TransportChannels";
+ return false;
+ }
-void Transport::SetCertificate(
- const rtc::scoped_refptr<rtc::RTCCertificate>& certificate) {
- worker_thread_->Invoke<void>(Bind(&Transport::SetCertificate_w, this,
- certificate));
+ // A Transport's ICE process is completed if all of its channels are writable,
+ // have finished allocating candidates, and have pruned all but one of their
+ // connections.
+ for (const auto& iter : channels_) {
+ const TransportChannelImpl* channel = iter.second.get();
+ bool complete =
+ channel->writable() &&
+ channel->GetState() == TransportChannelState::STATE_COMPLETED &&
+ channel->GetIceRole() == ICEROLE_CONTROLLING &&
+ channel->gathering_state() == kIceGatheringComplete;
+ if (!complete) {
+ LOG(LS_INFO) << name() << " transport is not complete"
+ << " because a channel is still incomplete.";
+ return false;
+ }
+ }
+
+ return true;
}
-bool Transport::GetCertificate(
- rtc::scoped_refptr<rtc::RTCCertificate>* certificate) {
- // The identity is set on the worker thread, so for safety it must also be
- // acquired on the worker thread.
- return worker_thread_->Invoke<bool>(
- Bind(&Transport::GetCertificate_w, this, certificate));
+bool Transport::AnyChannelFailed() const {
+ for (const auto& iter : channels_) {
+ if (iter.second->GetState() == TransportChannelState::STATE_FAILED) {
+ return true;
+ }
+ }
+ return false;
}
-bool Transport::GetRemoteSSLCertificate(rtc::SSLCertificate** cert) {
- // Channels can be deleted on the worker thread, so for safety the remote
- // certificate is acquired on the worker thread.
- return worker_thread_->Invoke<bool>(
- Bind(&Transport::GetRemoteSSLCertificate_w, this, cert));
+void Transport::SetIceRole(IceRole role) {
+ ice_role_ = role;
+ for (auto& iter : channels_) {
+ iter.second->SetIceRole(ice_role_);
+ }
}
-bool Transport::GetRemoteSSLCertificate_w(rtc::SSLCertificate** cert) {
- ASSERT(worker_thread()->IsCurrent());
+bool Transport::GetRemoteSSLCertificate(rtc::SSLCertificate** cert) {
if (channels_.empty())
return false;
@@ -156,12 +126,6 @@ bool Transport::GetRemoteSSLCertificate_w(rtc::SSLCertificate** cert) {
}
void Transport::SetChannelReceivingTimeout(int timeout_ms) {
- worker_thread_->Invoke<void>(
- Bind(&Transport::SetChannelReceivingTimeout_w, this, timeout_ms));
-}
-
-void Transport::SetChannelReceivingTimeout_w(int timeout_ms) {
- ASSERT(worker_thread()->IsCurrent());
channel_receiving_timeout_ = timeout_ms;
for (const auto& kv : channels_) {
kv.second->SetReceivingTimeout(timeout_ms);
@@ -172,35 +136,73 @@ bool Transport::SetLocalTransportDescription(
const TransportDescription& description,
ContentAction action,
std::string* error_desc) {
- return worker_thread_->Invoke<bool>(Bind(
- &Transport::SetLocalTransportDescription_w, this,
- description, action, error_desc));
+ bool ret = true;
+
+ if (!VerifyIceParams(description)) {
+ return BadTransportDescription("Invalid ice-ufrag or ice-pwd length",
+ error_desc);
+ }
+
+ if (local_description_ &&
+ IceCredentialsChanged(*local_description_, description)) {
+ IceRole new_ice_role =
+ (action == CA_OFFER) ? ICEROLE_CONTROLLING : ICEROLE_CONTROLLED;
+
+ // It must be called before ApplyLocalTransportDescription, which may
+ // trigger an ICE restart and depends on the new ICE role.
+ SetIceRole(new_ice_role);
+ }
+
+ local_description_.reset(new TransportDescription(description));
+
+ for (auto& iter : channels_) {
+ ret &= ApplyLocalTransportDescription(iter.second.get(), error_desc);
+ }
+ if (!ret) {
+ return false;
+ }
+
+ // If PRANSWER/ANSWER is set, we should decide transport protocol type.
+ if (action == CA_PRANSWER || action == CA_ANSWER) {
+ ret &= NegotiateTransportDescription(action, error_desc);
+ }
+ if (ret) {
+ local_description_set_ = true;
+ ConnectChannels();
+ }
+
+ return ret;
}
bool Transport::SetRemoteTransportDescription(
const TransportDescription& description,
ContentAction action,
std::string* error_desc) {
- return worker_thread_->Invoke<bool>(Bind(
- &Transport::SetRemoteTransportDescription_w, this,
- description, action, error_desc));
-}
+ bool ret = true;
-TransportChannelImpl* Transport::CreateChannel(int component) {
- return worker_thread_->Invoke<TransportChannelImpl*>(Bind(
- &Transport::CreateChannel_w, this, component));
+ if (!VerifyIceParams(description)) {
+ return BadTransportDescription("Invalid ice-ufrag or ice-pwd length",
+ error_desc);
+ }
+
+ remote_description_.reset(new TransportDescription(description));
+ for (auto& iter : channels_) {
+ ret &= ApplyRemoteTransportDescription(iter.second.get(), error_desc);
+ }
+
+ // If PRANSWER/ANSWER is set, we should decide transport protocol type.
+ if (action == CA_PRANSWER || action == CA_ANSWER) {
+ ret = NegotiateTransportDescription(CA_OFFER, error_desc);
+ }
+ if (ret) {
+ remote_description_set_ = true;
+ }
+
+ return ret;
}
-TransportChannelImpl* Transport::CreateChannel_w(int component) {
- ASSERT(worker_thread()->IsCurrent());
+TransportChannelImpl* Transport::CreateChannel(int component) {
TransportChannelImpl* impl;
- // TODO(tommi): We don't really need to grab the lock until the actual call
- // to insert() below and presumably hold it throughout initialization of
- // |impl| after the impl_exists check. Maybe we can factor that out to
- // a separate function and not grab the lock in this function.
- // Actually, we probably don't need to hold the lock while initializing
- // |impl| since we can just do the insert when that's done.
- rtc::CritScope cs(&crit_);
// Create the entry if it does not exist.
bool impl_exists = false;
@@ -216,7 +218,7 @@ TransportChannelImpl* Transport::CreateChannel_w(int component) {
// Increase the ref count.
iterator->second.AddRef();
- destroyed_ = false;
+ channels_destroyed_ = false;
if (impl_exists) {
// If this is an existing channel, we should just return it without
@@ -228,23 +230,21 @@ TransportChannelImpl* Transport::CreateChannel_w(int component) {
impl->SetIceRole(ice_role_);
impl->SetIceTiebreaker(tiebreaker_);
impl->SetReceivingTimeout(channel_receiving_timeout_);
- // TODO(ronghuawu): Change CreateChannel_w to be able to return error since
- // below Apply**Description_w calls can fail.
+ // TODO(ronghuawu): Change CreateChannel to be able to return error since
+ // below Apply**Description calls can fail.
if (local_description_)
- ApplyLocalTransportDescription_w(impl, NULL);
+ ApplyLocalTransportDescription(impl, NULL);
if (remote_description_)
- ApplyRemoteTransportDescription_w(impl, NULL);
+ ApplyRemoteTransportDescription(impl, NULL);
if (local_description_ && remote_description_)
- ApplyNegotiatedTransportDescription_w(impl, NULL);
+ ApplyNegotiatedTransportDescription(impl, NULL);
impl->SignalWritableState.connect(this, &Transport::OnChannelWritableState);
impl->SignalReceivingState.connect(this, &Transport::OnChannelReceivingState);
- impl->SignalRequestSignaling.connect(
- this, &Transport::OnChannelRequestSignaling);
- impl->SignalCandidateReady.connect(this, &Transport::OnChannelCandidateReady);
+ impl->SignalGatheringState.connect(this, &Transport::OnChannelGatheringState);
+ impl->SignalCandidateGathered.connect(this,
+ &Transport::OnChannelCandidateGathered);
impl->SignalRouteChange.connect(this, &Transport::OnChannelRouteChange);
- impl->SignalCandidatesAllocationDone.connect(
- this, &Transport::OnChannelCandidatesAllocationDone);
impl->SignalRoleConflict.connect(this, &Transport::OnRoleConflict);
impl->SignalConnectionRemoved.connect(
this, &Transport::OnChannelConnectionRemoved);
@@ -254,36 +254,22 @@ TransportChannelImpl* Transport::CreateChannel_w(int component) {
if (channels_.size() == 1) {
// If this is the first channel, then indicate that we have started
// connecting.
- signaling_thread()->Post(this, MSG_CONNECTING, NULL);
+ SignalConnecting(this);
}
}
return impl;
}
TransportChannelImpl* Transport::GetChannel(int component) {
- // TODO(tommi,pthatcher): Since we're returning a pointer from the channels_
- // map, shouldn't we assume that we're on the worker thread? (The pointer
- // will be used outside of the lock).
- // And if we're on the worker thread, which is the only thread that modifies
- // channels_, can we skip grabbing the lock?
- rtc::CritScope cs(&crit_);
ChannelMap::iterator iter = channels_.find(component);
return (iter != channels_.end()) ? iter->second.get() : NULL;
}
bool Transport::HasChannels() {
- rtc::CritScope cs(&crit_);
return !channels_.empty();
}
void Transport::DestroyChannel(int component) {
- worker_thread_->Invoke<void>(Bind(
- &Transport::DestroyChannel_w, this, component));
-}
-
-void Transport::DestroyChannel_w(int component) {
- ASSERT(worker_thread()->IsCurrent());
-
ChannelMap::iterator iter = channels_.find(component);
if (iter == channels_.end())
return;
@@ -293,34 +279,30 @@ void Transport::DestroyChannel_w(int component) {
iter->second.DecRef();
if (!iter->second.ref()) {
impl = iter->second.get();
- rtc::CritScope cs(&crit_);
channels_.erase(iter);
}
if (connect_requested_ && channels_.empty()) {
// We're no longer attempting to connect.
- signaling_thread()->Post(this, MSG_CONNECTING, NULL);
+ SignalConnecting(this);
}
if (impl) {
- // Check in case the deleted channel was the only non-writable channel.
- OnChannelWritableState(impl);
DestroyTransportChannel(impl);
+ // Need to update aggregate state after destroying a channel,
+ // for example if it was the only one that wasn't yet writable.
+ UpdateWritableState();
+ UpdateReceivingState();
+ UpdateGatheringState();
+ MaybeSignalCompleted();
}
}
void Transport::ConnectChannels() {
- ASSERT(signaling_thread()->IsCurrent());
- worker_thread_->Invoke<void>(Bind(&Transport::ConnectChannels_w, this));
-}
-
-void Transport::ConnectChannels_w() {
- ASSERT(worker_thread()->IsCurrent());
if (connect_requested_ || channels_.empty())
return;
connect_requested_ = true;
- signaling_thread()->Post(this, MSG_CANDIDATEREADY, NULL);
if (!local_description_) {
// TOOD(mallinath) : TransportDescription(TD) shouldn't be generated here.
@@ -329,38 +311,28 @@ void Transport::ConnectChannels_w() {
// Session.
// Session must generate local TD before remote candidates pushed when
// initiate request initiated by the remote.
- LOG(LS_INFO) << "Transport::ConnectChannels_w: No local description has "
+ LOG(LS_INFO) << "Transport::ConnectChannels: No local description has "
<< "been set. Will generate one.";
- TransportDescription desc(std::vector<std::string>(),
- rtc::CreateRandomString(ICE_UFRAG_LENGTH),
- rtc::CreateRandomString(ICE_PWD_LENGTH),
- ICEMODE_FULL, CONNECTIONROLE_NONE, NULL,
- Candidates());
- SetLocalTransportDescription_w(desc, CA_OFFER, NULL);
+ TransportDescription desc(
+ std::vector<std::string>(), rtc::CreateRandomString(ICE_UFRAG_LENGTH),
+ rtc::CreateRandomString(ICE_PWD_LENGTH), ICEMODE_FULL,
+ CONNECTIONROLE_NONE, NULL, Candidates());
+ SetLocalTransportDescription(desc, CA_OFFER, NULL);
}
- CallChannels_w(&TransportChannelImpl::Connect);
- if (!channels_.empty()) {
- signaling_thread()->Post(this, MSG_CONNECTING, NULL);
+ CallChannels(&TransportChannelImpl::Connect);
+ if (HasChannels()) {
+ SignalConnecting(this);
}
}
-void Transport::OnConnecting_s() {
- ASSERT(signaling_thread()->IsCurrent());
- SignalConnecting(this);
+void Transport::MaybeStartGathering() {
+ if (connect_requested_) {
+ CallChannels(&TransportChannelImpl::MaybeStartGathering);
+ }
}
void Transport::DestroyAllChannels() {
- ASSERT(signaling_thread()->IsCurrent());
- worker_thread_->Invoke<void>(Bind(&Transport::DestroyAllChannels_w, this));
- worker_thread()->Clear(this);
- signaling_thread()->Clear(this);
- destroyed_ = true;
-}
-
-void Transport::DestroyAllChannels_w() {
- ASSERT(worker_thread()->IsCurrent());
-
std::vector<TransportChannelImpl*> impls;
for (auto& iter : channels_) {
iter.second.DecRef();
@@ -368,27 +340,15 @@ void Transport::DestroyAllChannels_w() {
impls.push_back(iter.second.get());
}
- {
- rtc::CritScope cs(&crit_);
- channels_.clear();
- }
+ channels_.clear();
- for (size_t i = 0; i < impls.size(); ++i)
- DestroyTransportChannel(impls[i]);
-}
-
-void Transport::OnSignalingReady() {
- ASSERT(signaling_thread()->IsCurrent());
- if (destroyed_) return;
-
- worker_thread()->Post(this, MSG_ONSIGNALINGREADY, NULL);
-
- // Notify the subclass.
- OnTransportSignalingReady();
+ for (TransportChannelImpl* impl : impls) {
+ DestroyTransportChannel(impl);
+ }
+ channels_destroyed_ = true;
}
-void Transport::CallChannels_w(TransportChannelFunc func) {
- ASSERT(worker_thread()->IsCurrent());
+void Transport::CallChannels(TransportChannelFunc func) {
for (const auto& iter : channels_) {
((iter.second.get())->*func)();
}
@@ -427,14 +387,7 @@ bool Transport::VerifyCandidate(const Candidate& cand, std::string* error) {
bool Transport::GetStats(TransportStats* stats) {
- ASSERT(signaling_thread()->IsCurrent());
- return worker_thread_->Invoke<bool>(Bind(
- &Transport::GetStats_w, this, stats));
-}
-
-bool Transport::GetStats_w(TransportStats* stats) {
- ASSERT(worker_thread()->IsCurrent());
- stats->content_name = content_name();
+ stats->transport_name = name();
stats->channel_stats.clear();
for (auto iter : channels_) {
ChannelMapEntry& entry = iter.second;
@@ -450,82 +403,45 @@ bool Transport::GetStats_w(TransportStats* stats) {
return true;
}
-bool Transport::GetSslRole(rtc::SSLRole* ssl_role) const {
- return worker_thread_->Invoke<bool>(Bind(
- &Transport::GetSslRole_w, this, ssl_role));
-}
-
-bool Transport::SetSslMaxProtocolVersion(rtc::SSLProtocolVersion version) {
- return worker_thread_->Invoke<bool>(Bind(
- &Transport::SetSslMaxProtocolVersion_w, this, version));
-}
+bool Transport::AddRemoteCandidates(const std::vector<Candidate>& candidates,
+ std::string* error) {
+ ASSERT(!channels_destroyed_);
+ // Verify each candidate before passing down to transport layer.
+ for (const Candidate& cand : candidates) {
+ if (!VerifyCandidate(cand, error)) {
+ return false;
+ }
+ if (!HasChannel(cand.component())) {
+ *error = "Candidate has unknown component: " + cand.ToString() +
+ " for content: " + name();
+ return false;
+ }
+ }
-void Transport::OnRemoteCandidates(const std::vector<Candidate>& candidates) {
for (std::vector<Candidate>::const_iterator iter = candidates.begin();
iter != candidates.end();
++iter) {
- OnRemoteCandidate(*iter);
- }
-}
-
-void Transport::OnRemoteCandidate(const Candidate& candidate) {
- ASSERT(signaling_thread()->IsCurrent());
- if (destroyed_) return;
-
- if (!HasChannel(candidate.component())) {
- LOG(LS_WARNING) << "Ignoring candidate for unknown component "
- << candidate.component();
- return;
- }
-
- ChannelParams* params = new ChannelParams(new Candidate(candidate));
- worker_thread()->Post(this, MSG_ONREMOTECANDIDATE, params);
-}
-
-void Transport::OnRemoteCandidate_w(const Candidate& candidate) {
- ASSERT(worker_thread()->IsCurrent());
- ChannelMap::iterator iter = channels_.find(candidate.component());
- // It's ok for a channel to go away while this message is in transit.
- if (iter != channels_.end()) {
- iter->second->OnCandidate(candidate);
+ TransportChannelImpl* channel = GetChannel(iter->component());
+ if (channel != NULL) {
+ channel->AddRemoteCandidate(*iter);
+ }
}
+ return true;
}
void Transport::OnChannelWritableState(TransportChannel* channel) {
- ASSERT(worker_thread()->IsCurrent());
- signaling_thread()->Post(this, MSG_WRITESTATE, NULL);
-
- MaybeCompleted_w();
-}
-
-void Transport::OnChannelWritableState_s() {
- ASSERT(signaling_thread()->IsCurrent());
- TransportState writable = GetTransportState_s(TRANSPORT_WRITABLE_STATE);
- if (writable_ != writable) {
- was_writable_ = (writable_ == TRANSPORT_STATE_ALL);
- writable_ = writable;
- SignalWritableState(this);
- }
+ LOG(LS_INFO) << name() << " TransportChannel " << channel->component()
+ << " writability changed to " << channel->writable()
+ << ". Check if transport is complete.";
+ UpdateWritableState();
+ MaybeSignalCompleted();
}
void Transport::OnChannelReceivingState(TransportChannel* channel) {
- ASSERT(worker_thread()->IsCurrent());
- signaling_thread()->Post(this, MSG_RECEIVINGSTATE);
+ UpdateReceivingState();
}
-void Transport::OnChannelReceivingState_s() {
- ASSERT(signaling_thread()->IsCurrent());
- TransportState receiving = GetTransportState_s(TRANSPORT_RECEIVING_STATE);
- if (receiving_ != receiving) {
- receiving_ = receiving;
- SignalReceivingState(this);
- }
-}
-
-TransportState Transport::GetTransportState_s(TransportStateType state_type) {
- ASSERT(signaling_thread()->IsCurrent());
-
- rtc::CritScope cs(&crit_);
+TransportState Transport::GetTransportState(TransportStateType state_type) {
bool any = false;
bool all = !channels_.empty();
for (const auto iter : channels_) {
@@ -553,106 +469,44 @@ TransportState Transport::GetTransportState_s(TransportStateType state_type) {
return TRANSPORT_STATE_NONE;
}
-void Transport::OnChannelRequestSignaling(TransportChannelImpl* channel) {
- ASSERT(worker_thread()->IsCurrent());
- // Resetting ICE state for the channel.
- ChannelMap::iterator iter = channels_.find(channel->component());
- if (iter != channels_.end())
- iter->second.set_candidates_allocated(false);
- signaling_thread()->Post(this, MSG_REQUESTSIGNALING, nullptr);
-}
-
-void Transport::OnChannelRequestSignaling_s() {
- ASSERT(signaling_thread()->IsCurrent());
- LOG(LS_INFO) << "Transport: " << content_name_ << ", allocating candidates";
- SignalRequestSignaling(this);
+void Transport::OnChannelGatheringState(TransportChannelImpl* channel) {
+ ASSERT(channels_.find(channel->component()) != channels_.end());
+ UpdateGatheringState();
+ if (gathering_state_ == kIceGatheringComplete) {
+ // If UpdateGatheringState brought us to kIceGatheringComplete, check if
+ // our connection state is also "Completed". Otherwise, there's no point in
+ // checking (since it would only produce log messages).
+ MaybeSignalCompleted();
+ }
}
-void Transport::OnChannelCandidateReady(TransportChannelImpl* channel,
- const Candidate& candidate) {
+void Transport::OnChannelCandidateGathered(TransportChannelImpl* channel,
+ const Candidate& candidate) {
// We should never signal peer-reflexive candidates.
if (candidate.type() == PRFLX_PORT_TYPE) {
ASSERT(false);
return;
}
- ASSERT(worker_thread()->IsCurrent());
- rtc::CritScope cs(&crit_);
- ready_candidates_.push_back(candidate);
-
- // We hold any messages until the client lets us connect.
- if (connect_requested_) {
- signaling_thread()->Post(
- this, MSG_CANDIDATEREADY, NULL);
- }
-}
-
-void Transport::OnChannelCandidateReady_s() {
- ASSERT(signaling_thread()->IsCurrent());
ASSERT(connect_requested_);
-
std::vector<Candidate> candidates;
- {
- rtc::CritScope cs(&crit_);
- candidates.swap(ready_candidates_);
- }
-
- // we do the deleting of Candidate* here to keep the new above and
- // delete below close to each other
- if (!candidates.empty()) {
- SignalCandidatesReady(this, candidates);
- }
+ candidates.push_back(candidate);
+ SignalCandidatesGathered(this, candidates);
}
void Transport::OnChannelRouteChange(TransportChannel* channel,
const Candidate& remote_candidate) {
- ASSERT(worker_thread()->IsCurrent());
- ChannelParams* params = new ChannelParams(new Candidate(remote_candidate));
- params->channel = static_cast<cricket::TransportChannelImpl*>(channel);
- signaling_thread()->Post(this, MSG_ROUTECHANGE, params);
-}
-
-void Transport::OnChannelRouteChange_s(const TransportChannel* channel,
- const Candidate& remote_candidate) {
- ASSERT(signaling_thread()->IsCurrent());
SignalRouteChange(this, remote_candidate.component(), remote_candidate);
}
-void Transport::OnChannelCandidatesAllocationDone(
- TransportChannelImpl* channel) {
- ASSERT(worker_thread()->IsCurrent());
- ChannelMap::iterator iter = channels_.find(channel->component());
- ASSERT(iter != channels_.end());
- LOG(LS_INFO) << "Transport: " << content_name_ << ", component "
- << channel->component() << " allocation complete";
-
- iter->second.set_candidates_allocated(true);
-
- // If all channels belonging to this Transport got signal, then
- // forward this signal to upper layer.
- // Can this signal arrive before all transport channels are created?
- for (auto& iter : channels_) {
- if (!iter.second.candidates_allocated())
- return;
- }
- signaling_thread_->Post(this, MSG_CANDIDATEALLOCATIONCOMPLETE);
-
- MaybeCompleted_w();
-}
-
-void Transport::OnChannelCandidatesAllocationDone_s() {
- ASSERT(signaling_thread()->IsCurrent());
- LOG(LS_INFO) << "Transport: " << content_name_ << " allocation complete";
- SignalCandidatesAllocationDone(this);
-}
-
void Transport::OnRoleConflict(TransportChannelImpl* channel) {
- signaling_thread_->Post(this, MSG_ROLECONFLICT);
+ SignalRoleConflict();
}
void Transport::OnChannelConnectionRemoved(TransportChannelImpl* channel) {
- ASSERT(worker_thread()->IsCurrent());
- MaybeCompleted_w();
+ LOG(LS_INFO) << name() << " TransportChannel " << channel->component()
+ << " connection removed. Check if transport is complete.";
+ MaybeSignalCompleted();
// Check if the state is now Failed.
// Failed is only available in the Controlling ICE role.
@@ -660,158 +514,97 @@ void Transport::OnChannelConnectionRemoved(TransportChannelImpl* channel) {
return;
}
- ChannelMap::iterator iter = channels_.find(channel->component());
- ASSERT(iter != channels_.end());
- // Failed can only occur after candidate allocation has stopped.
- if (!iter->second.candidates_allocated()) {
+ // Failed can only occur after candidate gathering has stopped.
+ if (channel->gathering_state() != kIceGatheringComplete) {
return;
}
if (channel->GetState() == TransportChannelState::STATE_FAILED) {
// A Transport has failed if any of its channels have no remaining
// connections.
- signaling_thread_->Post(this, MSG_FAILED);
- }
-}
-
-void Transport::MaybeCompleted_w() {
- ASSERT(worker_thread()->IsCurrent());
-
- // When there is no channel created yet, calling this function could fire an
- // IceConnectionCompleted event prematurely.
- if (channels_.empty()) {
- return;
- }
-
- // A Transport's ICE process is completed if all of its channels are writable,
- // have finished allocating candidates, and have pruned all but one of their
- // connections.
- for (const auto& iter : channels_) {
- const TransportChannelImpl* channel = iter.second.get();
- if (!(channel->writable() &&
- channel->GetState() == TransportChannelState::STATE_COMPLETED &&
- channel->GetIceRole() == ICEROLE_CONTROLLING &&
- iter.second.candidates_allocated())) {
- return;
- }
+ SignalFailed(this);
}
-
- signaling_thread_->Post(this, MSG_COMPLETED);
}
-void Transport::SetIceRole_w(IceRole role) {
- ASSERT(worker_thread()->IsCurrent());
- rtc::CritScope cs(&crit_);
- ice_role_ = role;
- for (auto& iter : channels_) {
- iter.second->SetIceRole(ice_role_);
+void Transport::MaybeSignalCompleted() {
+ if (AllChannelsCompleted()) {
+ LOG(LS_INFO) << name() << " transport is complete"
+ << " because all the channels are complete.";
+ SignalCompleted(this);
}
+ // TODO(deadbeef): Should we do anything if we previously were completed,
+ // but now are not (if, for example, a new remote candidate is added)?
}
-void Transport::SetRemoteIceMode_w(IceMode mode) {
- ASSERT(worker_thread()->IsCurrent());
- remote_ice_mode_ = mode;
- // Shouldn't channels be created after this method executed?
- for (auto& iter : channels_) {
- iter.second->SetRemoteIceMode(remote_ice_mode_);
+void Transport::UpdateGatheringState() {
+ IceGatheringState new_state = kIceGatheringNew;
+ bool any_gathering = false;
+ bool all_complete = !channels_.empty();
+ for (const auto& kv : channels_) {
+ any_gathering =
+ any_gathering || kv.second->gathering_state() != kIceGatheringNew;
+ all_complete =
+ all_complete && kv.second->gathering_state() == kIceGatheringComplete;
+ }
+ if (all_complete) {
+ new_state = kIceGatheringComplete;
+ } else if (any_gathering) {
+ new_state = kIceGatheringGathering;
+ }
+
+ if (gathering_state_ != new_state) {
+ gathering_state_ = new_state;
+ if (gathering_state_ == kIceGatheringGathering) {
+ LOG(LS_INFO) << "Transport: " << name_ << ", gathering candidates";
+ } else if (gathering_state_ == kIceGatheringComplete) {
+ LOG(LS_INFO) << "Transport " << name() << " gathering complete.";
+ }
+ SignalGatheringState(this);
}
}
-bool Transport::SetLocalTransportDescription_w(
- const TransportDescription& desc,
- ContentAction action,
- std::string* error_desc) {
- ASSERT(worker_thread()->IsCurrent());
- bool ret = true;
-
- if (!VerifyIceParams(desc)) {
- return BadTransportDescription("Invalid ice-ufrag or ice-pwd length",
- error_desc);
- }
-
- // TODO(tommi,pthatcher): I'm not sure why we need to grab this lock at this
- // point. |local_description_| seems to always be modified on the worker
- // thread, so we should be able to use it here without grabbing the lock.
- // However, we _might_ need it before the call to reset() below?
- // Raw access to |local_description_| is granted to derived transports outside
- // of locking (see local_description() in the header file).
- // The contract is that the derived implementations must be aware of when the
- // description might change and do appropriate synchronization.
- rtc::CritScope cs(&crit_);
- if (local_description_ && IceCredentialsChanged(*local_description_, desc)) {
- IceRole new_ice_role = (action == CA_OFFER) ? ICEROLE_CONTROLLING
- : ICEROLE_CONTROLLED;
-
- // It must be called before ApplyLocalTransportDescription_w, which may
- // trigger an ICE restart and depends on the new ICE role.
- SetIceRole_w(new_ice_role);
- }
-
- local_description_.reset(new TransportDescription(desc));
-
- for (auto& iter : channels_) {
- ret &= ApplyLocalTransportDescription_w(iter.second.get(), error_desc);
- }
- if (!ret)
- return false;
-
- // If PRANSWER/ANSWER is set, we should decide transport protocol type.
- if (action == CA_PRANSWER || action == CA_ANSWER) {
- ret &= NegotiateTransportDescription_w(action, error_desc);
+void Transport::UpdateReceivingState() {
+ TransportState receiving = GetTransportState(TRANSPORT_RECEIVING_STATE);
+ if (receiving_ != receiving) {
+ receiving_ = receiving;
+ SignalReceivingState(this);
}
- return ret;
}
-bool Transport::SetRemoteTransportDescription_w(
- const TransportDescription& desc,
- ContentAction action,
- std::string* error_desc) {
- bool ret = true;
-
- if (!VerifyIceParams(desc)) {
- return BadTransportDescription("Invalid ice-ufrag or ice-pwd length",
- error_desc);
- }
-
- // TODO(tommi,pthatcher): See todo for local_description_ above.
- rtc::CritScope cs(&crit_);
- remote_description_.reset(new TransportDescription(desc));
- for (auto& iter : channels_) {
- ret &= ApplyRemoteTransportDescription_w(iter.second.get(), error_desc);
- }
-
- // If PRANSWER/ANSWER is set, we should decide transport protocol type.
- if (action == CA_PRANSWER || action == CA_ANSWER) {
- ret = NegotiateTransportDescription_w(CA_OFFER, error_desc);
+void Transport::UpdateWritableState() {
+ TransportState writable = GetTransportState(TRANSPORT_WRITABLE_STATE);
+ LOG(LS_INFO) << name() << " transport writable state changed? " << writable_
+ << " => " << writable;
+ if (writable_ != writable) {
+ was_writable_ = (writable_ == TRANSPORT_STATE_ALL);
+ writable_ = writable;
+ SignalWritableState(this);
}
- return ret;
}
-bool Transport::ApplyLocalTransportDescription_w(TransportChannelImpl* ch,
- std::string* error_desc) {
- ASSERT(worker_thread()->IsCurrent());
+bool Transport::ApplyLocalTransportDescription(TransportChannelImpl* ch,
+ std::string* error_desc) {
ch->SetIceCredentials(local_description_->ice_ufrag,
local_description_->ice_pwd);
return true;
}
-bool Transport::ApplyRemoteTransportDescription_w(TransportChannelImpl* ch,
- std::string* error_desc) {
+bool Transport::ApplyRemoteTransportDescription(TransportChannelImpl* ch,
+ std::string* error_desc) {
ch->SetRemoteIceCredentials(remote_description_->ice_ufrag,
remote_description_->ice_pwd);
return true;
}
-bool Transport::ApplyNegotiatedTransportDescription_w(
- TransportChannelImpl* channel, std::string* error_desc) {
- ASSERT(worker_thread()->IsCurrent());
+bool Transport::ApplyNegotiatedTransportDescription(
+ TransportChannelImpl* channel,
+ std::string* error_desc) {
channel->SetRemoteIceMode(remote_ice_mode_);
return true;
}
-bool Transport::NegotiateTransportDescription_w(ContentAction local_role,
- std::string* error_desc) {
- ASSERT(worker_thread()->IsCurrent());
+bool Transport::NegotiateTransportDescription(ContentAction local_role,
+ std::string* error_desc) {
// TODO(ekr@rtfm.com): This is ICE-specific stuff. Refactor into
// P2PTransport.
@@ -819,7 +612,7 @@ bool Transport::NegotiateTransportDescription_w(ContentAction local_role,
// ice_lite, this local end point should take CONTROLLING role.
if (ice_role_ == ICEROLE_CONTROLLED &&
remote_description_->ice_mode == ICEMODE_LITE) {
- SetIceRole_w(ICEROLE_CONTROLLING);
+ SetIceRole(ICEROLE_CONTROLLING);
}
// Update remote ice_mode to all existing channels.
@@ -831,57 +624,10 @@ bool Transport::NegotiateTransportDescription_w(ContentAction local_role,
// creation, we have the negotiation state saved until a new
// negotiation happens.
for (auto& iter : channels_) {
- if (!ApplyNegotiatedTransportDescription_w(iter.second.get(), error_desc))
+ if (!ApplyNegotiatedTransportDescription(iter.second.get(), error_desc))
return false;
}
return true;
}
-void Transport::OnMessage(rtc::Message* msg) {
- switch (msg->message_id) {
- case MSG_ONSIGNALINGREADY:
- CallChannels_w(&TransportChannelImpl::OnSignalingReady);
- break;
- case MSG_ONREMOTECANDIDATE: {
- ChannelParams* params = static_cast<ChannelParams*>(msg->pdata);
- OnRemoteCandidate_w(*params->candidate);
- delete params;
- }
- break;
- case MSG_CONNECTING:
- OnConnecting_s();
- break;
- case MSG_WRITESTATE:
- OnChannelWritableState_s();
- break;
- case MSG_RECEIVINGSTATE:
- OnChannelReceivingState_s();
- break;
- case MSG_REQUESTSIGNALING:
- OnChannelRequestSignaling_s();
- break;
- case MSG_CANDIDATEREADY:
- OnChannelCandidateReady_s();
- break;
- case MSG_ROUTECHANGE: {
- ChannelParams* params = static_cast<ChannelParams*>(msg->pdata);
- OnChannelRouteChange_s(params->channel, *params->candidate);
- delete params;
- }
- break;
- case MSG_CANDIDATEALLOCATIONCOMPLETE:
- OnChannelCandidatesAllocationDone_s();
- break;
- case MSG_ROLECONFLICT:
- SignalRoleConflict();
- break;
- case MSG_COMPLETED:
- SignalCompleted(this);
- break;
- case MSG_FAILED:
- SignalFailed(this);
- break;
- }
-}
-
} // namespace cricket
« no previous file with comments | « webrtc/p2p/base/transport.h ('k') | webrtc/p2p/base/transport_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698