| Index: webrtc/pc/channel.cc
|
| diff --git a/webrtc/pc/channel.cc b/webrtc/pc/channel.cc
|
| index 4d47e878b8e2b8c7d7c858c439d822f9d95a4720..7c2acf87af796ee4d67adc691fa947c1a0da7d6e 100644
|
| --- a/webrtc/pc/channel.cc
|
| +++ b/webrtc/pc/channel.cc
|
| @@ -37,6 +37,22 @@ bool SetRawAudioSink_w(VoiceMediaChannel* channel,
|
| channel->SetRawAudioSink(ssrc, std::move(*sink));
|
| return true;
|
| }
|
| +
|
| +struct PacketMessageData : public rtc::MessageData {
|
| + rtc::CopyOnWriteBuffer packet;
|
| + rtc::PacketOptions options;
|
| +};
|
| +
|
| +struct IncomingPacketMessageData : public rtc::MessageData {
|
| + rtc::CopyOnWriteBuffer packet;
|
| + rtc::PacketTime packet_time;
|
| +};
|
| +
|
| +struct ChangeState : public rtc::MessageData {
|
| + bool send;
|
| + bool recv;
|
| +};
|
| +
|
| } // namespace
|
|
|
| enum {
|
| @@ -48,6 +64,11 @@ enum {
|
| MSG_DATARECEIVED,
|
| MSG_FIRSTPACKETRECEIVED,
|
| MSG_STREAMCLOSEDREMOTELY,
|
| + MSG_INCOMING_RTP_PACKET,
|
| + MSG_INCOMING_RTCP_PACKET,
|
| + MSG_CHANGE_STATE,
|
| + MSG_NOT_READY_TO_SEND,
|
| + MSG_READY_TO_SEND,
|
| };
|
|
|
| // Value specified in RFC 5764.
|
| @@ -61,11 +82,6 @@ static void SafeSetError(const std::string& message, std::string* error_desc) {
|
| }
|
| }
|
|
|
| -struct PacketMessageData : public rtc::MessageData {
|
| - rtc::CopyOnWriteBuffer packet;
|
| - rtc::PacketOptions options;
|
| -};
|
| -
|
| struct VoiceChannelErrorMessageData : public rtc::MessageData {
|
| VoiceChannelErrorMessageData(uint32_t in_ssrc,
|
| VoiceMediaChannel::Error in_error)
|
| @@ -142,12 +158,14 @@ void RtpSendParametersFromMediaDescription(
|
| send_params->max_bandwidth_bps = desc->bandwidth();
|
| }
|
|
|
| -BaseChannel::BaseChannel(rtc::Thread* thread,
|
| +BaseChannel::BaseChannel(rtc::Thread* worker_thread,
|
| + rtc::Thread* network_thread,
|
| MediaChannel* media_channel,
|
| TransportController* transport_controller,
|
| const std::string& content_name,
|
| bool rtcp)
|
| - : worker_thread_(thread),
|
| + : worker_thread_(worker_thread),
|
| + network_thread_(network_thread),
|
| transport_controller_(transport_controller),
|
| media_channel_(media_channel),
|
| content_name_(content_name),
|
| @@ -165,33 +183,37 @@ BaseChannel::BaseChannel(rtc::Thread* thread,
|
| dtls_keyed_(false),
|
| secure_required_(false),
|
| rtp_abs_sendtime_extn_id_(-1) {
|
| - ASSERT(worker_thread_ == rtc::Thread::Current());
|
| + RTC_DCHECK(worker_thread_->IsCurrent());
|
| + RTC_DCHECK_EQ(network_thread, transport_controller->network_thread());
|
| LOG(LS_INFO) << "Created channel for " << content_name;
|
| }
|
|
|
| BaseChannel::~BaseChannel() {
|
| TRACE_EVENT0("webrtc", "BaseChannel::~BaseChannel");
|
| - ASSERT(worker_thread_ == rtc::Thread::Current());
|
| + RTC_DCHECK(worker_thread_->IsCurrent());
|
| Deinit();
|
| StopConnectionMonitor();
|
| FlushRtcpMessages(); // Send any outstanding RTCP packets.
|
| worker_thread_->Clear(this); // eats any outstanding messages or packets
|
| + network_thread_->Clear(this);
|
| // We must destroy the media channel before the transport channel, otherwise
|
| // the media channel may try to send on the dead transport channel. NULLing
|
| // is not an effective strategy since the sends will come on another thread.
|
| delete media_channel_;
|
| // Note that we don't just call set_transport_channel(nullptr) because that
|
| // would call a pure virtual method which we can't do from a destructor.
|
| - if (transport_channel_) {
|
| - DisconnectFromTransportChannel(transport_channel_);
|
| - transport_controller_->DestroyTransportChannel_w(
|
| - transport_name_, cricket::ICE_CANDIDATE_COMPONENT_RTP);
|
| - }
|
| - if (rtcp_transport_channel_) {
|
| - DisconnectFromTransportChannel(rtcp_transport_channel_);
|
| - transport_controller_->DestroyTransportChannel_w(
|
| - transport_name_, cricket::ICE_CANDIDATE_COMPONENT_RTCP);
|
| - }
|
| + network_thread_->Invoke<void>([this] {
|
| + if (transport_channel_) {
|
| + DisconnectFromTransportChannel(transport_channel_);
|
| + transport_controller_->DestroyTransportChannel_n(
|
| + transport_name_, cricket::ICE_CANDIDATE_COMPONENT_RTP);
|
| + }
|
| + if (rtcp_transport_channel_) {
|
| + DisconnectFromTransportChannel(rtcp_transport_channel_);
|
| + transport_controller_->DestroyTransportChannel_n(
|
| + transport_name_, cricket::ICE_CANDIDATE_COMPONENT_RTCP);
|
| + }
|
| + });
|
| LOG(LS_INFO) << "Destroyed channel";
|
| }
|
|
|
| @@ -219,12 +241,12 @@ void BaseChannel::Deinit() {
|
| }
|
|
|
| bool BaseChannel::SetTransport(const std::string& transport_name) {
|
| - return worker_thread_->Invoke<bool>(
|
| - Bind(&BaseChannel::SetTransport_w, this, transport_name));
|
| + return network_thread_->Invoke<bool>(
|
| + Bind(&BaseChannel::SetTransport_n, this, transport_name));
|
| }
|
|
|
| -bool BaseChannel::SetTransport_w(const std::string& transport_name) {
|
| - ASSERT(worker_thread_ == rtc::Thread::Current());
|
| +bool BaseChannel::SetTransport_n(const std::string& transport_name) {
|
| + RTC_DCHECK(network_thread_->IsCurrent());
|
|
|
| if (transport_name == transport_name_) {
|
| // Nothing to do if transport name isn't changing
|
| @@ -245,18 +267,14 @@ bool BaseChannel::SetTransport_w(const std::string& transport_name) {
|
| if (rtcp_transport_enabled()) {
|
| LOG(LS_INFO) << "Create RTCP TransportChannel for " << content_name()
|
| << " on " << transport_name << " transport ";
|
| - set_rtcp_transport_channel(
|
| - transport_controller_->CreateTransportChannel_w(
|
| - transport_name, cricket::ICE_CANDIDATE_COMPONENT_RTCP),
|
| - false /* update_writablity */);
|
| + set_rtcp_transport_channel(&transport_name, false /* update_writablity */);
|
| if (!rtcp_transport_channel()) {
|
| return false;
|
| }
|
| }
|
|
|
| // We're not updating the writablity during the transition state.
|
| - set_transport_channel(transport_controller_->CreateTransportChannel_w(
|
| - transport_name, cricket::ICE_CANDIDATE_COMPONENT_RTP));
|
| + set_transport_channel(transport_name);
|
| if (!transport_channel()) {
|
| return false;
|
| }
|
| @@ -265,26 +283,28 @@ bool BaseChannel::SetTransport_w(const std::string& transport_name) {
|
| if (rtcp_transport_enabled()) {
|
| // We can only update the RTCP ready to send after set_transport_channel has
|
| // handled channel writability.
|
| - SetReadyToSend(
|
| + SetReadyToSend_n(
|
| true, rtcp_transport_channel() && rtcp_transport_channel()->writable());
|
| }
|
| transport_name_ = transport_name;
|
| return true;
|
| }
|
|
|
| -void BaseChannel::set_transport_channel(TransportChannel* new_tc) {
|
| - ASSERT(worker_thread_ == rtc::Thread::Current());
|
| +void BaseChannel::set_transport_channel(const std::string& transport_name) {
|
| + RTC_DCHECK(network_thread_->IsCurrent());
|
|
|
| + TransportChannel* new_tc = transport_controller_->CreateTransportChannel_n(
|
| + transport_name, cricket::ICE_CANDIDATE_COMPONENT_RTP);
|
| TransportChannel* old_tc = transport_channel_;
|
| if (!old_tc && !new_tc) {
|
| // Nothing to do
|
| return;
|
| }
|
| - ASSERT(old_tc != new_tc);
|
| + RTC_DCHECK_NE(old_tc, new_tc);
|
|
|
| if (old_tc) {
|
| DisconnectFromTransportChannel(old_tc);
|
| - transport_controller_->DestroyTransportChannel_w(
|
| + transport_controller_->DestroyTransportChannel_n(
|
| transport_name_, cricket::ICE_CANDIDATE_COMPONENT_RTP);
|
| }
|
|
|
| @@ -299,24 +319,30 @@ void BaseChannel::set_transport_channel(TransportChannel* new_tc) {
|
|
|
| // Update aggregate writable/ready-to-send state between RTP and RTCP upon
|
| // setting new channel
|
| - UpdateWritableState_w();
|
| - SetReadyToSend(false, new_tc && new_tc->writable());
|
| + UpdateWritableState_n();
|
| + SetReadyToSend_n(false, new_tc && new_tc->writable());
|
| }
|
|
|
| -void BaseChannel::set_rtcp_transport_channel(TransportChannel* new_tc,
|
| +void BaseChannel::set_rtcp_transport_channel(const std::string* transport_name,
|
| bool update_writablity) {
|
| - ASSERT(worker_thread_ == rtc::Thread::Current());
|
| + RTC_DCHECK(network_thread_->IsCurrent());
|
| +
|
| + TransportChannel* new_tc =
|
| + transport_name
|
| + ? transport_controller_->CreateTransportChannel_n(
|
| + *transport_name, cricket::ICE_CANDIDATE_COMPONENT_RTCP)
|
| + : nullptr;
|
|
|
| TransportChannel* old_tc = rtcp_transport_channel_;
|
| if (!old_tc && !new_tc) {
|
| // Nothing to do
|
| return;
|
| }
|
| - ASSERT(old_tc != new_tc);
|
| + RTC_DCHECK_NE(old_tc, new_tc);
|
|
|
| if (old_tc) {
|
| DisconnectFromTransportChannel(old_tc);
|
| - transport_controller_->DestroyTransportChannel_w(
|
| + transport_controller_->DestroyTransportChannel_n(
|
| transport_name_, cricket::ICE_CANDIDATE_COMPONENT_RTCP);
|
| }
|
|
|
| @@ -335,13 +361,13 @@ void BaseChannel::set_rtcp_transport_channel(TransportChannel* new_tc,
|
| if (update_writablity) {
|
| // Update aggregate writable/ready-to-send state between RTP and RTCP upon
|
| // setting new channel
|
| - UpdateWritableState_w();
|
| - SetReadyToSend(true, new_tc && new_tc->writable());
|
| + UpdateWritableState_n();
|
| + SetReadyToSend_n(true, new_tc && new_tc->writable());
|
| }
|
| }
|
|
|
| void BaseChannel::ConnectToTransportChannel(TransportChannel* tc) {
|
| - ASSERT(worker_thread_ == rtc::Thread::Current());
|
| + RTC_DCHECK(network_thread_->IsCurrent());
|
|
|
| tc->SignalWritableState.connect(this, &BaseChannel::OnWritableState);
|
| tc->SignalReadPacket.connect(this, &BaseChannel::OnChannelRead);
|
| @@ -352,7 +378,7 @@ void BaseChannel::ConnectToTransportChannel(TransportChannel* tc) {
|
| }
|
|
|
| void BaseChannel::DisconnectFromTransportChannel(TransportChannel* tc) {
|
| - ASSERT(worker_thread_ == rtc::Thread::Current());
|
| + RTC_DCHECK(network_thread_->IsCurrent());
|
|
|
| tc->SignalWritableState.disconnect(this);
|
| tc->SignalReadPacket.disconnect(this);
|
| @@ -405,8 +431,8 @@ void BaseChannel::StartConnectionMonitor(int cms) {
|
| // We pass in the BaseChannel instead of the transport_channel_
|
| // because if the transport_channel_ changes, the ConnectionMonitor
|
| // would be pointing to the wrong TransportChannel.
|
| - connection_monitor_.reset(new ConnectionMonitor(
|
| - this, worker_thread(), rtc::Thread::Current()));
|
| + connection_monitor_.reset(
|
| + new ConnectionMonitor(this, network_thread(), rtc::Thread::Current()));
|
| connection_monitor_->SignalUpdate.connect(
|
| this, &BaseChannel::OnConnectionMonitorUpdate);
|
| connection_monitor_->Start(cms);
|
| @@ -420,7 +446,7 @@ void BaseChannel::StopConnectionMonitor() {
|
| }
|
|
|
| bool BaseChannel::GetConnectionStats(ConnectionInfos* infos) {
|
| - ASSERT(worker_thread_ == rtc::Thread::Current());
|
| + RTC_DCHECK(network_thread_->IsCurrent());
|
| return transport_channel_->GetStats(infos);
|
| }
|
|
|
| @@ -450,25 +476,29 @@ bool BaseChannel::SendRtcp(rtc::CopyOnWriteBuffer* packet,
|
|
|
| int BaseChannel::SetOption(SocketType type, rtc::Socket::Option opt,
|
| int value) {
|
| - TransportChannel* channel = NULL;
|
| - switch (type) {
|
| - case ST_RTP:
|
| - channel = transport_channel_;
|
| - socket_options_.push_back(
|
| - std::pair<rtc::Socket::Option, int>(opt, value));
|
| - break;
|
| - case ST_RTCP:
|
| - channel = rtcp_transport_channel_;
|
| - rtcp_socket_options_.push_back(
|
| - std::pair<rtc::Socket::Option, int>(opt, value));
|
| - break;
|
| - }
|
| - return channel ? channel->SetOption(opt, value) : -1;
|
| + return network_thread_->Invoke<int>([this, type, opt, value] {
|
| + TransportChannel* channel = nullptr;
|
| + switch (type) {
|
| + case ST_RTP:
|
| + channel = transport_channel_;
|
| + socket_options_.push_back(
|
| + std::pair<rtc::Socket::Option, int>(opt, value));
|
| + break;
|
| + case ST_RTCP:
|
| + channel = rtcp_transport_channel_;
|
| + rtcp_socket_options_.push_back(
|
| + std::pair<rtc::Socket::Option, int>(opt, value));
|
| + break;
|
| + }
|
| + return channel ? channel->SetOption(opt, value) : -1;
|
| + });
|
| }
|
|
|
| void BaseChannel::OnWritableState(TransportChannel* channel) {
|
| - ASSERT(channel == transport_channel_ || channel == rtcp_transport_channel_);
|
| - UpdateWritableState_w();
|
| + RTC_DCHECK(channel == transport_channel_ ||
|
| + channel == rtcp_transport_channel_);
|
| + RTC_DCHECK(network_thread_->IsCurrent());
|
| + UpdateWritableState_n();
|
| }
|
|
|
| void BaseChannel::OnChannelRead(TransportChannel* channel,
|
| @@ -477,7 +507,7 @@ void BaseChannel::OnChannelRead(TransportChannel* channel,
|
| int flags) {
|
| TRACE_EVENT0("webrtc", "BaseChannel::OnChannelRead");
|
| // OnChannelRead gets called from P2PSocket; now pass data to MediaEngine
|
| - ASSERT(worker_thread_ == rtc::Thread::Current());
|
| + RTC_DCHECK(network_thread_->IsCurrent());
|
|
|
| // When using RTCP multiplexing we might get RTCP packets on the RTP
|
| // transport. We feed RTP traffic into the demuxer to determine if it is RTCP.
|
| @@ -488,7 +518,7 @@ void BaseChannel::OnChannelRead(TransportChannel* channel,
|
|
|
| void BaseChannel::OnReadyToSend(TransportChannel* channel) {
|
| ASSERT(channel == transport_channel_ || channel == rtcp_transport_channel_);
|
| - SetReadyToSend(channel == rtcp_transport_channel_, true);
|
| + SetReadyToSend_n(channel == rtcp_transport_channel_, true);
|
| }
|
|
|
| void BaseChannel::OnDtlsState(TransportChannel* channel,
|
| @@ -519,25 +549,29 @@ void BaseChannel::OnSelectedCandidatePairChanged(
|
| selected_candidate_pair->remote_candidate().network_id(),
|
| last_sent_packet_id);
|
| }
|
| + // TODO(danilchap): reroute this call to worker thread when it will start to
|
| + // do something.
|
| media_channel()->OnNetworkRouteChanged(channel->transport_name(),
|
| network_route);
|
| }
|
|
|
| -void BaseChannel::SetReadyToSend(bool rtcp, bool ready) {
|
| +void BaseChannel::SetReadyToSend_n(bool rtcp, bool ready) {
|
| if (rtcp) {
|
| rtcp_ready_to_send_ = ready;
|
| } else {
|
| rtp_ready_to_send_ = ready;
|
| }
|
|
|
| - if (rtp_ready_to_send_ &&
|
| - // In the case of rtcp mux |rtcp_transport_channel_| will be null.
|
| - (rtcp_ready_to_send_ || !rtcp_transport_channel_)) {
|
| - // Notify the MediaChannel when both rtp and rtcp channel can send.
|
| - media_channel_->OnReadyToSend(true);
|
| + bool ready_to_send =
|
| + (rtp_ready_to_send_ &&
|
| + // In the case of rtcp mux |rtcp_transport_channel_| will be null.
|
| + (rtcp_ready_to_send_ || !rtcp_transport_channel_));
|
| +
|
| + if (worker_thread_->IsCurrent()) {
|
| + media_channel_->OnReadyToSend(ready_to_send);
|
| } else {
|
| - // Notify the MediaChannel when either rtp or rtcp channel can't send.
|
| - media_channel_->OnReadyToSend(false);
|
| + worker_thread_->Post(
|
| + this, ready_to_send ? MSG_READY_TO_SEND : MSG_NOT_READY_TO_SEND);
|
| }
|
| }
|
|
|
| @@ -550,22 +584,24 @@ bool BaseChannel::PacketIsRtcp(const TransportChannel* channel,
|
| bool BaseChannel::SendPacket(bool rtcp,
|
| rtc::CopyOnWriteBuffer* packet,
|
| const rtc::PacketOptions& options) {
|
| - // SendPacket gets called from MediaEngine, typically on an encoder thread.
|
| - // If the thread is not our worker thread, we will post to our worker
|
| - // so that the real work happens on our worker. This avoids us having to
|
| + // SendPacket gets called from MediaEngine, on a pacer or an encoder thread.
|
| + // If the thread is not our network thread, we will post to our network
|
| + // so that the real work happens on our network. This avoids us having to
|
| // synchronize access to all the pieces of the send path, including
|
| // SRTP and the inner workings of the transport channels.
|
| // The only downside is that we can't return a proper failure code if
|
| - // needed. Since UDP is unreliable anyway, this should be a non-issue.
|
| - if (rtc::Thread::Current() != worker_thread_) {
|
| + // needed. Since UDP is unreliable anyway, this should be a non-issue. ///
|
| + // except for tests....
|
| + if (!network_thread_->IsCurrent()) {
|
| // Avoid a copy by transferring the ownership of the packet data.
|
| int message_id = (!rtcp) ? MSG_RTPPACKET : MSG_RTCPPACKET;
|
| PacketMessageData* data = new PacketMessageData;
|
| data->packet = std::move(*packet);
|
| data->options = options;
|
| - worker_thread_->Post(this, message_id, data);
|
| + network_thread_->Post(this, message_id, data);
|
| return true;
|
| }
|
| + TRACE_EVENT0("webrtc", "BaseChannel::SendPacket");
|
|
|
| // Now that we are on the correct thread, ensure we have a place to send this
|
| // packet before doing anything. (We might get RTCP packets that we don't
|
| @@ -589,6 +625,7 @@ bool BaseChannel::SendPacket(bool rtcp,
|
| updated_options = options;
|
| // Protect if needed.
|
| if (srtp_filter_.IsActive()) {
|
| + TRACE_EVENT0("webrtc", "SRTP");
|
| bool res;
|
| uint8_t* data = packet->data();
|
| int len = static_cast<int>(packet->size());
|
| @@ -650,19 +687,17 @@ bool BaseChannel::SendPacket(bool rtcp,
|
| // This is a double check for something that supposedly can't happen.
|
| LOG(LS_ERROR) << "Can't send outgoing " << PacketType(rtcp)
|
| << " packet when SRTP is inactive and crypto is required";
|
| -
|
| ASSERT(false);
|
| return false;
|
| }
|
|
|
| - // Bon voyage.
|
| - int ret =
|
| - channel->SendPacket(packet->data<char>(), packet->size(), updated_options,
|
| - (secure() && secure_dtls()) ? PF_SRTP_BYPASS : 0);
|
| + int flags = (secure() && secure_dtls()) ? PF_SRTP_BYPASS : PF_NORMAL;
|
| + int ret = channel->SendPacket(packet->data<char>(), packet->size(),
|
| + updated_options, flags);
|
| if (ret != static_cast<int>(packet->size())) {
|
| if (channel->GetError() == EWOULDBLOCK) {
|
| LOG(LS_WARNING) << "Got EWOULDBLOCK from socket.";
|
| - SetReadyToSend(rtcp, false);
|
| + SetReadyToSend_n(rtcp, false);
|
| }
|
| return false;
|
| }
|
| @@ -685,8 +720,10 @@ bool BaseChannel::WantsPacket(bool rtcp, const rtc::CopyOnWriteBuffer* packet) {
|
| return bundle_filter_.DemuxPacket(packet->data(), packet->size());
|
| }
|
|
|
| -void BaseChannel::HandlePacket(bool rtcp, rtc::CopyOnWriteBuffer* packet,
|
| +void BaseChannel::HandlePacket(bool rtcp,
|
| + rtc::CopyOnWriteBuffer* packet,
|
| const rtc::PacketTime& packet_time) {
|
| + RTC_DCHECK(network_thread_->IsCurrent());
|
| if (!WantsPacket(rtcp, packet)) {
|
| return;
|
| }
|
| @@ -700,6 +737,7 @@ void BaseChannel::HandlePacket(bool rtcp, rtc::CopyOnWriteBuffer* packet,
|
|
|
| // Unprotect the packet, if needed.
|
| if (srtp_filter_.IsActive()) {
|
| + TRACE_EVENT0("webrtc", "SRTP");
|
| char* data = packet->data<char>();
|
| int len = static_cast<int>(packet->size());
|
| bool res;
|
| @@ -743,11 +781,19 @@ void BaseChannel::HandlePacket(bool rtcp, rtc::CopyOnWriteBuffer* packet,
|
| return;
|
| }
|
|
|
| - // Push it down to the media channel.
|
| - if (!rtcp) {
|
| - media_channel_->OnPacketReceived(packet, packet_time);
|
| + if (worker_thread_->IsCurrent()) {
|
| + // Push it down to the media channel.
|
| + if (!rtcp) {
|
| + media_channel_->OnPacketReceived(packet, packet_time);
|
| + } else {
|
| + media_channel_->OnRtcpReceived(packet, packet_time);
|
| + }
|
| } else {
|
| - media_channel_->OnRtcpReceived(packet, packet_time);
|
| + IncomingPacketMessageData* message_data = new IncomingPacketMessageData;
|
| + message_data->packet = std::move(*packet);
|
| + message_data->packet_time = packet_time;
|
| + int message_id = rtcp ? MSG_INCOMING_RTCP_PACKET : MSG_INCOMING_RTP_PACKET;
|
| + worker_thread_->Post(this, message_id, message_data);
|
| }
|
| }
|
|
|
| @@ -786,30 +832,30 @@ void BaseChannel::EnableMedia_w() {
|
|
|
| LOG(LS_INFO) << "Channel enabled";
|
| enabled_ = true;
|
| - ChangeState();
|
| + ChangeState_w();
|
| }
|
|
|
| void BaseChannel::DisableMedia_w() {
|
| - ASSERT(worker_thread_ == rtc::Thread::Current());
|
| + RTC_DCHECK(worker_thread_->IsCurrent());
|
| if (!enabled_)
|
| return;
|
|
|
| LOG(LS_INFO) << "Channel disabled";
|
| enabled_ = false;
|
| - ChangeState();
|
| + ChangeState_w();
|
| }
|
|
|
| -void BaseChannel::UpdateWritableState_w() {
|
| +void BaseChannel::UpdateWritableState_n() {
|
| if (transport_channel_ && transport_channel_->writable() &&
|
| (!rtcp_transport_channel_ || rtcp_transport_channel_->writable())) {
|
| - ChannelWritable_w();
|
| + ChannelWritable_n();
|
| } else {
|
| - ChannelNotWritable_w();
|
| + ChannelNotWritable_n();
|
| }
|
| }
|
|
|
| -void BaseChannel::ChannelWritable_w() {
|
| - ASSERT(worker_thread_ == rtc::Thread::Current());
|
| +void BaseChannel::ChannelWritable_n() {
|
| + RTC_DCHECK(network_thread_->IsCurrent());
|
| if (writable_) {
|
| return;
|
| }
|
| @@ -829,19 +875,21 @@ void BaseChannel::ChannelWritable_w() {
|
| }
|
|
|
| was_ever_writable_ = true;
|
| - MaybeSetupDtlsSrtp_w();
|
| + MaybeSetupDtlsSrtp_n();
|
| writable_ = true;
|
| - ChangeState();
|
| + ChangeState_n();
|
| }
|
|
|
| -void BaseChannel::SignalDtlsSetupFailure_w(bool rtcp) {
|
| - ASSERT(worker_thread() == rtc::Thread::Current());
|
| +void BaseChannel::SignalDtlsSetupFailure_n(bool rtcp) {
|
| + RTC_DCHECK(network_thread_->IsCurrent());
|
| + RTC_NOTREACHED();
|
| + // TODO(danilchap): Not allowed to invoke from network thread. Post instead.
|
| signaling_thread()->Invoke<void>(Bind(
|
| &BaseChannel::SignalDtlsSetupFailure_s, this, rtcp));
|
| }
|
|
|
| void BaseChannel::SignalDtlsSetupFailure_s(bool rtcp) {
|
| - ASSERT(signaling_thread() == rtc::Thread::Current());
|
| + RTC_DCHECK(signaling_thread()->IsCurrent());
|
| SignalDtlsSetupFailure(this, rtcp);
|
| }
|
|
|
| @@ -864,7 +912,8 @@ bool BaseChannel::ShouldSetupDtlsSrtp() const {
|
|
|
| // This function returns true if either DTLS-SRTP is not in use
|
| // *or* DTLS-SRTP is successfully set up.
|
| -bool BaseChannel::SetupDtlsSrtp(bool rtcp_channel) {
|
| +bool BaseChannel::SetupDtlsSrtp_n(bool rtcp_channel) {
|
| + RTC_DCHECK(network_thread_->IsCurrent());
|
| bool ret = false;
|
|
|
| TransportChannel* channel =
|
| @@ -950,7 +999,7 @@ bool BaseChannel::SetupDtlsSrtp(bool rtcp_channel) {
|
| return ret;
|
| }
|
|
|
| -void BaseChannel::MaybeSetupDtlsSrtp_w() {
|
| +void BaseChannel::MaybeSetupDtlsSrtp_n() {
|
| if (srtp_filter_.IsActive()) {
|
| return;
|
| }
|
| @@ -959,30 +1008,30 @@ void BaseChannel::MaybeSetupDtlsSrtp_w() {
|
| return;
|
| }
|
|
|
| - if (!SetupDtlsSrtp(false)) {
|
| - SignalDtlsSetupFailure_w(false);
|
| + if (!SetupDtlsSrtp_n(false)) {
|
| + SignalDtlsSetupFailure_n(false);
|
| return;
|
| }
|
|
|
| if (rtcp_transport_channel_) {
|
| - if (!SetupDtlsSrtp(true)) {
|
| - SignalDtlsSetupFailure_w(true);
|
| + if (!SetupDtlsSrtp_n(true)) {
|
| + SignalDtlsSetupFailure_n(true);
|
| return;
|
| }
|
| }
|
| }
|
|
|
| -void BaseChannel::ChannelNotWritable_w() {
|
| - ASSERT(worker_thread_ == rtc::Thread::Current());
|
| +void BaseChannel::ChannelNotWritable_n() {
|
| + RTC_DCHECK(network_thread_->IsCurrent());
|
| if (!writable_)
|
| return;
|
|
|
| LOG(LS_INFO) << "Channel not writable (" << content_name_ << ")";
|
| writable_ = false;
|
| - ChangeState();
|
| + ChangeState_n();
|
| }
|
|
|
| -bool BaseChannel::SetRtpTransportParameters_w(
|
| +bool BaseChannel::SetRtpTransportParameters_n(
|
| const MediaContentDescription* content,
|
| ContentAction action,
|
| ContentSource src,
|
| @@ -997,11 +1046,11 @@ bool BaseChannel::SetRtpTransportParameters_w(
|
| set_secure_required(content->crypto_required() != CT_NONE);
|
| }
|
|
|
| - if (!SetSrtp_w(content->cryptos(), action, src, error_desc)) {
|
| + if (!SetSrtp_n(content->cryptos(), action, src, error_desc)) {
|
| return false;
|
| }
|
|
|
| - if (!SetRtcpMux_w(content->rtcp_mux(), action, src, error_desc)) {
|
| + if (!SetRtcpMux_n(content->rtcp_mux(), action, src, error_desc)) {
|
| return false;
|
| }
|
|
|
| @@ -1022,7 +1071,7 @@ bool BaseChannel::CheckSrtpConfig(const std::vector<CryptoParams>& cryptos,
|
| return true;
|
| }
|
|
|
| -bool BaseChannel::SetSrtp_w(const std::vector<CryptoParams>& cryptos,
|
| +bool BaseChannel::SetSrtp_n(const std::vector<CryptoParams>& cryptos,
|
| ContentAction action,
|
| ContentSource src,
|
| std::string* error_desc) {
|
| @@ -1070,11 +1119,10 @@ bool BaseChannel::SetSrtp_w(const std::vector<CryptoParams>& cryptos,
|
| }
|
|
|
| void BaseChannel::ActivateRtcpMux() {
|
| - worker_thread_->Invoke<void>(Bind(
|
| - &BaseChannel::ActivateRtcpMux_w, this));
|
| + network_thread_->Invoke<void>(Bind(&BaseChannel::ActivateRtcpMux_n, this));
|
| }
|
|
|
| -void BaseChannel::ActivateRtcpMux_w() {
|
| +void BaseChannel::ActivateRtcpMux_n() {
|
| if (!rtcp_mux_filter_.IsActive()) {
|
| rtcp_mux_filter_.SetActive();
|
| set_rtcp_transport_channel(nullptr, true);
|
| @@ -1082,7 +1130,8 @@ void BaseChannel::ActivateRtcpMux_w() {
|
| }
|
| }
|
|
|
| -bool BaseChannel::SetRtcpMux_w(bool enable, ContentAction action,
|
| +bool BaseChannel::SetRtcpMux_n(bool enable,
|
| + ContentAction action,
|
| ContentSource src,
|
| std::string* error_desc) {
|
| bool ret = false;
|
| @@ -1121,7 +1170,7 @@ bool BaseChannel::SetRtcpMux_w(bool enable, ContentAction action,
|
| if (rtcp_mux_filter_.IsActive()) {
|
| // If the RTP transport is already writable, then so are we.
|
| if (transport_channel_->writable()) {
|
| - ChannelWritable_w();
|
| + ChannelWritable_n();
|
| }
|
| }
|
|
|
| @@ -1298,6 +1347,7 @@ void BaseChannel::OnMessage(rtc::Message *pmsg) {
|
| switch (pmsg->message_id) {
|
| case MSG_RTPPACKET:
|
| case MSG_RTCPPACKET: {
|
| + RTC_DCHECK(network_thread_->IsCurrent());
|
| PacketMessageData* data = static_cast<PacketMessageData*>(pmsg->pdata);
|
| SendPacket(pmsg->message_id == MSG_RTCPPACKET, &data->packet,
|
| data->options);
|
| @@ -1308,28 +1358,62 @@ void BaseChannel::OnMessage(rtc::Message *pmsg) {
|
| SignalFirstPacketReceived(this);
|
| break;
|
| }
|
| + case MSG_INCOMING_RTP_PACKET: {
|
| + RTC_DCHECK(worker_thread_->IsCurrent());
|
| + IncomingPacketMessageData* data =
|
| + static_cast<IncomingPacketMessageData*>(pmsg->pdata);
|
| + media_channel_->OnPacketReceived(&data->packet, data->packet_time);
|
| + delete data;
|
| + break;
|
| + }
|
| + case MSG_INCOMING_RTCP_PACKET: {
|
| + RTC_DCHECK(worker_thread_->IsCurrent());
|
| + IncomingPacketMessageData* data =
|
| + static_cast<IncomingPacketMessageData*>(pmsg->pdata);
|
| + media_channel_->OnRtcpReceived(&data->packet, data->packet_time);
|
| + delete data;
|
| + break;
|
| + }
|
| + case MSG_CHANGE_STATE: {
|
| + RTC_DCHECK(worker_thread_->IsCurrent());
|
| + ChangeState_w();
|
| + break;
|
| + }
|
| + case MSG_READY_TO_SEND: {
|
| + RTC_DCHECK(worker_thread_->IsCurrent());
|
| + media_channel_->OnReadyToSend(true);
|
| + break;
|
| + }
|
| + case MSG_NOT_READY_TO_SEND: {
|
| + RTC_DCHECK(worker_thread_->IsCurrent());
|
| + media_channel_->OnReadyToSend(false);
|
| + break;
|
| + }
|
| }
|
| }
|
|
|
| void BaseChannel::FlushRtcpMessages() {
|
| // Flush all remaining RTCP messages. This should only be called in
|
| // destructor.
|
| - ASSERT(rtc::Thread::Current() == worker_thread_);
|
| - rtc::MessageList rtcp_messages;
|
| - worker_thread_->Clear(this, MSG_RTCPPACKET, &rtcp_messages);
|
| - for (rtc::MessageList::iterator it = rtcp_messages.begin();
|
| - it != rtcp_messages.end(); ++it) {
|
| - worker_thread_->Send(this, MSG_RTCPPACKET, it->pdata);
|
| - }
|
| + network_thread_->Invoke<void>([this] {
|
| + rtc::MessageList rtcp_messages;
|
| + network_thread_->Clear(this, MSG_RTCPPACKET, &rtcp_messages);
|
| + for (rtc::MessageList::iterator it = rtcp_messages.begin();
|
| + it != rtcp_messages.end(); ++it) {
|
| + network_thread_->Send(this, MSG_RTCPPACKET, it->pdata);
|
| + }
|
| + });
|
| }
|
|
|
| -VoiceChannel::VoiceChannel(rtc::Thread* thread,
|
| +VoiceChannel::VoiceChannel(rtc::Thread* worker_thread,
|
| + rtc::Thread* network_thread,
|
| MediaEngineInterface* media_engine,
|
| VoiceMediaChannel* media_channel,
|
| TransportController* transport_controller,
|
| const std::string& content_name,
|
| bool rtcp)
|
| - : BaseChannel(thread,
|
| + : BaseChannel(worker_thread,
|
| + network_thread,
|
| media_channel,
|
| transport_controller,
|
| content_name,
|
| @@ -1487,7 +1571,16 @@ void VoiceChannel::OnChannelRead(TransportChannel* channel,
|
| }
|
| }
|
|
|
| -void VoiceChannel::ChangeState() {
|
| +void BaseChannel::ChangeState_n() {
|
| + RTC_DCHECK(network_thread_->IsCurrent());
|
| + if (worker_thread_->IsCurrent()) {
|
| + ChangeState_w();
|
| + } else {
|
| + worker_thread_->Post(this, MSG_CHANGE_STATE);
|
| + }
|
| +}
|
| +
|
| +void VoiceChannel::ChangeState_w() {
|
| // Render incoming data if we're the active call, and we have the local
|
| // content. We receive data on the default channel and multiplexed streams.
|
| bool recv = IsReadyToReceive();
|
| @@ -1521,7 +1614,9 @@ bool VoiceChannel::SetLocalContent_w(const MediaContentDescription* content,
|
| return false;
|
| }
|
|
|
| - if (!SetRtpTransportParameters_w(content, action, CS_LOCAL, error_desc)) {
|
| + if (!InvokeOnNetwork(Bind(&BaseChannel::SetRtpTransportParameters_n,
|
| + static_cast<BaseChannel*>(this), content, action,
|
| + CS_LOCAL, error_desc))) {
|
| return false;
|
| }
|
|
|
| @@ -1547,7 +1642,7 @@ bool VoiceChannel::SetLocalContent_w(const MediaContentDescription* content,
|
| }
|
|
|
| set_local_content_direction(content->direction());
|
| - ChangeState();
|
| + ChangeState_w();
|
| return true;
|
| }
|
|
|
| @@ -1566,7 +1661,9 @@ bool VoiceChannel::SetRemoteContent_w(const MediaContentDescription* content,
|
| return false;
|
| }
|
|
|
| - if (!SetRtpTransportParameters_w(content, action, CS_REMOTE, error_desc)) {
|
| + if (!InvokeOnNetwork(Bind(&BaseChannel::SetRtpTransportParameters_n,
|
| + static_cast<BaseChannel*>(this), content, action,
|
| + CS_REMOTE, error_desc))) {
|
| return false;
|
| }
|
|
|
| @@ -1598,7 +1695,7 @@ bool VoiceChannel::SetRemoteContent_w(const MediaContentDescription* content,
|
| }
|
|
|
| set_remote_content_direction(content->direction());
|
| - ChangeState();
|
| + ChangeState_w();
|
| return true;
|
| }
|
|
|
| @@ -1656,12 +1753,14 @@ void VoiceChannel::GetSrtpCryptoSuites(std::vector<int>* crypto_suites) const {
|
| GetSupportedAudioCryptoSuites(crypto_suites);
|
| }
|
|
|
| -VideoChannel::VideoChannel(rtc::Thread* thread,
|
| +VideoChannel::VideoChannel(rtc::Thread* worker_thread,
|
| + rtc::Thread* network_thread,
|
| VideoMediaChannel* media_channel,
|
| TransportController* transport_controller,
|
| const std::string& content_name,
|
| bool rtcp)
|
| - : BaseChannel(thread,
|
| + : BaseChannel(worker_thread,
|
| + network_thread,
|
| media_channel,
|
| transport_controller,
|
| content_name,
|
| @@ -1723,7 +1822,8 @@ bool VideoChannel::SetRtpParameters_w(uint32_t ssrc,
|
| webrtc::RtpParameters parameters) {
|
| return media_channel()->SetRtpParameters(ssrc, parameters);
|
| }
|
| -void VideoChannel::ChangeState() {
|
| +
|
| +void VideoChannel::ChangeState_w() {
|
| // Send outgoing data if we're the active call, we have the remote content,
|
| // and we have had some form of connectivity.
|
| bool send = IsReadyToSend();
|
| @@ -1775,7 +1875,9 @@ bool VideoChannel::SetLocalContent_w(const MediaContentDescription* content,
|
| return false;
|
| }
|
|
|
| - if (!SetRtpTransportParameters_w(content, action, CS_LOCAL, error_desc)) {
|
| + if (!InvokeOnNetwork(Bind(&BaseChannel::SetRtpTransportParameters_n,
|
| + static_cast<BaseChannel*>(this), content, action,
|
| + CS_LOCAL, error_desc))) {
|
| return false;
|
| }
|
|
|
| @@ -1801,7 +1903,7 @@ bool VideoChannel::SetLocalContent_w(const MediaContentDescription* content,
|
| }
|
|
|
| set_local_content_direction(content->direction());
|
| - ChangeState();
|
| + ChangeState_w();
|
| return true;
|
| }
|
|
|
| @@ -1820,8 +1922,9 @@ bool VideoChannel::SetRemoteContent_w(const MediaContentDescription* content,
|
| return false;
|
| }
|
|
|
| -
|
| - if (!SetRtpTransportParameters_w(content, action, CS_REMOTE, error_desc)) {
|
| + if (!InvokeOnNetwork(Bind(&BaseChannel::SetRtpTransportParameters_n,
|
| + static_cast<BaseChannel*>(this), content, action,
|
| + CS_REMOTE, error_desc))) {
|
| return false;
|
| }
|
|
|
| @@ -1854,7 +1957,7 @@ bool VideoChannel::SetRemoteContent_w(const MediaContentDescription* content,
|
| }
|
|
|
| set_remote_content_direction(content->direction());
|
| - ChangeState();
|
| + ChangeState_w();
|
| return true;
|
| }
|
|
|
| @@ -1889,12 +1992,14 @@ void VideoChannel::GetSrtpCryptoSuites(std::vector<int>* crypto_suites) const {
|
| GetSupportedVideoCryptoSuites(crypto_suites);
|
| }
|
|
|
| -DataChannel::DataChannel(rtc::Thread* thread,
|
| +DataChannel::DataChannel(rtc::Thread* worker_thread,
|
| + rtc::Thread* network_thread,
|
| DataMediaChannel* media_channel,
|
| TransportController* transport_controller,
|
| const std::string& content_name,
|
| bool rtcp)
|
| - : BaseChannel(thread,
|
| + : BaseChannel(worker_thread,
|
| + network_thread,
|
| media_channel,
|
| transport_controller,
|
| content_name,
|
| @@ -1998,7 +2103,9 @@ bool DataChannel::SetLocalContent_w(const MediaContentDescription* content,
|
| }
|
|
|
| if (data_channel_type_ == DCT_RTP) {
|
| - if (!SetRtpTransportParameters_w(content, action, CS_LOCAL, error_desc)) {
|
| + if (!InvokeOnNetwork(Bind(&BaseChannel::SetRtpTransportParameters_n,
|
| + static_cast<BaseChannel*>(this), content, action,
|
| + CS_LOCAL, error_desc))) {
|
| return false;
|
| }
|
| }
|
| @@ -2030,7 +2137,7 @@ bool DataChannel::SetLocalContent_w(const MediaContentDescription* content,
|
| }
|
|
|
| set_local_content_direction(content->direction());
|
| - ChangeState();
|
| + ChangeState_w();
|
| return true;
|
| }
|
|
|
| @@ -2059,9 +2166,12 @@ bool DataChannel::SetRemoteContent_w(const MediaContentDescription* content,
|
| }
|
|
|
| LOG(LS_INFO) << "Setting remote data description";
|
| - if (data_channel_type_ == DCT_RTP &&
|
| - !SetRtpTransportParameters_w(content, action, CS_REMOTE, error_desc)) {
|
| - return false;
|
| + if (data_channel_type_ == DCT_RTP) {
|
| + if (!InvokeOnNetwork(Bind(&BaseChannel::SetRtpTransportParameters_n,
|
| + static_cast<BaseChannel*>(this), content, action,
|
| + CS_REMOTE, error_desc))) {
|
| + return false;
|
| + }
|
| }
|
|
|
|
|
| @@ -2085,11 +2195,11 @@ bool DataChannel::SetRemoteContent_w(const MediaContentDescription* content,
|
| }
|
|
|
| set_remote_content_direction(content->direction());
|
| - ChangeState();
|
| + ChangeState_w();
|
| return true;
|
| }
|
|
|
| -void DataChannel::ChangeState() {
|
| +void DataChannel::ChangeState_w() {
|
| // Render incoming data if we're the active call, and we have the local
|
| // content. We receive data on the default channel and multiplexed streams.
|
| bool recv = IsReadyToReceive();
|
|
|