Chromium Code Reviews| Index: webrtc/p2p/base/transportcontroller.cc |
| diff --git a/webrtc/p2p/base/transportcontroller.cc b/webrtc/p2p/base/transportcontroller.cc |
| index d84d574f153076ad01015ee7bd25ca89f8b0a0d3..16ebbb0c6c66ce43f8004379c2c1784eda2cc430 100644 |
| --- a/webrtc/p2p/base/transportcontroller.cc |
| +++ b/webrtc/p2p/base/transportcontroller.cc |
| @@ -10,11 +10,14 @@ |
| #include "webrtc/p2p/base/transportcontroller.h" |
| +#include <algorithm> |
| + |
| #include "webrtc/base/bind.h" |
| #include "webrtc/base/checks.h" |
| #include "webrtc/base/thread.h" |
| #include "webrtc/p2p/base/dtlstransport.h" |
| #include "webrtc/p2p/base/p2ptransport.h" |
| +#include "webrtc/p2p/base/port.h" |
| namespace cricket { |
| @@ -140,8 +143,31 @@ TransportChannel* TransportController::CreateTransportChannel_w( |
| int component) { |
| RTC_DCHECK(worker_thread_->IsCurrent()); |
| - Transport* transport = GetOrCreateTransport_w(transport_name); |
| - return transport->CreateChannel(component); |
| + auto it = FindChannel_w(transport_name, component); |
| + if (it == channels_.end()) { |
| + // Need to create a new channel. |
| + Transport* transport = GetOrCreateTransport_w(transport_name); |
| + TransportChannelImpl* channel = transport->CreateChannel(component); |
| + channel->SignalWritableState.connect( |
| + this, &TransportController::OnChannelWritableState_w); |
| + channel->SignalReceivingState.connect( |
| + this, &TransportController::OnChannelReceivingState_w); |
| + channel->SignalGatheringState.connect( |
| + this, &TransportController::OnChannelGatheringState_w); |
| + channel->SignalCandidateGathered.connect( |
| + this, &TransportController::OnChannelCandidateGathered_w); |
| + channel->SignalRoleConflict.connect( |
| + this, &TransportController::OnChannelRoleConflict_w); |
| + channel->SignalConnectionRemoved.connect( |
| + this, &TransportController::OnChannelConnectionRemoved_w); |
| + channels_.insert(channels_.end(), RefCountedChannel(channel))->AddRef(); |
| + // Adding a channel could cause aggregate state to change. |
| + UpdateAggregateStates_w(); |
| + return channel; |
| + } |
| + // Channel already exists; increment reference count and return. |
| + it->AddRef(); |
| + return it->get(); |
|
pthatcher1
2015/09/29 23:50:58
Might be easier to read the other way around:
if
Taylor Brandstetter
2015/09/30 01:01:20
Done.
|
| } |
| void TransportController::DestroyTransportChannel_w( |
| @@ -149,17 +175,26 @@ void TransportController::DestroyTransportChannel_w( |
| int component) { |
| RTC_DCHECK(worker_thread_->IsCurrent()); |
| - Transport* transport = GetTransport_w(transport_name); |
| - if (!transport) { |
| - ASSERT(false); |
| + auto it = FindChannel_w(transport_name, component); |
| + if (it == channels_.end()) { |
| + LOG(LS_WARNING) << "Attempting to delete " << transport_name |
| + << " TransportChannel " << component |
| + << ", which doesn't exist."; |
| return; |
| } |
| - transport->DestroyChannel(component); |
| - // Just as we create a Transport when its first channel is created, |
| - // we delete it when its last channel is deleted. |
| - if (!transport->HasChannels()) { |
| - DestroyTransport_w(transport_name); |
| + it->DecRef(); |
| + if (it->ref() == 0) { |
|
pthatcher1
2015/09/29 23:50:58
Same here:
if (it->ref() > 0) {
return;
}
chan
Taylor Brandstetter
2015/09/30 01:01:20
Done.
|
| + channels_.erase(it); |
| + Transport* transport = GetTransport_w(transport_name); |
| + transport->DestroyChannel(component); |
| + // Just as we create a Transport when its first channel is created, |
| + // we delete it when its last channel is deleted. |
| + if (!transport->HasChannels()) { |
| + DestroyTransport_w(transport_name); |
| + } |
| + // Removing a channel could cause aggregate state to change. |
| + UpdateAggregateStates_w(); |
| } |
| } |
| @@ -221,6 +256,17 @@ void TransportController::OnMessage(rtc::Message* pmsg) { |
| } |
| } |
| +std::vector<TransportController::RefCountedChannel>::iterator |
| +TransportController::FindChannel_w(const std::string& transport_name, |
| + int component) { |
| + return std::find_if( |
| + channels_.begin(), channels_.end(), |
| + [transport_name, component](const RefCountedChannel& channel) { |
| + return channel->transport_name() == transport_name && |
| + channel->component() == component; |
| + }); |
| +} |
| + |
| Transport* TransportController::GetOrCreateTransport_w( |
| const std::string& transport_name) { |
| RTC_DCHECK(worker_thread_->IsCurrent()); |
| @@ -240,22 +286,6 @@ Transport* TransportController::GetOrCreateTransport_w( |
| if (certificate_) { |
| transport->SetLocalCertificate(certificate_); |
| } |
| - transport->SignalConnecting.connect( |
| - this, &TransportController::OnTransportConnecting_w); |
| - transport->SignalWritableState.connect( |
| - this, &TransportController::OnTransportWritableState_w); |
| - transport->SignalReceivingState.connect( |
| - this, &TransportController::OnTransportReceivingState_w); |
| - transport->SignalCompleted.connect( |
| - this, &TransportController::OnTransportCompleted_w); |
| - transport->SignalFailed.connect(this, |
| - &TransportController::OnTransportFailed_w); |
| - transport->SignalGatheringState.connect( |
| - this, &TransportController::OnTransportGatheringState_w); |
| - transport->SignalCandidatesGathered.connect( |
| - this, &TransportController::OnTransportCandidatesGathered_w); |
| - transport->SignalRoleConflict.connect( |
| - this, &TransportController::OnTransportRoleConflict_w); |
| transports_[transport_name] = transport; |
| return transport; |
| @@ -270,8 +300,6 @@ void TransportController::DestroyTransport_w( |
| delete iter->second; |
| transports_.erase(transport_name); |
| } |
| - // Destroying a transport may cause aggregate state to change. |
| - UpdateAggregateStates_w(); |
| } |
| void TransportController::DestroyAllTransports_w() { |
| @@ -447,49 +475,49 @@ bool TransportController::GetStats_w(const std::string& transport_name, |
| return transport->GetStats(stats); |
| } |
| -void TransportController::OnTransportConnecting_w(Transport* transport) { |
| - RTC_DCHECK(worker_thread_->IsCurrent()); |
| - UpdateAggregateStates_w(); |
| -} |
| - |
| -void TransportController::OnTransportWritableState_w(Transport* transport) { |
| - RTC_DCHECK(worker_thread_->IsCurrent()); |
| - UpdateAggregateStates_w(); |
| -} |
| - |
| -void TransportController::OnTransportReceivingState_w(Transport* transport) { |
| +void TransportController::OnChannelWritableState_w(TransportChannel* channel) { |
| RTC_DCHECK(worker_thread_->IsCurrent()); |
| + LOG(LS_INFO) << channel->transport_name() << " TransportChannel " |
| + << channel->component() << " writability changed to " |
| + << channel->writable() << "."; |
| UpdateAggregateStates_w(); |
| } |
| -void TransportController::OnTransportCompleted_w(Transport* transport) { |
| +void TransportController::OnChannelReceivingState_w(TransportChannel* channel) { |
| RTC_DCHECK(worker_thread_->IsCurrent()); |
| UpdateAggregateStates_w(); |
| } |
| -void TransportController::OnTransportFailed_w(Transport* transport) { |
| +void TransportController::OnChannelGatheringState_w( |
| + TransportChannelImpl* channel) { |
| RTC_DCHECK(worker_thread_->IsCurrent()); |
| UpdateAggregateStates_w(); |
| } |
| -void TransportController::OnTransportGatheringState_w(Transport* transport) { |
| +void TransportController::OnChannelCandidateGathered_w( |
| + TransportChannelImpl* channel, |
| + const Candidate& candidate) { |
| RTC_DCHECK(worker_thread_->IsCurrent()); |
| - UpdateAggregateStates_w(); |
| -} |
| -void TransportController::OnTransportCandidatesGathered_w( |
| - Transport* transport, |
| - const std::vector<Candidate>& candidates) { |
| - RTC_DCHECK(worker_thread_->IsCurrent()); |
| - CandidatesData* data = new CandidatesData(transport->name(), candidates); |
| + // We should never signal peer-reflexive candidates. |
| + if (candidate.type() == PRFLX_PORT_TYPE) { |
| + RTC_DCHECK(false); |
| + return; |
| + } |
| + std::vector<Candidate> candidates; |
| + candidates.push_back(candidate); |
| + CandidatesData* data = |
| + new CandidatesData(channel->transport_name(), candidates); |
| signaling_thread_->Post(this, MSG_CANDIDATESGATHERED, data); |
| } |
| -void TransportController::OnTransportRoleConflict_w() { |
| +void TransportController::OnChannelRoleConflict_w( |
| + TransportChannelImpl* channel) { |
| RTC_DCHECK(worker_thread_->IsCurrent()); |
| if (ice_role_switch_) { |
| - LOG(LS_WARNING) << "Repeat of role conflict signal from Transport."; |
| + LOG(LS_WARNING) |
| + << "Repeat of role conflict signal from TransportChannelImpl."; |
| return; |
| } |
| @@ -502,6 +530,15 @@ void TransportController::OnTransportRoleConflict_w() { |
| } |
| } |
| +void TransportController::OnChannelConnectionRemoved_w( |
| + TransportChannelImpl* channel) { |
| + RTC_DCHECK(worker_thread_->IsCurrent()); |
| + LOG(LS_INFO) << channel->transport_name() << " TransportChannel " |
| + << channel->component() |
| + << " connection removed. Check if state is complete."; |
| + UpdateAggregateStates_w(); |
| +} |
| + |
| void TransportController::UpdateAggregateStates_w() { |
| RTC_DCHECK(worker_thread_->IsCurrent()); |
| @@ -509,24 +546,24 @@ void TransportController::UpdateAggregateStates_w() { |
| IceGatheringState new_gathering_state = kIceGatheringNew; |
| bool any_receiving = false; |
| bool any_failed = false; |
| - bool all_connected = HasChannels_w(); |
| - bool all_completed = HasChannels_w(); |
| + bool all_connected = !channels_.empty(); |
| + bool all_completed = !channels_.empty(); |
| bool any_gathering = false; |
| - bool all_done_gathering = HasChannels_w(); |
| - for (const auto& kv : transports_) { |
| - // Ignore transports without channels since they're about to be deleted, |
| - // and their state is meaningless. |
| - if (!kv.second->HasChannels()) { |
| - continue; |
| - } |
| - any_receiving = any_receiving || kv.second->any_channel_receiving(); |
| - any_failed = any_failed || kv.second->AnyChannelFailed(); |
| - all_connected = all_connected && kv.second->all_channels_writable(); |
| - all_completed = all_completed && kv.second->AllChannelsCompleted(); |
| + bool all_done_gathering = !channels_.empty(); |
| + for (const auto& channel : channels_) { |
| + any_receiving = any_receiving || channel->receiving(); |
| + any_failed = any_failed || |
| + channel->GetState() == TransportChannelState::STATE_FAILED; |
| + all_connected = all_connected && channel->writable(); |
| + all_completed = |
| + all_completed && channel->writable() && |
| + channel->GetState() == TransportChannelState::STATE_COMPLETED && |
| + channel->GetIceRole() == ICEROLE_CONTROLLING && |
| + channel->gathering_state() == kIceGatheringComplete; |
| any_gathering = |
| - any_gathering || kv.second->gathering_state() != kIceGatheringNew; |
| + any_gathering || channel->gathering_state() != kIceGatheringNew; |
| all_done_gathering = all_done_gathering && |
| - kv.second->gathering_state() == kIceGatheringComplete; |
| + channel->gathering_state() == kIceGatheringComplete; |
| } |
| if (any_failed) { |
| @@ -562,13 +599,4 @@ void TransportController::UpdateAggregateStates_w() { |
| } |
| } |
| -bool TransportController::HasChannels_w() { |
| - for (const auto& kv : transports_) { |
| - if (kv.second->HasChannels()) { |
| - return true; |
| - } |
| - } |
| - return false; |
| -} |
| - |
| } // namespace cricket |