Chromium Code Reviews| Index: webrtc/pc/channel.cc |
| diff --git a/webrtc/pc/channel.cc b/webrtc/pc/channel.cc |
| index 4d47e878b8e2b8c7d7c858c439d822f9d95a4720..3ed352490787e09378caff304bc94640b38804df 100644 |
| --- a/webrtc/pc/channel.cc |
| +++ b/webrtc/pc/channel.cc |
| @@ -37,17 +37,44 @@ bool SetRawAudioSink_w(VoiceMediaChannel* channel, |
| channel->SetRawAudioSink(ssrc, std::move(*sink)); |
| return true; |
| } |
| + |
| +struct SendingPacketMessageData : public rtc::MessageData { |
| + rtc::CopyOnWriteBuffer packet; |
| + rtc::PacketOptions options; |
| +}; |
| + |
| +struct ReceivedPacketMessageData : public rtc::MessageData { |
| + rtc::CopyOnWriteBuffer packet; |
| + rtc::PacketTime packet_time; |
| +}; |
| + |
| +struct ChangeState : public rtc::MessageData { |
| + bool send; |
| + bool recv; |
| +}; |
| + |
| +struct NetworkRouteChanged : public rtc::MessageData { |
| + std::string transport_name; |
| + rtc::NetworkRoute network_route; |
| +}; |
| + |
| } // namespace |
| enum { |
| MSG_EARLYMEDIATIMEOUT = 1, |
| - MSG_RTPPACKET, |
| - MSG_RTCPPACKET, |
| + MSG_SENDING_RTP_PACKET, |
| + MSG_SENDING_RTCP_PACKET, |
| MSG_CHANNEL_ERROR, |
| MSG_READYTOSENDDATA, |
| MSG_DATARECEIVED, |
| MSG_FIRSTPACKETRECEIVED, |
| MSG_STREAMCLOSEDREMOTELY, |
| + MSG_RECEIVED_RTP_PACKET, |
| + MSG_RECEIVED_RTCP_PACKET, |
| + MSG_CHANGE_STATE, |
| + MSG_NOT_READY_TO_SEND, |
| + MSG_READY_TO_SEND, |
| + MSG_NETWORK_ROUTE_CHANGED, |
| }; |
| // Value specified in RFC 5764. |
| @@ -61,11 +88,6 @@ static void SafeSetError(const std::string& message, std::string* error_desc) { |
| } |
| } |
| -struct PacketMessageData : public rtc::MessageData { |
| - rtc::CopyOnWriteBuffer packet; |
| - rtc::PacketOptions options; |
| -}; |
| - |
| struct VoiceChannelErrorMessageData : public rtc::MessageData { |
| VoiceChannelErrorMessageData(uint32_t in_ssrc, |
| VoiceMediaChannel::Error in_error) |
| @@ -142,12 +164,14 @@ void RtpSendParametersFromMediaDescription( |
| send_params->max_bandwidth_bps = desc->bandwidth(); |
| } |
| -BaseChannel::BaseChannel(rtc::Thread* thread, |
| +BaseChannel::BaseChannel(rtc::Thread* worker_thread, |
| + rtc::Thread* network_thread, |
| MediaChannel* media_channel, |
| TransportController* transport_controller, |
| const std::string& content_name, |
| bool rtcp) |
| - : worker_thread_(thread), |
| + : worker_thread_(worker_thread), |
| + network_thread_(network_thread), |
| transport_controller_(transport_controller), |
| media_channel_(media_channel), |
| content_name_(content_name), |
| @@ -166,6 +190,9 @@ BaseChannel::BaseChannel(rtc::Thread* thread, |
| secure_required_(false), |
| rtp_abs_sendtime_extn_id_(-1) { |
| ASSERT(worker_thread_ == rtc::Thread::Current()); |
| + if (transport_controller) { |
| + RTC_DCHECK_EQ(network_thread, transport_controller->worker_thread()); |
| + } |
| LOG(LS_INFO) << "Created channel for " << content_name; |
| } |
| @@ -182,49 +209,60 @@ BaseChannel::~BaseChannel() { |
| delete media_channel_; |
| // Note that we don't just call set_transport_channel(nullptr) because that |
| // would call a pure virtual method which we can't do from a destructor. |
| - if (transport_channel_) { |
| - DisconnectFromTransportChannel(transport_channel_); |
| - transport_controller_->DestroyTransportChannel_w( |
| - transport_name_, cricket::ICE_CANDIDATE_COMPONENT_RTP); |
| - } |
| - if (rtcp_transport_channel_) { |
| - DisconnectFromTransportChannel(rtcp_transport_channel_); |
| - transport_controller_->DestroyTransportChannel_w( |
| - transport_name_, cricket::ICE_CANDIDATE_COMPONENT_RTCP); |
| - } |
| + network_thread_->Invoke<void>([this] { |
|
pthatcher1
2016/04/29 23:36:23
It seems risky to have the BaseChannel destructor
pthatcher1
2016/04/29 23:36:23
I don't think we're allowed to invoke lambdas beca
danilchap
2016/05/02 14:50:34
Wasn't sure if storing in sendlist queue was count
danilchap
2016/05/02 14:50:34
Worker can be blocked on network thread, but netwo
pthatcher1
2016/05/11 04:50:01
That's a good point. It's probably worth bringing
pthatcher1
2016/05/11 04:50:01
That's probably the only thing that makes sense.
danilchap
2016/05/11 12:19:16
Expanded comment above BaseChannel definition
|
| + if (transport_channel_) { |
| + DisconnectFromTransportChannel(transport_channel_); |
| + transport_controller_->DestroyTransportChannel_w( |
| + transport_name_, cricket::ICE_CANDIDATE_COMPONENT_RTP); |
| + } |
| + if (rtcp_transport_channel_) { |
| + DisconnectFromTransportChannel(rtcp_transport_channel_); |
| + transport_controller_->DestroyTransportChannel_w( |
| + transport_name_, cricket::ICE_CANDIDATE_COMPONENT_RTCP); |
| + } |
| + network_thread_->Clear(this); |
| + }); |
| LOG(LS_INFO) << "Destroyed channel"; |
| } |
| bool BaseChannel::Init() { |
| - if (!SetTransport(content_name())) { |
| - return false; |
| - } |
| + bool setup_transport = network_thread_->Invoke<bool>([this] { |
| + if (!SetTransport_n(content_name())) { |
| + return false; |
| + } |
| - if (!SetDtlsSrtpCryptoSuites(transport_channel(), false)) { |
| - return false; |
| - } |
| - if (rtcp_transport_enabled() && |
| - !SetDtlsSrtpCryptoSuites(rtcp_transport_channel(), true)) { |
| + if (!SetDtlsSrtpCryptoSuites(transport_channel(), false)) { |
|
pthatcher1
2016/04/29 23:36:23
I think SetDtlsSrtpCryptoSuites, rtcp_transport_en
danilchap
2016/05/02 14:50:34
Old code access them on worker, not signaling thre
pthatcher1
2016/05/11 04:50:01
Ah, true. I was thrown off by Init(). It should
danilchap
2016/05/11 12:19:16
Init renamed to Init_w, few more function got thre
|
| + return false; |
| + } |
| + if (rtcp_transport_enabled() && |
| + !SetDtlsSrtpCryptoSuites(rtcp_transport_channel(), true)) { |
| + return false; |
| + } |
| + return true; |
| + }); |
| + if (!setup_transport) { |
| return false; |
| } |
| // Both RTP and RTCP channels are set, we can call SetInterface on |
| // media channel and it can set network options. |
| + RTC_DCHECK(worker_thread_->IsCurrent()); |
| media_channel_->SetInterface(this); |
| return true; |
| } |
| void BaseChannel::Deinit() { |
| + RTC_DCHECK(worker_thread_->IsCurrent()); |
| media_channel_->SetInterface(NULL); |
| } |
| bool BaseChannel::SetTransport(const std::string& transport_name) { |
| - return worker_thread_->Invoke<bool>( |
| - Bind(&BaseChannel::SetTransport_w, this, transport_name)); |
| + return network_thread_->Invoke<bool>( |
| + Bind(&BaseChannel::SetTransport_n, this, transport_name)); |
| } |
| -bool BaseChannel::SetTransport_w(const std::string& transport_name) { |
| - ASSERT(worker_thread_ == rtc::Thread::Current()); |
| +bool BaseChannel::SetTransport_n(const std::string& transport_name) { |
| + RTC_DCHECK(network_thread_->IsCurrent()); |
| if (transport_name == transport_name_) { |
| // Nothing to do if transport name isn't changing |
| @@ -273,7 +311,7 @@ bool BaseChannel::SetTransport_w(const std::string& transport_name) { |
| } |
| void BaseChannel::set_transport_channel(TransportChannel* new_tc) { |
| - ASSERT(worker_thread_ == rtc::Thread::Current()); |
| + RTC_DCHECK(network_thread_->IsCurrent()); |
| TransportChannel* old_tc = transport_channel_; |
| if (!old_tc && !new_tc) { |
| @@ -299,13 +337,13 @@ void BaseChannel::set_transport_channel(TransportChannel* new_tc) { |
| // Update aggregate writable/ready-to-send state between RTP and RTCP upon |
| // setting new channel |
| - UpdateWritableState_w(); |
| + UpdateWritableState_n(); |
| SetReadyToSend(false, new_tc && new_tc->writable()); |
| } |
| void BaseChannel::set_rtcp_transport_channel(TransportChannel* new_tc, |
| bool update_writablity) { |
| - ASSERT(worker_thread_ == rtc::Thread::Current()); |
| + RTC_DCHECK(network_thread_->IsCurrent()); |
| TransportChannel* old_tc = rtcp_transport_channel_; |
| if (!old_tc && !new_tc) { |
| @@ -335,13 +373,13 @@ void BaseChannel::set_rtcp_transport_channel(TransportChannel* new_tc, |
| if (update_writablity) { |
| // Update aggregate writable/ready-to-send state between RTP and RTCP upon |
| // setting new channel |
| - UpdateWritableState_w(); |
| + UpdateWritableState_n(); |
| SetReadyToSend(true, new_tc && new_tc->writable()); |
| } |
| } |
| void BaseChannel::ConnectToTransportChannel(TransportChannel* tc) { |
| - ASSERT(worker_thread_ == rtc::Thread::Current()); |
| + RTC_DCHECK(network_thread_->IsCurrent()); |
| tc->SignalWritableState.connect(this, &BaseChannel::OnWritableState); |
| tc->SignalReadPacket.connect(this, &BaseChannel::OnChannelRead); |
| @@ -352,7 +390,7 @@ void BaseChannel::ConnectToTransportChannel(TransportChannel* tc) { |
| } |
| void BaseChannel::DisconnectFromTransportChannel(TransportChannel* tc) { |
| - ASSERT(worker_thread_ == rtc::Thread::Current()); |
| + RTC_DCHECK(network_thread_->IsCurrent()); |
| tc->SignalWritableState.disconnect(this); |
| tc->SignalReadPacket.disconnect(this); |
| @@ -405,8 +443,8 @@ void BaseChannel::StartConnectionMonitor(int cms) { |
| // We pass in the BaseChannel instead of the transport_channel_ |
| // because if the transport_channel_ changes, the ConnectionMonitor |
| // would be pointing to the wrong TransportChannel. |
| - connection_monitor_.reset(new ConnectionMonitor( |
| - this, worker_thread(), rtc::Thread::Current())); |
| + connection_monitor_.reset( |
| + new ConnectionMonitor(this, network_thread(), rtc::Thread::Current())); |
|
pthatcher1
2016/04/29 23:36:23
We should comment that this is because GetConnecti
danilchap
2016/05/02 14:50:34
Done.
|
| connection_monitor_->SignalUpdate.connect( |
| this, &BaseChannel::OnConnectionMonitorUpdate); |
| connection_monitor_->Start(cms); |
| @@ -420,7 +458,7 @@ void BaseChannel::StopConnectionMonitor() { |
| } |
| bool BaseChannel::GetConnectionStats(ConnectionInfos* infos) { |
| - ASSERT(worker_thread_ == rtc::Thread::Current()); |
| + RTC_DCHECK(network_thread_->IsCurrent()); |
| return transport_channel_->GetStats(infos); |
| } |
| @@ -450,25 +488,29 @@ bool BaseChannel::SendRtcp(rtc::CopyOnWriteBuffer* packet, |
| int BaseChannel::SetOption(SocketType type, rtc::Socket::Option opt, |
| int value) { |
| - TransportChannel* channel = NULL; |
| - switch (type) { |
| - case ST_RTP: |
| - channel = transport_channel_; |
| - socket_options_.push_back( |
| - std::pair<rtc::Socket::Option, int>(opt, value)); |
| - break; |
| - case ST_RTCP: |
| - channel = rtcp_transport_channel_; |
| - rtcp_socket_options_.push_back( |
| - std::pair<rtc::Socket::Option, int>(opt, value)); |
| - break; |
| - } |
| - return channel ? channel->SetOption(opt, value) : -1; |
| + return network_thread_->Invoke<int>([this, type, opt, value] { |
| + TransportChannel* channel = nullptr; |
| + switch (type) { |
| + case ST_RTP: |
| + channel = transport_channel_; |
| + socket_options_.push_back( |
| + std::pair<rtc::Socket::Option, int>(opt, value)); |
| + break; |
| + case ST_RTCP: |
| + channel = rtcp_transport_channel_; |
| + rtcp_socket_options_.push_back( |
| + std::pair<rtc::Socket::Option, int>(opt, value)); |
| + break; |
| + } |
| + return channel ? channel->SetOption(opt, value) : -1; |
|
pthatcher1
2016/04/29 23:36:23
Only this last part should be on the network threa
danilchap
2016/05/02 14:50:34
transport_channels created/destroyed/set on networ
pthatcher1
2016/05/11 04:50:01
Ah, that's a good point. transport_channel_ and r
|
| + }); |
| } |
| void BaseChannel::OnWritableState(TransportChannel* channel) { |
| - ASSERT(channel == transport_channel_ || channel == rtcp_transport_channel_); |
| - UpdateWritableState_w(); |
| + RTC_DCHECK(channel == transport_channel_ || |
| + channel == rtcp_transport_channel_); |
| + RTC_DCHECK(network_thread_->IsCurrent()); |
| + UpdateWritableState_n(); |
| } |
| void BaseChannel::OnChannelRead(TransportChannel* channel, |
| @@ -477,7 +519,7 @@ void BaseChannel::OnChannelRead(TransportChannel* channel, |
| int flags) { |
| TRACE_EVENT0("webrtc", "BaseChannel::OnChannelRead"); |
| // OnChannelRead gets called from P2PSocket; now pass data to MediaEngine |
| - ASSERT(worker_thread_ == rtc::Thread::Current()); |
| + RTC_DCHECK(network_thread_->IsCurrent()); |
| // When using RTCP multiplexing we might get RTCP packets on the RTP |
| // transport. We feed RTP traffic into the demuxer to determine if it is RTCP. |
| @@ -512,33 +554,33 @@ void BaseChannel::OnSelectedCandidatePairChanged( |
| CandidatePairInterface* selected_candidate_pair, |
| int last_sent_packet_id) { |
| ASSERT(channel == transport_channel_ || channel == rtcp_transport_channel_); |
| - rtc::NetworkRoute network_route; |
| + RTC_DCHECK(network_thread_->IsCurrent()); |
| + NetworkRouteChanged* message = new NetworkRouteChanged; |
| if (selected_candidate_pair) { |
| - network_route = rtc::NetworkRoute( |
| + message->network_route = rtc::NetworkRoute( |
| selected_candidate_pair->local_candidate().network_id(), |
| selected_candidate_pair->remote_candidate().network_id(), |
| last_sent_packet_id); |
| } |
| - media_channel()->OnNetworkRouteChanged(channel->transport_name(), |
| - network_route); |
| + message->transport_name = channel->transport_name(); |
| + worker_thread_->Post(this, MSG_NETWORK_ROUTE_CHANGED, message); |
|
pthatcher1
2016/04/29 23:36:23
Can't we do an AsyncInvoke on the worker_thread_ w
danilchap
2016/05/02 14:50:34
Didn't know there was AsyncInvoke.
Done.
|
| } |
| void BaseChannel::SetReadyToSend(bool rtcp, bool ready) { |
| + RTC_DCHECK(network_thread_->IsCurrent()); |
| if (rtcp) { |
| rtcp_ready_to_send_ = ready; |
| } else { |
| rtp_ready_to_send_ = ready; |
| } |
| - if (rtp_ready_to_send_ && |
| - // In the case of rtcp mux |rtcp_transport_channel_| will be null. |
| - (rtcp_ready_to_send_ || !rtcp_transport_channel_)) { |
| - // Notify the MediaChannel when both rtp and rtcp channel can send. |
| - media_channel_->OnReadyToSend(true); |
| - } else { |
| - // Notify the MediaChannel when either rtp or rtcp channel can't send. |
| - media_channel_->OnReadyToSend(false); |
| - } |
| + bool ready_to_send = |
| + (rtp_ready_to_send_ && |
| + // In the case of rtcp mux |rtcp_transport_channel_| will be null. |
| + (rtcp_ready_to_send_ || !rtcp_transport_channel_)); |
| + |
| + worker_thread_->Post( |
| + this, ready_to_send ? MSG_READY_TO_SEND : MSG_NOT_READY_TO_SEND); |
|
pthatcher1
2016/04/29 23:36:23
Same here.
danilchap
2016/05/02 14:50:34
Done.
|
| } |
| bool BaseChannel::PacketIsRtcp(const TransportChannel* channel, |
| @@ -550,22 +592,23 @@ bool BaseChannel::PacketIsRtcp(const TransportChannel* channel, |
| bool BaseChannel::SendPacket(bool rtcp, |
| rtc::CopyOnWriteBuffer* packet, |
| const rtc::PacketOptions& options) { |
| - // SendPacket gets called from MediaEngine, typically on an encoder thread. |
| - // If the thread is not our worker thread, we will post to our worker |
| - // so that the real work happens on our worker. This avoids us having to |
| + // SendPacket gets called from MediaEngine, on a pacer or an encoder thread. |
| + // If the thread is not our network thread, we will post to our network |
| + // so that the real work happens on our network. This avoids us having to |
| // synchronize access to all the pieces of the send path, including |
| // SRTP and the inner workings of the transport channels. |
| // The only downside is that we can't return a proper failure code if |
| // needed. Since UDP is unreliable anyway, this should be a non-issue. |
| - if (rtc::Thread::Current() != worker_thread_) { |
| + if (!network_thread_->IsCurrent()) { |
| // Avoid a copy by transferring the ownership of the packet data. |
| - int message_id = (!rtcp) ? MSG_RTPPACKET : MSG_RTCPPACKET; |
| - PacketMessageData* data = new PacketMessageData; |
| + int message_id = rtcp ? MSG_SENDING_RTCP_PACKET : MSG_SENDING_RTP_PACKET; |
| + SendingPacketMessageData* data = new SendingPacketMessageData; |
|
pthatcher1
2016/04/29 23:36:23
Why is this "Sending" instead of "Send".
danilchap
2016/05/02 14:50:34
For no good reason. Changed.
|
| data->packet = std::move(*packet); |
| data->options = options; |
| - worker_thread_->Post(this, message_id, data); |
| + network_thread_->Post(this, message_id, data); |
| return true; |
| } |
| + TRACE_EVENT0("webrtc", "BaseChannel::SendPacket"); |
| // Now that we are on the correct thread, ensure we have a place to send this |
| // packet before doing anything. (We might get RTCP packets that we don't |
| @@ -589,6 +632,7 @@ bool BaseChannel::SendPacket(bool rtcp, |
| updated_options = options; |
| // Protect if needed. |
| if (srtp_filter_.IsActive()) { |
| + TRACE_EVENT0("webrtc", "SRTP Encode"); |
| bool res; |
| uint8_t* data = packet->data(); |
| int len = static_cast<int>(packet->size()); |
| @@ -656,9 +700,9 @@ bool BaseChannel::SendPacket(bool rtcp, |
| } |
| // Bon voyage. |
| - int ret = |
| - channel->SendPacket(packet->data<char>(), packet->size(), updated_options, |
| - (secure() && secure_dtls()) ? PF_SRTP_BYPASS : 0); |
| + int flags = (secure() && secure_dtls()) ? PF_SRTP_BYPASS : PF_NORMAL; |
| + int ret = channel->SendPacket(packet->data<char>(), packet->size(), |
| + updated_options, flags); |
| if (ret != static_cast<int>(packet->size())) { |
| if (channel->GetError() == EWOULDBLOCK) { |
| LOG(LS_WARNING) << "Got EWOULDBLOCK from socket."; |
| @@ -687,6 +731,7 @@ bool BaseChannel::WantsPacket(bool rtcp, const rtc::CopyOnWriteBuffer* packet) { |
| void BaseChannel::HandlePacket(bool rtcp, rtc::CopyOnWriteBuffer* packet, |
| const rtc::PacketTime& packet_time) { |
| + RTC_DCHECK(network_thread_->IsCurrent()); |
| if (!WantsPacket(rtcp, packet)) { |
| return; |
| } |
| @@ -700,6 +745,7 @@ void BaseChannel::HandlePacket(bool rtcp, rtc::CopyOnWriteBuffer* packet, |
| // Unprotect the packet, if needed. |
| if (srtp_filter_.IsActive()) { |
| + TRACE_EVENT0("webrtc", "SRTP Decode"); |
| char* data = packet->data<char>(); |
| int len = static_cast<int>(packet->size()); |
| bool res; |
| @@ -743,12 +789,11 @@ void BaseChannel::HandlePacket(bool rtcp, rtc::CopyOnWriteBuffer* packet, |
| return; |
| } |
| - // Push it down to the media channel. |
| - if (!rtcp) { |
| - media_channel_->OnPacketReceived(packet, packet_time); |
| - } else { |
| - media_channel_->OnRtcpReceived(packet, packet_time); |
| - } |
| + ReceivedPacketMessageData* message_data = new ReceivedPacketMessageData; |
| + message_data->packet = std::move(*packet); |
| + message_data->packet_time = packet_time; |
| + int message_id = rtcp ? MSG_RECEIVED_RTCP_PACKET : MSG_RECEIVED_RTP_PACKET; |
| + worker_thread_->Post(this, message_id, message_data); |
|
pthatcher1
2016/04/29 23:36:23
Same here with AsyncInvoke
danilchap
2016/05/02 14:50:34
Done.
|
| } |
| bool BaseChannel::PushdownLocalDescription( |
| @@ -786,7 +831,7 @@ void BaseChannel::EnableMedia_w() { |
| LOG(LS_INFO) << "Channel enabled"; |
| enabled_ = true; |
| - ChangeState(); |
| + ChangeState_w(); |
| } |
| void BaseChannel::DisableMedia_w() { |
| @@ -796,20 +841,20 @@ void BaseChannel::DisableMedia_w() { |
| LOG(LS_INFO) << "Channel disabled"; |
| enabled_ = false; |
| - ChangeState(); |
| + ChangeState_w(); |
| } |
| -void BaseChannel::UpdateWritableState_w() { |
| +void BaseChannel::UpdateWritableState_n() { |
| if (transport_channel_ && transport_channel_->writable() && |
| (!rtcp_transport_channel_ || rtcp_transport_channel_->writable())) { |
| - ChannelWritable_w(); |
| + ChannelWritable_n(); |
| } else { |
| - ChannelNotWritable_w(); |
| + ChannelNotWritable_n(); |
| } |
| } |
| -void BaseChannel::ChannelWritable_w() { |
| - ASSERT(worker_thread_ == rtc::Thread::Current()); |
| +void BaseChannel::ChannelWritable_n() { |
| + RTC_DCHECK(network_thread_->IsCurrent()); |
| if (writable_) { |
| return; |
| } |
| @@ -829,13 +874,15 @@ void BaseChannel::ChannelWritable_w() { |
| } |
| was_ever_writable_ = true; |
| - MaybeSetupDtlsSrtp_w(); |
| + MaybeSetupDtlsSrtp_n(); |
| writable_ = true; |
| ChangeState(); |
| } |
| -void BaseChannel::SignalDtlsSetupFailure_w(bool rtcp) { |
| - ASSERT(worker_thread() == rtc::Thread::Current()); |
| +void BaseChannel::SignalDtlsSetupFailure_n(bool rtcp) { |
| + RTC_DCHECK(network_thread_->IsCurrent()); |
| + RTC_NOTREACHED(); |
|
pthatcher1
2016/04/29 23:36:23
This isn't reached?
danilchap
2016/05/02 14:50:34
In practice it isn't reached.
In theory it can be
|
| + // TODO(danilchap): Not allowed to invoke from network thread. Post instead. |
| signaling_thread()->Invoke<void>(Bind( |
| &BaseChannel::SignalDtlsSetupFailure_s, this, rtcp)); |
| } |
| @@ -864,7 +911,8 @@ bool BaseChannel::ShouldSetupDtlsSrtp() const { |
| // This function returns true if either DTLS-SRTP is not in use |
| // *or* DTLS-SRTP is successfully set up. |
| -bool BaseChannel::SetupDtlsSrtp(bool rtcp_channel) { |
| +bool BaseChannel::SetupDtlsSrtp_n(bool rtcp_channel) { |
| + RTC_DCHECK(network_thread_->IsCurrent()); |
| bool ret = false; |
| TransportChannel* channel = |
| @@ -950,7 +998,7 @@ bool BaseChannel::SetupDtlsSrtp(bool rtcp_channel) { |
| return ret; |
| } |
| -void BaseChannel::MaybeSetupDtlsSrtp_w() { |
| +void BaseChannel::MaybeSetupDtlsSrtp_n() { |
| if (srtp_filter_.IsActive()) { |
| return; |
| } |
| @@ -959,21 +1007,21 @@ void BaseChannel::MaybeSetupDtlsSrtp_w() { |
| return; |
| } |
| - if (!SetupDtlsSrtp(false)) { |
| - SignalDtlsSetupFailure_w(false); |
| + if (!SetupDtlsSrtp_n(false)) { |
| + SignalDtlsSetupFailure_n(false); |
| return; |
| } |
| if (rtcp_transport_channel_) { |
| - if (!SetupDtlsSrtp(true)) { |
| - SignalDtlsSetupFailure_w(true); |
| + if (!SetupDtlsSrtp_n(true)) { |
| + SignalDtlsSetupFailure_n(true); |
| return; |
| } |
| } |
| } |
| -void BaseChannel::ChannelNotWritable_w() { |
| - ASSERT(worker_thread_ == rtc::Thread::Current()); |
| +void BaseChannel::ChannelNotWritable_n() { |
| + RTC_DCHECK(network_thread_->IsCurrent()); |
| if (!writable_) |
| return; |
| @@ -982,7 +1030,7 @@ void BaseChannel::ChannelNotWritable_w() { |
| ChangeState(); |
| } |
| -bool BaseChannel::SetRtpTransportParameters_w( |
| +bool BaseChannel::SetRtpTransportParameters( |
| const MediaContentDescription* content, |
| ContentAction action, |
| ContentSource src, |
| @@ -993,19 +1041,22 @@ bool BaseChannel::SetRtpTransportParameters_w( |
| } |
| // Cache secure_required_ for belt and suspenders check on SendPacket |
| - if (src == CS_LOCAL) { |
| - set_secure_required(content->crypto_required() != CT_NONE); |
| - } |
| + return network_thread_->Invoke<bool>( |
| + [this, content, action, src, error_desc] { |
| + if (src == CS_LOCAL) { |
| + set_secure_required(content->crypto_required() != CT_NONE); |
| + } |
|
pthatcher1
2016/04/29 23:36:23
I think set_secure_required needs to be on the sig
danilchap
2016/05/02 14:50:34
Why?
secure_required accessor is used in tests onl
|
| - if (!SetSrtp_w(content->cryptos(), action, src, error_desc)) { |
| - return false; |
| - } |
| + if (!SetSrtp_n(content->cryptos(), action, src, error_desc)) { |
| + return false; |
| + } |
| - if (!SetRtcpMux_w(content->rtcp_mux(), action, src, error_desc)) { |
| - return false; |
| - } |
| + if (!SetRtcpMux_n(content->rtcp_mux(), action, src, error_desc)) { |
| + return false; |
| + } |
| - return true; |
| + return true; |
| + }); |
| } |
| // |dtls| will be set to true if DTLS is active for transport channel and |
| @@ -1022,7 +1073,7 @@ bool BaseChannel::CheckSrtpConfig(const std::vector<CryptoParams>& cryptos, |
| return true; |
| } |
| -bool BaseChannel::SetSrtp_w(const std::vector<CryptoParams>& cryptos, |
| +bool BaseChannel::SetSrtp_n(const std::vector<CryptoParams>& cryptos, |
| ContentAction action, |
| ContentSource src, |
| std::string* error_desc) { |
| @@ -1070,11 +1121,10 @@ bool BaseChannel::SetSrtp_w(const std::vector<CryptoParams>& cryptos, |
| } |
| void BaseChannel::ActivateRtcpMux() { |
| - worker_thread_->Invoke<void>(Bind( |
| - &BaseChannel::ActivateRtcpMux_w, this)); |
| + network_thread_->Invoke<void>(Bind(&BaseChannel::ActivateRtcpMux_n, this)); |
| } |
| -void BaseChannel::ActivateRtcpMux_w() { |
| +void BaseChannel::ActivateRtcpMux_n() { |
| if (!rtcp_mux_filter_.IsActive()) { |
| rtcp_mux_filter_.SetActive(); |
| set_rtcp_transport_channel(nullptr, true); |
| @@ -1082,7 +1132,8 @@ void BaseChannel::ActivateRtcpMux_w() { |
| } |
| } |
| -bool BaseChannel::SetRtcpMux_w(bool enable, ContentAction action, |
| +bool BaseChannel::SetRtcpMux_n(bool enable, |
| + ContentAction action, |
| ContentSource src, |
| std::string* error_desc) { |
| bool ret = false; |
| @@ -1121,7 +1172,7 @@ bool BaseChannel::SetRtcpMux_w(bool enable, ContentAction action, |
| if (rtcp_mux_filter_.IsActive()) { |
| // If the RTP transport is already writable, then so are we. |
| if (transport_channel_->writable()) { |
| - ChannelWritable_w(); |
| + ChannelWritable_n(); |
| } |
| } |
| @@ -1296,40 +1347,84 @@ void BaseChannel::MaybeCacheRtpAbsSendTimeHeaderExtension( |
| void BaseChannel::OnMessage(rtc::Message *pmsg) { |
| TRACE_EVENT0("webrtc", "BaseChannel::OnMessage"); |
| switch (pmsg->message_id) { |
| - case MSG_RTPPACKET: |
| - case MSG_RTCPPACKET: { |
| - PacketMessageData* data = static_cast<PacketMessageData*>(pmsg->pdata); |
| - SendPacket(pmsg->message_id == MSG_RTCPPACKET, &data->packet, |
| - data->options); |
| - delete data; // because it is Posted |
| + case MSG_SENDING_RTP_PACKET: |
| + case MSG_SENDING_RTCP_PACKET: { |
| + RTC_DCHECK(network_thread_->IsCurrent()); |
| + SendingPacketMessageData* data = |
| + static_cast<SendingPacketMessageData*>(pmsg->pdata); |
| + bool rtcp = pmsg->message_id == MSG_SENDING_RTCP_PACKET; |
| + SendPacket(rtcp, &data->packet, data->options); |
| + delete data; |
| break; |
| } |
| case MSG_FIRSTPACKETRECEIVED: { |
| SignalFirstPacketReceived(this); |
| break; |
| } |
| + case MSG_RECEIVED_RTP_PACKET: { |
| + RTC_DCHECK(worker_thread_->IsCurrent()); |
| + ReceivedPacketMessageData* data = |
| + static_cast<ReceivedPacketMessageData*>(pmsg->pdata); |
| + media_channel_->OnPacketReceived(&data->packet, data->packet_time); |
| + delete data; |
| + break; |
| + } |
| + case MSG_RECEIVED_RTCP_PACKET: { |
| + RTC_DCHECK(worker_thread_->IsCurrent()); |
| + ReceivedPacketMessageData* data = |
| + static_cast<ReceivedPacketMessageData*>(pmsg->pdata); |
| + media_channel_->OnRtcpReceived(&data->packet, data->packet_time); |
| + delete data; |
| + break; |
| + } |
| + case MSG_CHANGE_STATE: { |
| + RTC_DCHECK(worker_thread_->IsCurrent()); |
| + ChangeState_w(); |
| + break; |
| + } |
| + case MSG_READY_TO_SEND: { |
| + RTC_DCHECK(worker_thread_->IsCurrent()); |
| + media_channel_->OnReadyToSend(true); |
| + break; |
| + } |
| + case MSG_NOT_READY_TO_SEND: { |
| + RTC_DCHECK(worker_thread_->IsCurrent()); |
| + media_channel_->OnReadyToSend(false); |
| + break; |
| + } |
| + case MSG_NETWORK_ROUTE_CHANGED: { |
| + RTC_DCHECK(worker_thread_->IsCurrent()); |
| + NetworkRouteChanged* data = |
| + static_cast<NetworkRouteChanged*>(pmsg->pdata); |
| + media_channel()->OnNetworkRouteChanged(data->transport_name, |
| + data->network_route); |
| + delete data; |
| + break; |
| + } |
| } |
| } |
| void BaseChannel::FlushRtcpMessages() { |
| // Flush all remaining RTCP messages. This should only be called in |
| // destructor. |
| - ASSERT(rtc::Thread::Current() == worker_thread_); |
| - rtc::MessageList rtcp_messages; |
| - worker_thread_->Clear(this, MSG_RTCPPACKET, &rtcp_messages); |
| - for (rtc::MessageList::iterator it = rtcp_messages.begin(); |
| - it != rtcp_messages.end(); ++it) { |
| - worker_thread_->Send(this, MSG_RTCPPACKET, it->pdata); |
| - } |
| + network_thread_->Invoke<void>([this] { |
| + rtc::MessageList rtcp_messages; |
| + network_thread_->Clear(this, MSG_SENDING_RTCP_PACKET, &rtcp_messages); |
| + for (const auto& message : rtcp_messages) { |
| + network_thread_->Send(this, MSG_SENDING_RTCP_PACKET, message.pdata); |
| + } |
| + }); |
| } |
| -VoiceChannel::VoiceChannel(rtc::Thread* thread, |
| +VoiceChannel::VoiceChannel(rtc::Thread* worker_thread, |
| + rtc::Thread* network_thread, |
| MediaEngineInterface* media_engine, |
| VoiceMediaChannel* media_channel, |
| TransportController* transport_controller, |
| const std::string& content_name, |
| bool rtcp) |
| - : BaseChannel(thread, |
| + : BaseChannel(worker_thread, |
| + network_thread, |
| media_channel, |
| transport_controller, |
| content_name, |
| @@ -1487,7 +1582,12 @@ void VoiceChannel::OnChannelRead(TransportChannel* channel, |
| } |
| } |
| -void VoiceChannel::ChangeState() { |
| +void BaseChannel::ChangeState() { |
| + RTC_DCHECK(network_thread_->IsCurrent()); |
| + worker_thread_->Post(this, MSG_CHANGE_STATE); |
| +} |
| + |
| +void VoiceChannel::ChangeState_w() { |
| // Render incoming data if we're the active call, and we have the local |
| // content. We receive data on the default channel and multiplexed streams. |
| bool recv = IsReadyToReceive(); |
| @@ -1521,7 +1621,7 @@ bool VoiceChannel::SetLocalContent_w(const MediaContentDescription* content, |
| return false; |
| } |
| - if (!SetRtpTransportParameters_w(content, action, CS_LOCAL, error_desc)) { |
| + if (!SetRtpTransportParameters(content, action, CS_LOCAL, error_desc)) { |
| return false; |
| } |
| @@ -1547,7 +1647,7 @@ bool VoiceChannel::SetLocalContent_w(const MediaContentDescription* content, |
| } |
| set_local_content_direction(content->direction()); |
| - ChangeState(); |
| + ChangeState_w(); |
| return true; |
| } |
| @@ -1566,7 +1666,7 @@ bool VoiceChannel::SetRemoteContent_w(const MediaContentDescription* content, |
| return false; |
| } |
| - if (!SetRtpTransportParameters_w(content, action, CS_REMOTE, error_desc)) { |
| + if (!SetRtpTransportParameters(content, action, CS_REMOTE, error_desc)) { |
| return false; |
| } |
| @@ -1598,7 +1698,7 @@ bool VoiceChannel::SetRemoteContent_w(const MediaContentDescription* content, |
| } |
| set_remote_content_direction(content->direction()); |
| - ChangeState(); |
| + ChangeState_w(); |
| return true; |
| } |
| @@ -1656,12 +1756,14 @@ void VoiceChannel::GetSrtpCryptoSuites(std::vector<int>* crypto_suites) const { |
| GetSupportedAudioCryptoSuites(crypto_suites); |
| } |
| -VideoChannel::VideoChannel(rtc::Thread* thread, |
| +VideoChannel::VideoChannel(rtc::Thread* worker_thread, |
| + rtc::Thread* network_thread, |
| VideoMediaChannel* media_channel, |
| TransportController* transport_controller, |
| const std::string& content_name, |
| bool rtcp) |
| - : BaseChannel(thread, |
| + : BaseChannel(worker_thread, |
| + network_thread, |
| media_channel, |
| transport_controller, |
| content_name, |
| @@ -1723,7 +1825,8 @@ bool VideoChannel::SetRtpParameters_w(uint32_t ssrc, |
| webrtc::RtpParameters parameters) { |
| return media_channel()->SetRtpParameters(ssrc, parameters); |
| } |
| -void VideoChannel::ChangeState() { |
| + |
| +void VideoChannel::ChangeState_w() { |
| // Send outgoing data if we're the active call, we have the remote content, |
| // and we have had some form of connectivity. |
| bool send = IsReadyToSend(); |
| @@ -1775,7 +1878,7 @@ bool VideoChannel::SetLocalContent_w(const MediaContentDescription* content, |
| return false; |
| } |
| - if (!SetRtpTransportParameters_w(content, action, CS_LOCAL, error_desc)) { |
| + if (!SetRtpTransportParameters(content, action, CS_LOCAL, error_desc)) { |
| return false; |
| } |
| @@ -1801,7 +1904,7 @@ bool VideoChannel::SetLocalContent_w(const MediaContentDescription* content, |
| } |
| set_local_content_direction(content->direction()); |
| - ChangeState(); |
| + ChangeState_w(); |
| return true; |
| } |
| @@ -1820,8 +1923,7 @@ bool VideoChannel::SetRemoteContent_w(const MediaContentDescription* content, |
| return false; |
| } |
| - |
| - if (!SetRtpTransportParameters_w(content, action, CS_REMOTE, error_desc)) { |
| + if (!SetRtpTransportParameters(content, action, CS_REMOTE, error_desc)) { |
| return false; |
| } |
| @@ -1854,7 +1956,7 @@ bool VideoChannel::SetRemoteContent_w(const MediaContentDescription* content, |
| } |
| set_remote_content_direction(content->direction()); |
| - ChangeState(); |
| + ChangeState_w(); |
| return true; |
| } |
| @@ -1889,12 +1991,14 @@ void VideoChannel::GetSrtpCryptoSuites(std::vector<int>* crypto_suites) const { |
| GetSupportedVideoCryptoSuites(crypto_suites); |
| } |
| -DataChannel::DataChannel(rtc::Thread* thread, |
| +DataChannel::DataChannel(rtc::Thread* worker_thread, |
| + rtc::Thread* network_thread, |
| DataMediaChannel* media_channel, |
| TransportController* transport_controller, |
| const std::string& content_name, |
| bool rtcp) |
| - : BaseChannel(thread, |
| + : BaseChannel(worker_thread, |
| + network_thread, |
| media_channel, |
| transport_controller, |
| content_name, |
| @@ -1998,7 +2102,7 @@ bool DataChannel::SetLocalContent_w(const MediaContentDescription* content, |
| } |
| if (data_channel_type_ == DCT_RTP) { |
| - if (!SetRtpTransportParameters_w(content, action, CS_LOCAL, error_desc)) { |
| + if (!SetRtpTransportParameters(content, action, CS_LOCAL, error_desc)) { |
| return false; |
| } |
| } |
| @@ -2030,7 +2134,7 @@ bool DataChannel::SetLocalContent_w(const MediaContentDescription* content, |
| } |
| set_local_content_direction(content->direction()); |
| - ChangeState(); |
| + ChangeState_w(); |
| return true; |
| } |
| @@ -2060,7 +2164,7 @@ bool DataChannel::SetRemoteContent_w(const MediaContentDescription* content, |
| LOG(LS_INFO) << "Setting remote data description"; |
| if (data_channel_type_ == DCT_RTP && |
| - !SetRtpTransportParameters_w(content, action, CS_REMOTE, error_desc)) { |
| + !SetRtpTransportParameters(content, action, CS_REMOTE, error_desc)) { |
| return false; |
| } |
| @@ -2085,11 +2189,11 @@ bool DataChannel::SetRemoteContent_w(const MediaContentDescription* content, |
| } |
| set_remote_content_direction(content->direction()); |
| - ChangeState(); |
| + ChangeState_w(); |
| return true; |
| } |
| -void DataChannel::ChangeState() { |
| +void DataChannel::ChangeState_w() { |
| // Render incoming data if we're the active call, and we have the local |
| // content. We receive data on the default channel and multiplexed streams. |
| bool recv = IsReadyToReceive(); |