Index: webrtc/pc/channel.cc |
diff --git a/webrtc/pc/channel.cc b/webrtc/pc/channel.cc |
index 4d47e878b8e2b8c7d7c858c439d822f9d95a4720..7c2acf87af796ee4d67adc691fa947c1a0da7d6e 100644 |
--- a/webrtc/pc/channel.cc |
+++ b/webrtc/pc/channel.cc |
@@ -37,6 +37,22 @@ bool SetRawAudioSink_w(VoiceMediaChannel* channel, |
channel->SetRawAudioSink(ssrc, std::move(*sink)); |
return true; |
} |
+ |
+struct PacketMessageData : public rtc::MessageData { |
+ rtc::CopyOnWriteBuffer packet; |
+ rtc::PacketOptions options; |
+}; |
+ |
+struct IncomingPacketMessageData : public rtc::MessageData { |
+ rtc::CopyOnWriteBuffer packet; |
+ rtc::PacketTime packet_time; |
+}; |
+ |
+struct ChangeState : public rtc::MessageData { |
+ bool send; |
+ bool recv; |
+}; |
+ |
} // namespace |
enum { |
@@ -48,6 +64,11 @@ enum { |
MSG_DATARECEIVED, |
MSG_FIRSTPACKETRECEIVED, |
MSG_STREAMCLOSEDREMOTELY, |
+ MSG_INCOMING_RTP_PACKET, |
+ MSG_INCOMING_RTCP_PACKET, |
+ MSG_CHANGE_STATE, |
+ MSG_NOT_READY_TO_SEND, |
+ MSG_READY_TO_SEND, |
}; |
// Value specified in RFC 5764. |
@@ -61,11 +82,6 @@ static void SafeSetError(const std::string& message, std::string* error_desc) { |
} |
} |
-struct PacketMessageData : public rtc::MessageData { |
- rtc::CopyOnWriteBuffer packet; |
- rtc::PacketOptions options; |
-}; |
- |
struct VoiceChannelErrorMessageData : public rtc::MessageData { |
VoiceChannelErrorMessageData(uint32_t in_ssrc, |
VoiceMediaChannel::Error in_error) |
@@ -142,12 +158,14 @@ void RtpSendParametersFromMediaDescription( |
send_params->max_bandwidth_bps = desc->bandwidth(); |
} |
-BaseChannel::BaseChannel(rtc::Thread* thread, |
+BaseChannel::BaseChannel(rtc::Thread* worker_thread, |
+ rtc::Thread* network_thread, |
MediaChannel* media_channel, |
TransportController* transport_controller, |
const std::string& content_name, |
bool rtcp) |
- : worker_thread_(thread), |
+ : worker_thread_(worker_thread), |
+ network_thread_(network_thread), |
transport_controller_(transport_controller), |
media_channel_(media_channel), |
content_name_(content_name), |
@@ -165,33 +183,37 @@ BaseChannel::BaseChannel(rtc::Thread* thread, |
dtls_keyed_(false), |
secure_required_(false), |
rtp_abs_sendtime_extn_id_(-1) { |
- ASSERT(worker_thread_ == rtc::Thread::Current()); |
+ RTC_DCHECK(worker_thread_->IsCurrent()); |
+ RTC_DCHECK_EQ(network_thread, transport_controller->network_thread()); |
LOG(LS_INFO) << "Created channel for " << content_name; |
} |
BaseChannel::~BaseChannel() { |
TRACE_EVENT0("webrtc", "BaseChannel::~BaseChannel"); |
- ASSERT(worker_thread_ == rtc::Thread::Current()); |
+ RTC_DCHECK(worker_thread_->IsCurrent()); |
Deinit(); |
StopConnectionMonitor(); |
FlushRtcpMessages(); // Send any outstanding RTCP packets. |
worker_thread_->Clear(this); // eats any outstanding messages or packets |
+ network_thread_->Clear(this); |
// We must destroy the media channel before the transport channel, otherwise |
// the media channel may try to send on the dead transport channel. NULLing |
// is not an effective strategy since the sends will come on another thread. |
delete media_channel_; |
// Note that we don't just call set_transport_channel(nullptr) because that |
// would call a pure virtual method which we can't do from a destructor. |
- if (transport_channel_) { |
- DisconnectFromTransportChannel(transport_channel_); |
- transport_controller_->DestroyTransportChannel_w( |
- transport_name_, cricket::ICE_CANDIDATE_COMPONENT_RTP); |
- } |
- if (rtcp_transport_channel_) { |
- DisconnectFromTransportChannel(rtcp_transport_channel_); |
- transport_controller_->DestroyTransportChannel_w( |
- transport_name_, cricket::ICE_CANDIDATE_COMPONENT_RTCP); |
- } |
+ network_thread_->Invoke<void>([this] { |
+ if (transport_channel_) { |
+ DisconnectFromTransportChannel(transport_channel_); |
+ transport_controller_->DestroyTransportChannel_n( |
+ transport_name_, cricket::ICE_CANDIDATE_COMPONENT_RTP); |
+ } |
+ if (rtcp_transport_channel_) { |
+ DisconnectFromTransportChannel(rtcp_transport_channel_); |
+ transport_controller_->DestroyTransportChannel_n( |
+ transport_name_, cricket::ICE_CANDIDATE_COMPONENT_RTCP); |
+ } |
+ }); |
LOG(LS_INFO) << "Destroyed channel"; |
} |
@@ -219,12 +241,12 @@ void BaseChannel::Deinit() { |
} |
bool BaseChannel::SetTransport(const std::string& transport_name) { |
- return worker_thread_->Invoke<bool>( |
- Bind(&BaseChannel::SetTransport_w, this, transport_name)); |
+ return network_thread_->Invoke<bool>( |
+ Bind(&BaseChannel::SetTransport_n, this, transport_name)); |
} |
-bool BaseChannel::SetTransport_w(const std::string& transport_name) { |
- ASSERT(worker_thread_ == rtc::Thread::Current()); |
+bool BaseChannel::SetTransport_n(const std::string& transport_name) { |
+ RTC_DCHECK(network_thread_->IsCurrent()); |
if (transport_name == transport_name_) { |
// Nothing to do if transport name isn't changing |
@@ -245,18 +267,14 @@ bool BaseChannel::SetTransport_w(const std::string& transport_name) { |
if (rtcp_transport_enabled()) { |
LOG(LS_INFO) << "Create RTCP TransportChannel for " << content_name() |
<< " on " << transport_name << " transport "; |
- set_rtcp_transport_channel( |
- transport_controller_->CreateTransportChannel_w( |
- transport_name, cricket::ICE_CANDIDATE_COMPONENT_RTCP), |
- false /* update_writablity */); |
+ set_rtcp_transport_channel(&transport_name, false /* update_writablity */); |
if (!rtcp_transport_channel()) { |
return false; |
} |
} |
// We're not updating the writablity during the transition state. |
- set_transport_channel(transport_controller_->CreateTransportChannel_w( |
- transport_name, cricket::ICE_CANDIDATE_COMPONENT_RTP)); |
+ set_transport_channel(transport_name); |
if (!transport_channel()) { |
return false; |
} |
@@ -265,26 +283,28 @@ bool BaseChannel::SetTransport_w(const std::string& transport_name) { |
if (rtcp_transport_enabled()) { |
// We can only update the RTCP ready to send after set_transport_channel has |
// handled channel writability. |
- SetReadyToSend( |
+ SetReadyToSend_n( |
true, rtcp_transport_channel() && rtcp_transport_channel()->writable()); |
} |
transport_name_ = transport_name; |
return true; |
} |
-void BaseChannel::set_transport_channel(TransportChannel* new_tc) { |
- ASSERT(worker_thread_ == rtc::Thread::Current()); |
+void BaseChannel::set_transport_channel(const std::string& transport_name) { |
+ RTC_DCHECK(network_thread_->IsCurrent()); |
+ TransportChannel* new_tc = transport_controller_->CreateTransportChannel_n( |
+ transport_name, cricket::ICE_CANDIDATE_COMPONENT_RTP); |
TransportChannel* old_tc = transport_channel_; |
if (!old_tc && !new_tc) { |
// Nothing to do |
return; |
} |
- ASSERT(old_tc != new_tc); |
+ RTC_DCHECK_NE(old_tc, new_tc); |
if (old_tc) { |
DisconnectFromTransportChannel(old_tc); |
- transport_controller_->DestroyTransportChannel_w( |
+ transport_controller_->DestroyTransportChannel_n( |
transport_name_, cricket::ICE_CANDIDATE_COMPONENT_RTP); |
} |
@@ -299,24 +319,30 @@ void BaseChannel::set_transport_channel(TransportChannel* new_tc) { |
// Update aggregate writable/ready-to-send state between RTP and RTCP upon |
// setting new channel |
- UpdateWritableState_w(); |
- SetReadyToSend(false, new_tc && new_tc->writable()); |
+ UpdateWritableState_n(); |
+ SetReadyToSend_n(false, new_tc && new_tc->writable()); |
} |
-void BaseChannel::set_rtcp_transport_channel(TransportChannel* new_tc, |
+void BaseChannel::set_rtcp_transport_channel(const std::string* transport_name, |
bool update_writablity) { |
- ASSERT(worker_thread_ == rtc::Thread::Current()); |
+ RTC_DCHECK(network_thread_->IsCurrent()); |
+ |
+ TransportChannel* new_tc = |
+ transport_name |
+ ? transport_controller_->CreateTransportChannel_n( |
+ *transport_name, cricket::ICE_CANDIDATE_COMPONENT_RTCP) |
+ : nullptr; |
TransportChannel* old_tc = rtcp_transport_channel_; |
if (!old_tc && !new_tc) { |
// Nothing to do |
return; |
} |
- ASSERT(old_tc != new_tc); |
+ RTC_DCHECK_NE(old_tc, new_tc); |
if (old_tc) { |
DisconnectFromTransportChannel(old_tc); |
- transport_controller_->DestroyTransportChannel_w( |
+ transport_controller_->DestroyTransportChannel_n( |
transport_name_, cricket::ICE_CANDIDATE_COMPONENT_RTCP); |
} |
@@ -335,13 +361,13 @@ void BaseChannel::set_rtcp_transport_channel(TransportChannel* new_tc, |
if (update_writablity) { |
// Update aggregate writable/ready-to-send state between RTP and RTCP upon |
// setting new channel |
- UpdateWritableState_w(); |
- SetReadyToSend(true, new_tc && new_tc->writable()); |
+ UpdateWritableState_n(); |
+ SetReadyToSend_n(true, new_tc && new_tc->writable()); |
} |
} |
void BaseChannel::ConnectToTransportChannel(TransportChannel* tc) { |
- ASSERT(worker_thread_ == rtc::Thread::Current()); |
+ RTC_DCHECK(network_thread_->IsCurrent()); |
tc->SignalWritableState.connect(this, &BaseChannel::OnWritableState); |
tc->SignalReadPacket.connect(this, &BaseChannel::OnChannelRead); |
@@ -352,7 +378,7 @@ void BaseChannel::ConnectToTransportChannel(TransportChannel* tc) { |
} |
void BaseChannel::DisconnectFromTransportChannel(TransportChannel* tc) { |
- ASSERT(worker_thread_ == rtc::Thread::Current()); |
+ RTC_DCHECK(network_thread_->IsCurrent()); |
tc->SignalWritableState.disconnect(this); |
tc->SignalReadPacket.disconnect(this); |
@@ -405,8 +431,8 @@ void BaseChannel::StartConnectionMonitor(int cms) { |
// We pass in the BaseChannel instead of the transport_channel_ |
// because if the transport_channel_ changes, the ConnectionMonitor |
// would be pointing to the wrong TransportChannel. |
- connection_monitor_.reset(new ConnectionMonitor( |
- this, worker_thread(), rtc::Thread::Current())); |
+ connection_monitor_.reset( |
+ new ConnectionMonitor(this, network_thread(), rtc::Thread::Current())); |
connection_monitor_->SignalUpdate.connect( |
this, &BaseChannel::OnConnectionMonitorUpdate); |
connection_monitor_->Start(cms); |
@@ -420,7 +446,7 @@ void BaseChannel::StopConnectionMonitor() { |
} |
bool BaseChannel::GetConnectionStats(ConnectionInfos* infos) { |
- ASSERT(worker_thread_ == rtc::Thread::Current()); |
+ RTC_DCHECK(network_thread_->IsCurrent()); |
return transport_channel_->GetStats(infos); |
} |
@@ -450,25 +476,29 @@ bool BaseChannel::SendRtcp(rtc::CopyOnWriteBuffer* packet, |
int BaseChannel::SetOption(SocketType type, rtc::Socket::Option opt, |
int value) { |
- TransportChannel* channel = NULL; |
- switch (type) { |
- case ST_RTP: |
- channel = transport_channel_; |
- socket_options_.push_back( |
- std::pair<rtc::Socket::Option, int>(opt, value)); |
- break; |
- case ST_RTCP: |
- channel = rtcp_transport_channel_; |
- rtcp_socket_options_.push_back( |
- std::pair<rtc::Socket::Option, int>(opt, value)); |
- break; |
- } |
- return channel ? channel->SetOption(opt, value) : -1; |
+ return network_thread_->Invoke<int>([this, type, opt, value] { |
+ TransportChannel* channel = nullptr; |
+ switch (type) { |
+ case ST_RTP: |
+ channel = transport_channel_; |
+ socket_options_.push_back( |
+ std::pair<rtc::Socket::Option, int>(opt, value)); |
+ break; |
+ case ST_RTCP: |
+ channel = rtcp_transport_channel_; |
+ rtcp_socket_options_.push_back( |
+ std::pair<rtc::Socket::Option, int>(opt, value)); |
+ break; |
+ } |
+ return channel ? channel->SetOption(opt, value) : -1; |
+ }); |
} |
void BaseChannel::OnWritableState(TransportChannel* channel) { |
- ASSERT(channel == transport_channel_ || channel == rtcp_transport_channel_); |
- UpdateWritableState_w(); |
+ RTC_DCHECK(channel == transport_channel_ || |
+ channel == rtcp_transport_channel_); |
+ RTC_DCHECK(network_thread_->IsCurrent()); |
+ UpdateWritableState_n(); |
} |
void BaseChannel::OnChannelRead(TransportChannel* channel, |
@@ -477,7 +507,7 @@ void BaseChannel::OnChannelRead(TransportChannel* channel, |
int flags) { |
TRACE_EVENT0("webrtc", "BaseChannel::OnChannelRead"); |
// OnChannelRead gets called from P2PSocket; now pass data to MediaEngine |
- ASSERT(worker_thread_ == rtc::Thread::Current()); |
+ RTC_DCHECK(network_thread_->IsCurrent()); |
// When using RTCP multiplexing we might get RTCP packets on the RTP |
// transport. We feed RTP traffic into the demuxer to determine if it is RTCP. |
@@ -488,7 +518,7 @@ void BaseChannel::OnChannelRead(TransportChannel* channel, |
void BaseChannel::OnReadyToSend(TransportChannel* channel) { |
ASSERT(channel == transport_channel_ || channel == rtcp_transport_channel_); |
- SetReadyToSend(channel == rtcp_transport_channel_, true); |
+ SetReadyToSend_n(channel == rtcp_transport_channel_, true); |
} |
void BaseChannel::OnDtlsState(TransportChannel* channel, |
@@ -519,25 +549,29 @@ void BaseChannel::OnSelectedCandidatePairChanged( |
selected_candidate_pair->remote_candidate().network_id(), |
last_sent_packet_id); |
} |
+ // TODO(danilchap): reroute this call to worker thread when it will start to |
+ // do something. |
media_channel()->OnNetworkRouteChanged(channel->transport_name(), |
network_route); |
} |
-void BaseChannel::SetReadyToSend(bool rtcp, bool ready) { |
+void BaseChannel::SetReadyToSend_n(bool rtcp, bool ready) { |
if (rtcp) { |
rtcp_ready_to_send_ = ready; |
} else { |
rtp_ready_to_send_ = ready; |
} |
- if (rtp_ready_to_send_ && |
- // In the case of rtcp mux |rtcp_transport_channel_| will be null. |
- (rtcp_ready_to_send_ || !rtcp_transport_channel_)) { |
- // Notify the MediaChannel when both rtp and rtcp channel can send. |
- media_channel_->OnReadyToSend(true); |
+ bool ready_to_send = |
+ (rtp_ready_to_send_ && |
+ // In the case of rtcp mux |rtcp_transport_channel_| will be null. |
+ (rtcp_ready_to_send_ || !rtcp_transport_channel_)); |
+ |
+ if (worker_thread_->IsCurrent()) { |
+ media_channel_->OnReadyToSend(ready_to_send); |
} else { |
- // Notify the MediaChannel when either rtp or rtcp channel can't send. |
- media_channel_->OnReadyToSend(false); |
+ worker_thread_->Post( |
+ this, ready_to_send ? MSG_READY_TO_SEND : MSG_NOT_READY_TO_SEND); |
} |
} |
@@ -550,22 +584,24 @@ bool BaseChannel::PacketIsRtcp(const TransportChannel* channel, |
bool BaseChannel::SendPacket(bool rtcp, |
rtc::CopyOnWriteBuffer* packet, |
const rtc::PacketOptions& options) { |
- // SendPacket gets called from MediaEngine, typically on an encoder thread. |
- // If the thread is not our worker thread, we will post to our worker |
- // so that the real work happens on our worker. This avoids us having to |
+ // SendPacket gets called from MediaEngine, on a pacer or an encoder thread. |
+ // If the thread is not our network thread, we will post to our network |
+ // so that the real work happens on our network. This avoids us having to |
// synchronize access to all the pieces of the send path, including |
// SRTP and the inner workings of the transport channels. |
// The only downside is that we can't return a proper failure code if |
- // needed. Since UDP is unreliable anyway, this should be a non-issue. |
- if (rtc::Thread::Current() != worker_thread_) { |
+ // needed. Since UDP is unreliable anyway, this should be a non-issue. /// |
+ // except for tests.... |
+ if (!network_thread_->IsCurrent()) { |
// Avoid a copy by transferring the ownership of the packet data. |
int message_id = (!rtcp) ? MSG_RTPPACKET : MSG_RTCPPACKET; |
PacketMessageData* data = new PacketMessageData; |
data->packet = std::move(*packet); |
data->options = options; |
- worker_thread_->Post(this, message_id, data); |
+ network_thread_->Post(this, message_id, data); |
return true; |
} |
+ TRACE_EVENT0("webrtc", "BaseChannel::SendPacket"); |
// Now that we are on the correct thread, ensure we have a place to send this |
// packet before doing anything. (We might get RTCP packets that we don't |
@@ -589,6 +625,7 @@ bool BaseChannel::SendPacket(bool rtcp, |
updated_options = options; |
// Protect if needed. |
if (srtp_filter_.IsActive()) { |
+ TRACE_EVENT0("webrtc", "SRTP"); |
bool res; |
uint8_t* data = packet->data(); |
int len = static_cast<int>(packet->size()); |
@@ -650,19 +687,17 @@ bool BaseChannel::SendPacket(bool rtcp, |
// This is a double check for something that supposedly can't happen. |
LOG(LS_ERROR) << "Can't send outgoing " << PacketType(rtcp) |
<< " packet when SRTP is inactive and crypto is required"; |
- |
ASSERT(false); |
return false; |
} |
- // Bon voyage. |
- int ret = |
- channel->SendPacket(packet->data<char>(), packet->size(), updated_options, |
- (secure() && secure_dtls()) ? PF_SRTP_BYPASS : 0); |
+ int flags = (secure() && secure_dtls()) ? PF_SRTP_BYPASS : PF_NORMAL; |
+ int ret = channel->SendPacket(packet->data<char>(), packet->size(), |
+ updated_options, flags); |
if (ret != static_cast<int>(packet->size())) { |
if (channel->GetError() == EWOULDBLOCK) { |
LOG(LS_WARNING) << "Got EWOULDBLOCK from socket."; |
- SetReadyToSend(rtcp, false); |
+ SetReadyToSend_n(rtcp, false); |
} |
return false; |
} |
@@ -685,8 +720,10 @@ bool BaseChannel::WantsPacket(bool rtcp, const rtc::CopyOnWriteBuffer* packet) { |
return bundle_filter_.DemuxPacket(packet->data(), packet->size()); |
} |
-void BaseChannel::HandlePacket(bool rtcp, rtc::CopyOnWriteBuffer* packet, |
+void BaseChannel::HandlePacket(bool rtcp, |
+ rtc::CopyOnWriteBuffer* packet, |
const rtc::PacketTime& packet_time) { |
+ RTC_DCHECK(network_thread_->IsCurrent()); |
if (!WantsPacket(rtcp, packet)) { |
return; |
} |
@@ -700,6 +737,7 @@ void BaseChannel::HandlePacket(bool rtcp, rtc::CopyOnWriteBuffer* packet, |
// Unprotect the packet, if needed. |
if (srtp_filter_.IsActive()) { |
+ TRACE_EVENT0("webrtc", "SRTP"); |
char* data = packet->data<char>(); |
int len = static_cast<int>(packet->size()); |
bool res; |
@@ -743,11 +781,19 @@ void BaseChannel::HandlePacket(bool rtcp, rtc::CopyOnWriteBuffer* packet, |
return; |
} |
- // Push it down to the media channel. |
- if (!rtcp) { |
- media_channel_->OnPacketReceived(packet, packet_time); |
+ if (worker_thread_->IsCurrent()) { |
+ // Push it down to the media channel. |
+ if (!rtcp) { |
+ media_channel_->OnPacketReceived(packet, packet_time); |
+ } else { |
+ media_channel_->OnRtcpReceived(packet, packet_time); |
+ } |
} else { |
- media_channel_->OnRtcpReceived(packet, packet_time); |
+ IncomingPacketMessageData* message_data = new IncomingPacketMessageData; |
+ message_data->packet = std::move(*packet); |
+ message_data->packet_time = packet_time; |
+ int message_id = rtcp ? MSG_INCOMING_RTCP_PACKET : MSG_INCOMING_RTP_PACKET; |
+ worker_thread_->Post(this, message_id, message_data); |
} |
} |
@@ -786,30 +832,30 @@ void BaseChannel::EnableMedia_w() { |
LOG(LS_INFO) << "Channel enabled"; |
enabled_ = true; |
- ChangeState(); |
+ ChangeState_w(); |
} |
void BaseChannel::DisableMedia_w() { |
- ASSERT(worker_thread_ == rtc::Thread::Current()); |
+ RTC_DCHECK(worker_thread_->IsCurrent()); |
if (!enabled_) |
return; |
LOG(LS_INFO) << "Channel disabled"; |
enabled_ = false; |
- ChangeState(); |
+ ChangeState_w(); |
} |
-void BaseChannel::UpdateWritableState_w() { |
+void BaseChannel::UpdateWritableState_n() { |
if (transport_channel_ && transport_channel_->writable() && |
(!rtcp_transport_channel_ || rtcp_transport_channel_->writable())) { |
- ChannelWritable_w(); |
+ ChannelWritable_n(); |
} else { |
- ChannelNotWritable_w(); |
+ ChannelNotWritable_n(); |
} |
} |
-void BaseChannel::ChannelWritable_w() { |
- ASSERT(worker_thread_ == rtc::Thread::Current()); |
+void BaseChannel::ChannelWritable_n() { |
+ RTC_DCHECK(network_thread_->IsCurrent()); |
if (writable_) { |
return; |
} |
@@ -829,19 +875,21 @@ void BaseChannel::ChannelWritable_w() { |
} |
was_ever_writable_ = true; |
- MaybeSetupDtlsSrtp_w(); |
+ MaybeSetupDtlsSrtp_n(); |
writable_ = true; |
- ChangeState(); |
+ ChangeState_n(); |
} |
-void BaseChannel::SignalDtlsSetupFailure_w(bool rtcp) { |
- ASSERT(worker_thread() == rtc::Thread::Current()); |
+void BaseChannel::SignalDtlsSetupFailure_n(bool rtcp) { |
+ RTC_DCHECK(network_thread_->IsCurrent()); |
+ RTC_NOTREACHED(); |
+ // TODO(danilchap): Not allowed to invoke from network thread. Post instead. |
signaling_thread()->Invoke<void>(Bind( |
&BaseChannel::SignalDtlsSetupFailure_s, this, rtcp)); |
} |
void BaseChannel::SignalDtlsSetupFailure_s(bool rtcp) { |
- ASSERT(signaling_thread() == rtc::Thread::Current()); |
+ RTC_DCHECK(signaling_thread()->IsCurrent()); |
SignalDtlsSetupFailure(this, rtcp); |
} |
@@ -864,7 +912,8 @@ bool BaseChannel::ShouldSetupDtlsSrtp() const { |
// This function returns true if either DTLS-SRTP is not in use |
// *or* DTLS-SRTP is successfully set up. |
-bool BaseChannel::SetupDtlsSrtp(bool rtcp_channel) { |
+bool BaseChannel::SetupDtlsSrtp_n(bool rtcp_channel) { |
+ RTC_DCHECK(network_thread_->IsCurrent()); |
bool ret = false; |
TransportChannel* channel = |
@@ -950,7 +999,7 @@ bool BaseChannel::SetupDtlsSrtp(bool rtcp_channel) { |
return ret; |
} |
-void BaseChannel::MaybeSetupDtlsSrtp_w() { |
+void BaseChannel::MaybeSetupDtlsSrtp_n() { |
if (srtp_filter_.IsActive()) { |
return; |
} |
@@ -959,30 +1008,30 @@ void BaseChannel::MaybeSetupDtlsSrtp_w() { |
return; |
} |
- if (!SetupDtlsSrtp(false)) { |
- SignalDtlsSetupFailure_w(false); |
+ if (!SetupDtlsSrtp_n(false)) { |
+ SignalDtlsSetupFailure_n(false); |
return; |
} |
if (rtcp_transport_channel_) { |
- if (!SetupDtlsSrtp(true)) { |
- SignalDtlsSetupFailure_w(true); |
+ if (!SetupDtlsSrtp_n(true)) { |
+ SignalDtlsSetupFailure_n(true); |
return; |
} |
} |
} |
-void BaseChannel::ChannelNotWritable_w() { |
- ASSERT(worker_thread_ == rtc::Thread::Current()); |
+void BaseChannel::ChannelNotWritable_n() { |
+ RTC_DCHECK(network_thread_->IsCurrent()); |
if (!writable_) |
return; |
LOG(LS_INFO) << "Channel not writable (" << content_name_ << ")"; |
writable_ = false; |
- ChangeState(); |
+ ChangeState_n(); |
} |
-bool BaseChannel::SetRtpTransportParameters_w( |
+bool BaseChannel::SetRtpTransportParameters_n( |
const MediaContentDescription* content, |
ContentAction action, |
ContentSource src, |
@@ -997,11 +1046,11 @@ bool BaseChannel::SetRtpTransportParameters_w( |
set_secure_required(content->crypto_required() != CT_NONE); |
} |
- if (!SetSrtp_w(content->cryptos(), action, src, error_desc)) { |
+ if (!SetSrtp_n(content->cryptos(), action, src, error_desc)) { |
return false; |
} |
- if (!SetRtcpMux_w(content->rtcp_mux(), action, src, error_desc)) { |
+ if (!SetRtcpMux_n(content->rtcp_mux(), action, src, error_desc)) { |
return false; |
} |
@@ -1022,7 +1071,7 @@ bool BaseChannel::CheckSrtpConfig(const std::vector<CryptoParams>& cryptos, |
return true; |
} |
-bool BaseChannel::SetSrtp_w(const std::vector<CryptoParams>& cryptos, |
+bool BaseChannel::SetSrtp_n(const std::vector<CryptoParams>& cryptos, |
ContentAction action, |
ContentSource src, |
std::string* error_desc) { |
@@ -1070,11 +1119,10 @@ bool BaseChannel::SetSrtp_w(const std::vector<CryptoParams>& cryptos, |
} |
void BaseChannel::ActivateRtcpMux() { |
- worker_thread_->Invoke<void>(Bind( |
- &BaseChannel::ActivateRtcpMux_w, this)); |
+ network_thread_->Invoke<void>(Bind(&BaseChannel::ActivateRtcpMux_n, this)); |
} |
-void BaseChannel::ActivateRtcpMux_w() { |
+void BaseChannel::ActivateRtcpMux_n() { |
if (!rtcp_mux_filter_.IsActive()) { |
rtcp_mux_filter_.SetActive(); |
set_rtcp_transport_channel(nullptr, true); |
@@ -1082,7 +1130,8 @@ void BaseChannel::ActivateRtcpMux_w() { |
} |
} |
-bool BaseChannel::SetRtcpMux_w(bool enable, ContentAction action, |
+bool BaseChannel::SetRtcpMux_n(bool enable, |
+ ContentAction action, |
ContentSource src, |
std::string* error_desc) { |
bool ret = false; |
@@ -1121,7 +1170,7 @@ bool BaseChannel::SetRtcpMux_w(bool enable, ContentAction action, |
if (rtcp_mux_filter_.IsActive()) { |
// If the RTP transport is already writable, then so are we. |
if (transport_channel_->writable()) { |
- ChannelWritable_w(); |
+ ChannelWritable_n(); |
} |
} |
@@ -1298,6 +1347,7 @@ void BaseChannel::OnMessage(rtc::Message *pmsg) { |
switch (pmsg->message_id) { |
case MSG_RTPPACKET: |
case MSG_RTCPPACKET: { |
+ RTC_DCHECK(network_thread_->IsCurrent()); |
PacketMessageData* data = static_cast<PacketMessageData*>(pmsg->pdata); |
SendPacket(pmsg->message_id == MSG_RTCPPACKET, &data->packet, |
data->options); |
@@ -1308,28 +1358,62 @@ void BaseChannel::OnMessage(rtc::Message *pmsg) { |
SignalFirstPacketReceived(this); |
break; |
} |
+ case MSG_INCOMING_RTP_PACKET: { |
+ RTC_DCHECK(worker_thread_->IsCurrent()); |
+ IncomingPacketMessageData* data = |
+ static_cast<IncomingPacketMessageData*>(pmsg->pdata); |
+ media_channel_->OnPacketReceived(&data->packet, data->packet_time); |
+ delete data; |
+ break; |
+ } |
+ case MSG_INCOMING_RTCP_PACKET: { |
+ RTC_DCHECK(worker_thread_->IsCurrent()); |
+ IncomingPacketMessageData* data = |
+ static_cast<IncomingPacketMessageData*>(pmsg->pdata); |
+ media_channel_->OnRtcpReceived(&data->packet, data->packet_time); |
+ delete data; |
+ break; |
+ } |
+ case MSG_CHANGE_STATE: { |
+ RTC_DCHECK(worker_thread_->IsCurrent()); |
+ ChangeState_w(); |
+ break; |
+ } |
+ case MSG_READY_TO_SEND: { |
+ RTC_DCHECK(worker_thread_->IsCurrent()); |
+ media_channel_->OnReadyToSend(true); |
+ break; |
+ } |
+ case MSG_NOT_READY_TO_SEND: { |
+ RTC_DCHECK(worker_thread_->IsCurrent()); |
+ media_channel_->OnReadyToSend(false); |
+ break; |
+ } |
} |
} |
void BaseChannel::FlushRtcpMessages() { |
// Flush all remaining RTCP messages. This should only be called in |
// destructor. |
- ASSERT(rtc::Thread::Current() == worker_thread_); |
- rtc::MessageList rtcp_messages; |
- worker_thread_->Clear(this, MSG_RTCPPACKET, &rtcp_messages); |
- for (rtc::MessageList::iterator it = rtcp_messages.begin(); |
- it != rtcp_messages.end(); ++it) { |
- worker_thread_->Send(this, MSG_RTCPPACKET, it->pdata); |
- } |
+ network_thread_->Invoke<void>([this] { |
+ rtc::MessageList rtcp_messages; |
+ network_thread_->Clear(this, MSG_RTCPPACKET, &rtcp_messages); |
+ for (rtc::MessageList::iterator it = rtcp_messages.begin(); |
+ it != rtcp_messages.end(); ++it) { |
+ network_thread_->Send(this, MSG_RTCPPACKET, it->pdata); |
+ } |
+ }); |
} |
-VoiceChannel::VoiceChannel(rtc::Thread* thread, |
+VoiceChannel::VoiceChannel(rtc::Thread* worker_thread, |
+ rtc::Thread* network_thread, |
MediaEngineInterface* media_engine, |
VoiceMediaChannel* media_channel, |
TransportController* transport_controller, |
const std::string& content_name, |
bool rtcp) |
- : BaseChannel(thread, |
+ : BaseChannel(worker_thread, |
+ network_thread, |
media_channel, |
transport_controller, |
content_name, |
@@ -1487,7 +1571,16 @@ void VoiceChannel::OnChannelRead(TransportChannel* channel, |
} |
} |
-void VoiceChannel::ChangeState() { |
+void BaseChannel::ChangeState_n() { |
+ RTC_DCHECK(network_thread_->IsCurrent()); |
+ if (worker_thread_->IsCurrent()) { |
+ ChangeState_w(); |
+ } else { |
+ worker_thread_->Post(this, MSG_CHANGE_STATE); |
+ } |
+} |
+ |
+void VoiceChannel::ChangeState_w() { |
// Render incoming data if we're the active call, and we have the local |
// content. We receive data on the default channel and multiplexed streams. |
bool recv = IsReadyToReceive(); |
@@ -1521,7 +1614,9 @@ bool VoiceChannel::SetLocalContent_w(const MediaContentDescription* content, |
return false; |
} |
- if (!SetRtpTransportParameters_w(content, action, CS_LOCAL, error_desc)) { |
+ if (!InvokeOnNetwork(Bind(&BaseChannel::SetRtpTransportParameters_n, |
+ static_cast<BaseChannel*>(this), content, action, |
+ CS_LOCAL, error_desc))) { |
return false; |
} |
@@ -1547,7 +1642,7 @@ bool VoiceChannel::SetLocalContent_w(const MediaContentDescription* content, |
} |
set_local_content_direction(content->direction()); |
- ChangeState(); |
+ ChangeState_w(); |
return true; |
} |
@@ -1566,7 +1661,9 @@ bool VoiceChannel::SetRemoteContent_w(const MediaContentDescription* content, |
return false; |
} |
- if (!SetRtpTransportParameters_w(content, action, CS_REMOTE, error_desc)) { |
+ if (!InvokeOnNetwork(Bind(&BaseChannel::SetRtpTransportParameters_n, |
+ static_cast<BaseChannel*>(this), content, action, |
+ CS_REMOTE, error_desc))) { |
return false; |
} |
@@ -1598,7 +1695,7 @@ bool VoiceChannel::SetRemoteContent_w(const MediaContentDescription* content, |
} |
set_remote_content_direction(content->direction()); |
- ChangeState(); |
+ ChangeState_w(); |
return true; |
} |
@@ -1656,12 +1753,14 @@ void VoiceChannel::GetSrtpCryptoSuites(std::vector<int>* crypto_suites) const { |
GetSupportedAudioCryptoSuites(crypto_suites); |
} |
-VideoChannel::VideoChannel(rtc::Thread* thread, |
+VideoChannel::VideoChannel(rtc::Thread* worker_thread, |
+ rtc::Thread* network_thread, |
VideoMediaChannel* media_channel, |
TransportController* transport_controller, |
const std::string& content_name, |
bool rtcp) |
- : BaseChannel(thread, |
+ : BaseChannel(worker_thread, |
+ network_thread, |
media_channel, |
transport_controller, |
content_name, |
@@ -1723,7 +1822,8 @@ bool VideoChannel::SetRtpParameters_w(uint32_t ssrc, |
webrtc::RtpParameters parameters) { |
return media_channel()->SetRtpParameters(ssrc, parameters); |
} |
-void VideoChannel::ChangeState() { |
+ |
+void VideoChannel::ChangeState_w() { |
// Send outgoing data if we're the active call, we have the remote content, |
// and we have had some form of connectivity. |
bool send = IsReadyToSend(); |
@@ -1775,7 +1875,9 @@ bool VideoChannel::SetLocalContent_w(const MediaContentDescription* content, |
return false; |
} |
- if (!SetRtpTransportParameters_w(content, action, CS_LOCAL, error_desc)) { |
+ if (!InvokeOnNetwork(Bind(&BaseChannel::SetRtpTransportParameters_n, |
+ static_cast<BaseChannel*>(this), content, action, |
+ CS_LOCAL, error_desc))) { |
return false; |
} |
@@ -1801,7 +1903,7 @@ bool VideoChannel::SetLocalContent_w(const MediaContentDescription* content, |
} |
set_local_content_direction(content->direction()); |
- ChangeState(); |
+ ChangeState_w(); |
return true; |
} |
@@ -1820,8 +1922,9 @@ bool VideoChannel::SetRemoteContent_w(const MediaContentDescription* content, |
return false; |
} |
- |
- if (!SetRtpTransportParameters_w(content, action, CS_REMOTE, error_desc)) { |
+ if (!InvokeOnNetwork(Bind(&BaseChannel::SetRtpTransportParameters_n, |
+ static_cast<BaseChannel*>(this), content, action, |
+ CS_REMOTE, error_desc))) { |
return false; |
} |
@@ -1854,7 +1957,7 @@ bool VideoChannel::SetRemoteContent_w(const MediaContentDescription* content, |
} |
set_remote_content_direction(content->direction()); |
- ChangeState(); |
+ ChangeState_w(); |
return true; |
} |
@@ -1889,12 +1992,14 @@ void VideoChannel::GetSrtpCryptoSuites(std::vector<int>* crypto_suites) const { |
GetSupportedVideoCryptoSuites(crypto_suites); |
} |
-DataChannel::DataChannel(rtc::Thread* thread, |
+DataChannel::DataChannel(rtc::Thread* worker_thread, |
+ rtc::Thread* network_thread, |
DataMediaChannel* media_channel, |
TransportController* transport_controller, |
const std::string& content_name, |
bool rtcp) |
- : BaseChannel(thread, |
+ : BaseChannel(worker_thread, |
+ network_thread, |
media_channel, |
transport_controller, |
content_name, |
@@ -1998,7 +2103,9 @@ bool DataChannel::SetLocalContent_w(const MediaContentDescription* content, |
} |
if (data_channel_type_ == DCT_RTP) { |
- if (!SetRtpTransportParameters_w(content, action, CS_LOCAL, error_desc)) { |
+ if (!InvokeOnNetwork(Bind(&BaseChannel::SetRtpTransportParameters_n, |
+ static_cast<BaseChannel*>(this), content, action, |
+ CS_LOCAL, error_desc))) { |
return false; |
} |
} |
@@ -2030,7 +2137,7 @@ bool DataChannel::SetLocalContent_w(const MediaContentDescription* content, |
} |
set_local_content_direction(content->direction()); |
- ChangeState(); |
+ ChangeState_w(); |
return true; |
} |
@@ -2059,9 +2166,12 @@ bool DataChannel::SetRemoteContent_w(const MediaContentDescription* content, |
} |
LOG(LS_INFO) << "Setting remote data description"; |
- if (data_channel_type_ == DCT_RTP && |
- !SetRtpTransportParameters_w(content, action, CS_REMOTE, error_desc)) { |
- return false; |
+ if (data_channel_type_ == DCT_RTP) { |
+ if (!InvokeOnNetwork(Bind(&BaseChannel::SetRtpTransportParameters_n, |
+ static_cast<BaseChannel*>(this), content, action, |
+ CS_REMOTE, error_desc))) { |
+ return false; |
+ } |
} |
@@ -2085,11 +2195,11 @@ bool DataChannel::SetRemoteContent_w(const MediaContentDescription* content, |
} |
set_remote_content_direction(content->direction()); |
- ChangeState(); |
+ ChangeState_w(); |
return true; |
} |
-void DataChannel::ChangeState() { |
+void DataChannel::ChangeState_w() { |
// Render incoming data if we're the active call, and we have the local |
// content. We receive data on the default channel and multiplexed streams. |
bool recv = IsReadyToReceive(); |