Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(316)

Unified Diff: webrtc/pc/channel.cc

Issue 1903393004: Added network thread to rtc::BaseChannel (Closed) Base URL: https://chromium.googlesource.com/external/webrtc.git@master
Patch Set: feedback Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: webrtc/pc/channel.cc
diff --git a/webrtc/pc/channel.cc b/webrtc/pc/channel.cc
index 4d47e878b8e2b8c7d7c858c439d822f9d95a4720..3ed352490787e09378caff304bc94640b38804df 100644
--- a/webrtc/pc/channel.cc
+++ b/webrtc/pc/channel.cc
@@ -37,17 +37,44 @@ bool SetRawAudioSink_w(VoiceMediaChannel* channel,
channel->SetRawAudioSink(ssrc, std::move(*sink));
return true;
}
+
+struct SendingPacketMessageData : public rtc::MessageData {
+ rtc::CopyOnWriteBuffer packet;
+ rtc::PacketOptions options;
+};
+
+struct ReceivedPacketMessageData : public rtc::MessageData {
+ rtc::CopyOnWriteBuffer packet;
+ rtc::PacketTime packet_time;
+};
+
+struct ChangeState : public rtc::MessageData {
+ bool send;
+ bool recv;
+};
+
+struct NetworkRouteChanged : public rtc::MessageData {
+ std::string transport_name;
+ rtc::NetworkRoute network_route;
+};
+
} // namespace
enum {
MSG_EARLYMEDIATIMEOUT = 1,
- MSG_RTPPACKET,
- MSG_RTCPPACKET,
+ MSG_SENDING_RTP_PACKET,
+ MSG_SENDING_RTCP_PACKET,
MSG_CHANNEL_ERROR,
MSG_READYTOSENDDATA,
MSG_DATARECEIVED,
MSG_FIRSTPACKETRECEIVED,
MSG_STREAMCLOSEDREMOTELY,
+ MSG_RECEIVED_RTP_PACKET,
+ MSG_RECEIVED_RTCP_PACKET,
+ MSG_CHANGE_STATE,
+ MSG_NOT_READY_TO_SEND,
+ MSG_READY_TO_SEND,
+ MSG_NETWORK_ROUTE_CHANGED,
};
// Value specified in RFC 5764.
@@ -61,11 +88,6 @@ static void SafeSetError(const std::string& message, std::string* error_desc) {
}
}
-struct PacketMessageData : public rtc::MessageData {
- rtc::CopyOnWriteBuffer packet;
- rtc::PacketOptions options;
-};
-
struct VoiceChannelErrorMessageData : public rtc::MessageData {
VoiceChannelErrorMessageData(uint32_t in_ssrc,
VoiceMediaChannel::Error in_error)
@@ -142,12 +164,14 @@ void RtpSendParametersFromMediaDescription(
send_params->max_bandwidth_bps = desc->bandwidth();
}
-BaseChannel::BaseChannel(rtc::Thread* thread,
+BaseChannel::BaseChannel(rtc::Thread* worker_thread,
+ rtc::Thread* network_thread,
MediaChannel* media_channel,
TransportController* transport_controller,
const std::string& content_name,
bool rtcp)
- : worker_thread_(thread),
+ : worker_thread_(worker_thread),
+ network_thread_(network_thread),
transport_controller_(transport_controller),
media_channel_(media_channel),
content_name_(content_name),
@@ -166,6 +190,9 @@ BaseChannel::BaseChannel(rtc::Thread* thread,
secure_required_(false),
rtp_abs_sendtime_extn_id_(-1) {
ASSERT(worker_thread_ == rtc::Thread::Current());
+ if (transport_controller) {
+ RTC_DCHECK_EQ(network_thread, transport_controller->worker_thread());
+ }
LOG(LS_INFO) << "Created channel for " << content_name;
}
@@ -182,49 +209,60 @@ BaseChannel::~BaseChannel() {
delete media_channel_;
// Note that we don't just call set_transport_channel(nullptr) because that
// would call a pure virtual method which we can't do from a destructor.
- if (transport_channel_) {
- DisconnectFromTransportChannel(transport_channel_);
- transport_controller_->DestroyTransportChannel_w(
- transport_name_, cricket::ICE_CANDIDATE_COMPONENT_RTP);
- }
- if (rtcp_transport_channel_) {
- DisconnectFromTransportChannel(rtcp_transport_channel_);
- transport_controller_->DestroyTransportChannel_w(
- transport_name_, cricket::ICE_CANDIDATE_COMPONENT_RTCP);
- }
+ network_thread_->Invoke<void>([this] {
pthatcher1 2016/04/29 23:36:23 It seems risky to have the BaseChannel destructor
pthatcher1 2016/04/29 23:36:23 I don't think we're allowed to invoke lambdas beca
danilchap 2016/05/02 14:50:34 Wasn't sure if storing in sendlist queue was count
danilchap 2016/05/02 14:50:34 Worker can be blocked on network thread, but netwo
pthatcher1 2016/05/11 04:50:01 That's a good point. It's probably worth bringing
pthatcher1 2016/05/11 04:50:01 That's probably the only thing that makes sense.
danilchap 2016/05/11 12:19:16 Expanded comment above BaseChannel definition
+ if (transport_channel_) {
+ DisconnectFromTransportChannel(transport_channel_);
+ transport_controller_->DestroyTransportChannel_w(
+ transport_name_, cricket::ICE_CANDIDATE_COMPONENT_RTP);
+ }
+ if (rtcp_transport_channel_) {
+ DisconnectFromTransportChannel(rtcp_transport_channel_);
+ transport_controller_->DestroyTransportChannel_w(
+ transport_name_, cricket::ICE_CANDIDATE_COMPONENT_RTCP);
+ }
+ network_thread_->Clear(this);
+ });
LOG(LS_INFO) << "Destroyed channel";
}
bool BaseChannel::Init() {
- if (!SetTransport(content_name())) {
- return false;
- }
+ bool setup_transport = network_thread_->Invoke<bool>([this] {
+ if (!SetTransport_n(content_name())) {
+ return false;
+ }
- if (!SetDtlsSrtpCryptoSuites(transport_channel(), false)) {
- return false;
- }
- if (rtcp_transport_enabled() &&
- !SetDtlsSrtpCryptoSuites(rtcp_transport_channel(), true)) {
+ if (!SetDtlsSrtpCryptoSuites(transport_channel(), false)) {
pthatcher1 2016/04/29 23:36:23 I think SetDtlsSrtpCryptoSuites, rtcp_transport_en
danilchap 2016/05/02 14:50:34 Old code access them on worker, not signaling thre
pthatcher1 2016/05/11 04:50:01 Ah, true. I was thrown off by Init(). It should
danilchap 2016/05/11 12:19:16 Init renamed to Init_w, few more function got thre
+ return false;
+ }
+ if (rtcp_transport_enabled() &&
+ !SetDtlsSrtpCryptoSuites(rtcp_transport_channel(), true)) {
+ return false;
+ }
+ return true;
+ });
+ if (!setup_transport) {
return false;
}
// Both RTP and RTCP channels are set, we can call SetInterface on
// media channel and it can set network options.
+ RTC_DCHECK(worker_thread_->IsCurrent());
media_channel_->SetInterface(this);
return true;
}
void BaseChannel::Deinit() {
+ RTC_DCHECK(worker_thread_->IsCurrent());
media_channel_->SetInterface(NULL);
}
bool BaseChannel::SetTransport(const std::string& transport_name) {
- return worker_thread_->Invoke<bool>(
- Bind(&BaseChannel::SetTransport_w, this, transport_name));
+ return network_thread_->Invoke<bool>(
+ Bind(&BaseChannel::SetTransport_n, this, transport_name));
}
-bool BaseChannel::SetTransport_w(const std::string& transport_name) {
- ASSERT(worker_thread_ == rtc::Thread::Current());
+bool BaseChannel::SetTransport_n(const std::string& transport_name) {
+ RTC_DCHECK(network_thread_->IsCurrent());
if (transport_name == transport_name_) {
// Nothing to do if transport name isn't changing
@@ -273,7 +311,7 @@ bool BaseChannel::SetTransport_w(const std::string& transport_name) {
}
void BaseChannel::set_transport_channel(TransportChannel* new_tc) {
- ASSERT(worker_thread_ == rtc::Thread::Current());
+ RTC_DCHECK(network_thread_->IsCurrent());
TransportChannel* old_tc = transport_channel_;
if (!old_tc && !new_tc) {
@@ -299,13 +337,13 @@ void BaseChannel::set_transport_channel(TransportChannel* new_tc) {
// Update aggregate writable/ready-to-send state between RTP and RTCP upon
// setting new channel
- UpdateWritableState_w();
+ UpdateWritableState_n();
SetReadyToSend(false, new_tc && new_tc->writable());
}
void BaseChannel::set_rtcp_transport_channel(TransportChannel* new_tc,
bool update_writablity) {
- ASSERT(worker_thread_ == rtc::Thread::Current());
+ RTC_DCHECK(network_thread_->IsCurrent());
TransportChannel* old_tc = rtcp_transport_channel_;
if (!old_tc && !new_tc) {
@@ -335,13 +373,13 @@ void BaseChannel::set_rtcp_transport_channel(TransportChannel* new_tc,
if (update_writablity) {
// Update aggregate writable/ready-to-send state between RTP and RTCP upon
// setting new channel
- UpdateWritableState_w();
+ UpdateWritableState_n();
SetReadyToSend(true, new_tc && new_tc->writable());
}
}
void BaseChannel::ConnectToTransportChannel(TransportChannel* tc) {
- ASSERT(worker_thread_ == rtc::Thread::Current());
+ RTC_DCHECK(network_thread_->IsCurrent());
tc->SignalWritableState.connect(this, &BaseChannel::OnWritableState);
tc->SignalReadPacket.connect(this, &BaseChannel::OnChannelRead);
@@ -352,7 +390,7 @@ void BaseChannel::ConnectToTransportChannel(TransportChannel* tc) {
}
void BaseChannel::DisconnectFromTransportChannel(TransportChannel* tc) {
- ASSERT(worker_thread_ == rtc::Thread::Current());
+ RTC_DCHECK(network_thread_->IsCurrent());
tc->SignalWritableState.disconnect(this);
tc->SignalReadPacket.disconnect(this);
@@ -405,8 +443,8 @@ void BaseChannel::StartConnectionMonitor(int cms) {
// We pass in the BaseChannel instead of the transport_channel_
// because if the transport_channel_ changes, the ConnectionMonitor
// would be pointing to the wrong TransportChannel.
- connection_monitor_.reset(new ConnectionMonitor(
- this, worker_thread(), rtc::Thread::Current()));
+ connection_monitor_.reset(
+ new ConnectionMonitor(this, network_thread(), rtc::Thread::Current()));
pthatcher1 2016/04/29 23:36:23 We should comment that this is because GetConnecti
danilchap 2016/05/02 14:50:34 Done.
connection_monitor_->SignalUpdate.connect(
this, &BaseChannel::OnConnectionMonitorUpdate);
connection_monitor_->Start(cms);
@@ -420,7 +458,7 @@ void BaseChannel::StopConnectionMonitor() {
}
bool BaseChannel::GetConnectionStats(ConnectionInfos* infos) {
- ASSERT(worker_thread_ == rtc::Thread::Current());
+ RTC_DCHECK(network_thread_->IsCurrent());
return transport_channel_->GetStats(infos);
}
@@ -450,25 +488,29 @@ bool BaseChannel::SendRtcp(rtc::CopyOnWriteBuffer* packet,
int BaseChannel::SetOption(SocketType type, rtc::Socket::Option opt,
int value) {
- TransportChannel* channel = NULL;
- switch (type) {
- case ST_RTP:
- channel = transport_channel_;
- socket_options_.push_back(
- std::pair<rtc::Socket::Option, int>(opt, value));
- break;
- case ST_RTCP:
- channel = rtcp_transport_channel_;
- rtcp_socket_options_.push_back(
- std::pair<rtc::Socket::Option, int>(opt, value));
- break;
- }
- return channel ? channel->SetOption(opt, value) : -1;
+ return network_thread_->Invoke<int>([this, type, opt, value] {
+ TransportChannel* channel = nullptr;
+ switch (type) {
+ case ST_RTP:
+ channel = transport_channel_;
+ socket_options_.push_back(
+ std::pair<rtc::Socket::Option, int>(opt, value));
+ break;
+ case ST_RTCP:
+ channel = rtcp_transport_channel_;
+ rtcp_socket_options_.push_back(
+ std::pair<rtc::Socket::Option, int>(opt, value));
+ break;
+ }
+ return channel ? channel->SetOption(opt, value) : -1;
pthatcher1 2016/04/29 23:36:23 Only this last part should be on the network threa
danilchap 2016/05/02 14:50:34 transport_channels created/destroyed/set on networ
pthatcher1 2016/05/11 04:50:01 Ah, that's a good point. transport_channel_ and r
+ });
}
void BaseChannel::OnWritableState(TransportChannel* channel) {
- ASSERT(channel == transport_channel_ || channel == rtcp_transport_channel_);
- UpdateWritableState_w();
+ RTC_DCHECK(channel == transport_channel_ ||
+ channel == rtcp_transport_channel_);
+ RTC_DCHECK(network_thread_->IsCurrent());
+ UpdateWritableState_n();
}
void BaseChannel::OnChannelRead(TransportChannel* channel,
@@ -477,7 +519,7 @@ void BaseChannel::OnChannelRead(TransportChannel* channel,
int flags) {
TRACE_EVENT0("webrtc", "BaseChannel::OnChannelRead");
// OnChannelRead gets called from P2PSocket; now pass data to MediaEngine
- ASSERT(worker_thread_ == rtc::Thread::Current());
+ RTC_DCHECK(network_thread_->IsCurrent());
// When using RTCP multiplexing we might get RTCP packets on the RTP
// transport. We feed RTP traffic into the demuxer to determine if it is RTCP.
@@ -512,33 +554,33 @@ void BaseChannel::OnSelectedCandidatePairChanged(
CandidatePairInterface* selected_candidate_pair,
int last_sent_packet_id) {
ASSERT(channel == transport_channel_ || channel == rtcp_transport_channel_);
- rtc::NetworkRoute network_route;
+ RTC_DCHECK(network_thread_->IsCurrent());
+ NetworkRouteChanged* message = new NetworkRouteChanged;
if (selected_candidate_pair) {
- network_route = rtc::NetworkRoute(
+ message->network_route = rtc::NetworkRoute(
selected_candidate_pair->local_candidate().network_id(),
selected_candidate_pair->remote_candidate().network_id(),
last_sent_packet_id);
}
- media_channel()->OnNetworkRouteChanged(channel->transport_name(),
- network_route);
+ message->transport_name = channel->transport_name();
+ worker_thread_->Post(this, MSG_NETWORK_ROUTE_CHANGED, message);
pthatcher1 2016/04/29 23:36:23 Can't we do an AsyncInvoke on the worker_thread_ w
danilchap 2016/05/02 14:50:34 Didn't know there was AsyncInvoke. Done.
}
void BaseChannel::SetReadyToSend(bool rtcp, bool ready) {
+ RTC_DCHECK(network_thread_->IsCurrent());
if (rtcp) {
rtcp_ready_to_send_ = ready;
} else {
rtp_ready_to_send_ = ready;
}
- if (rtp_ready_to_send_ &&
- // In the case of rtcp mux |rtcp_transport_channel_| will be null.
- (rtcp_ready_to_send_ || !rtcp_transport_channel_)) {
- // Notify the MediaChannel when both rtp and rtcp channel can send.
- media_channel_->OnReadyToSend(true);
- } else {
- // Notify the MediaChannel when either rtp or rtcp channel can't send.
- media_channel_->OnReadyToSend(false);
- }
+ bool ready_to_send =
+ (rtp_ready_to_send_ &&
+ // In the case of rtcp mux |rtcp_transport_channel_| will be null.
+ (rtcp_ready_to_send_ || !rtcp_transport_channel_));
+
+ worker_thread_->Post(
+ this, ready_to_send ? MSG_READY_TO_SEND : MSG_NOT_READY_TO_SEND);
pthatcher1 2016/04/29 23:36:23 Same here.
danilchap 2016/05/02 14:50:34 Done.
}
bool BaseChannel::PacketIsRtcp(const TransportChannel* channel,
@@ -550,22 +592,23 @@ bool BaseChannel::PacketIsRtcp(const TransportChannel* channel,
bool BaseChannel::SendPacket(bool rtcp,
rtc::CopyOnWriteBuffer* packet,
const rtc::PacketOptions& options) {
- // SendPacket gets called from MediaEngine, typically on an encoder thread.
- // If the thread is not our worker thread, we will post to our worker
- // so that the real work happens on our worker. This avoids us having to
+ // SendPacket gets called from MediaEngine, on a pacer or an encoder thread.
+ // If the thread is not our network thread, we will post to our network
+ // so that the real work happens on our network. This avoids us having to
// synchronize access to all the pieces of the send path, including
// SRTP and the inner workings of the transport channels.
// The only downside is that we can't return a proper failure code if
// needed. Since UDP is unreliable anyway, this should be a non-issue.
- if (rtc::Thread::Current() != worker_thread_) {
+ if (!network_thread_->IsCurrent()) {
// Avoid a copy by transferring the ownership of the packet data.
- int message_id = (!rtcp) ? MSG_RTPPACKET : MSG_RTCPPACKET;
- PacketMessageData* data = new PacketMessageData;
+ int message_id = rtcp ? MSG_SENDING_RTCP_PACKET : MSG_SENDING_RTP_PACKET;
+ SendingPacketMessageData* data = new SendingPacketMessageData;
pthatcher1 2016/04/29 23:36:23 Why is this "Sending" instead of "Send".
danilchap 2016/05/02 14:50:34 For no good reason. Changed.
data->packet = std::move(*packet);
data->options = options;
- worker_thread_->Post(this, message_id, data);
+ network_thread_->Post(this, message_id, data);
return true;
}
+ TRACE_EVENT0("webrtc", "BaseChannel::SendPacket");
// Now that we are on the correct thread, ensure we have a place to send this
// packet before doing anything. (We might get RTCP packets that we don't
@@ -589,6 +632,7 @@ bool BaseChannel::SendPacket(bool rtcp,
updated_options = options;
// Protect if needed.
if (srtp_filter_.IsActive()) {
+ TRACE_EVENT0("webrtc", "SRTP Encode");
bool res;
uint8_t* data = packet->data();
int len = static_cast<int>(packet->size());
@@ -656,9 +700,9 @@ bool BaseChannel::SendPacket(bool rtcp,
}
// Bon voyage.
- int ret =
- channel->SendPacket(packet->data<char>(), packet->size(), updated_options,
- (secure() && secure_dtls()) ? PF_SRTP_BYPASS : 0);
+ int flags = (secure() && secure_dtls()) ? PF_SRTP_BYPASS : PF_NORMAL;
+ int ret = channel->SendPacket(packet->data<char>(), packet->size(),
+ updated_options, flags);
if (ret != static_cast<int>(packet->size())) {
if (channel->GetError() == EWOULDBLOCK) {
LOG(LS_WARNING) << "Got EWOULDBLOCK from socket.";
@@ -687,6 +731,7 @@ bool BaseChannel::WantsPacket(bool rtcp, const rtc::CopyOnWriteBuffer* packet) {
void BaseChannel::HandlePacket(bool rtcp, rtc::CopyOnWriteBuffer* packet,
const rtc::PacketTime& packet_time) {
+ RTC_DCHECK(network_thread_->IsCurrent());
if (!WantsPacket(rtcp, packet)) {
return;
}
@@ -700,6 +745,7 @@ void BaseChannel::HandlePacket(bool rtcp, rtc::CopyOnWriteBuffer* packet,
// Unprotect the packet, if needed.
if (srtp_filter_.IsActive()) {
+ TRACE_EVENT0("webrtc", "SRTP Decode");
char* data = packet->data<char>();
int len = static_cast<int>(packet->size());
bool res;
@@ -743,12 +789,11 @@ void BaseChannel::HandlePacket(bool rtcp, rtc::CopyOnWriteBuffer* packet,
return;
}
- // Push it down to the media channel.
- if (!rtcp) {
- media_channel_->OnPacketReceived(packet, packet_time);
- } else {
- media_channel_->OnRtcpReceived(packet, packet_time);
- }
+ ReceivedPacketMessageData* message_data = new ReceivedPacketMessageData;
+ message_data->packet = std::move(*packet);
+ message_data->packet_time = packet_time;
+ int message_id = rtcp ? MSG_RECEIVED_RTCP_PACKET : MSG_RECEIVED_RTP_PACKET;
+ worker_thread_->Post(this, message_id, message_data);
pthatcher1 2016/04/29 23:36:23 Same here with AsyncInvoke
danilchap 2016/05/02 14:50:34 Done.
}
bool BaseChannel::PushdownLocalDescription(
@@ -786,7 +831,7 @@ void BaseChannel::EnableMedia_w() {
LOG(LS_INFO) << "Channel enabled";
enabled_ = true;
- ChangeState();
+ ChangeState_w();
}
void BaseChannel::DisableMedia_w() {
@@ -796,20 +841,20 @@ void BaseChannel::DisableMedia_w() {
LOG(LS_INFO) << "Channel disabled";
enabled_ = false;
- ChangeState();
+ ChangeState_w();
}
-void BaseChannel::UpdateWritableState_w() {
+void BaseChannel::UpdateWritableState_n() {
if (transport_channel_ && transport_channel_->writable() &&
(!rtcp_transport_channel_ || rtcp_transport_channel_->writable())) {
- ChannelWritable_w();
+ ChannelWritable_n();
} else {
- ChannelNotWritable_w();
+ ChannelNotWritable_n();
}
}
-void BaseChannel::ChannelWritable_w() {
- ASSERT(worker_thread_ == rtc::Thread::Current());
+void BaseChannel::ChannelWritable_n() {
+ RTC_DCHECK(network_thread_->IsCurrent());
if (writable_) {
return;
}
@@ -829,13 +874,15 @@ void BaseChannel::ChannelWritable_w() {
}
was_ever_writable_ = true;
- MaybeSetupDtlsSrtp_w();
+ MaybeSetupDtlsSrtp_n();
writable_ = true;
ChangeState();
}
-void BaseChannel::SignalDtlsSetupFailure_w(bool rtcp) {
- ASSERT(worker_thread() == rtc::Thread::Current());
+void BaseChannel::SignalDtlsSetupFailure_n(bool rtcp) {
+ RTC_DCHECK(network_thread_->IsCurrent());
+ RTC_NOTREACHED();
pthatcher1 2016/04/29 23:36:23 This isn't reached?
danilchap 2016/05/02 14:50:34 In practice it isn't reached. In theory it can be
+ // TODO(danilchap): Not allowed to invoke from network thread. Post instead.
signaling_thread()->Invoke<void>(Bind(
&BaseChannel::SignalDtlsSetupFailure_s, this, rtcp));
}
@@ -864,7 +911,8 @@ bool BaseChannel::ShouldSetupDtlsSrtp() const {
// This function returns true if either DTLS-SRTP is not in use
// *or* DTLS-SRTP is successfully set up.
-bool BaseChannel::SetupDtlsSrtp(bool rtcp_channel) {
+bool BaseChannel::SetupDtlsSrtp_n(bool rtcp_channel) {
+ RTC_DCHECK(network_thread_->IsCurrent());
bool ret = false;
TransportChannel* channel =
@@ -950,7 +998,7 @@ bool BaseChannel::SetupDtlsSrtp(bool rtcp_channel) {
return ret;
}
-void BaseChannel::MaybeSetupDtlsSrtp_w() {
+void BaseChannel::MaybeSetupDtlsSrtp_n() {
if (srtp_filter_.IsActive()) {
return;
}
@@ -959,21 +1007,21 @@ void BaseChannel::MaybeSetupDtlsSrtp_w() {
return;
}
- if (!SetupDtlsSrtp(false)) {
- SignalDtlsSetupFailure_w(false);
+ if (!SetupDtlsSrtp_n(false)) {
+ SignalDtlsSetupFailure_n(false);
return;
}
if (rtcp_transport_channel_) {
- if (!SetupDtlsSrtp(true)) {
- SignalDtlsSetupFailure_w(true);
+ if (!SetupDtlsSrtp_n(true)) {
+ SignalDtlsSetupFailure_n(true);
return;
}
}
}
-void BaseChannel::ChannelNotWritable_w() {
- ASSERT(worker_thread_ == rtc::Thread::Current());
+void BaseChannel::ChannelNotWritable_n() {
+ RTC_DCHECK(network_thread_->IsCurrent());
if (!writable_)
return;
@@ -982,7 +1030,7 @@ void BaseChannel::ChannelNotWritable_w() {
ChangeState();
}
-bool BaseChannel::SetRtpTransportParameters_w(
+bool BaseChannel::SetRtpTransportParameters(
const MediaContentDescription* content,
ContentAction action,
ContentSource src,
@@ -993,19 +1041,22 @@ bool BaseChannel::SetRtpTransportParameters_w(
}
// Cache secure_required_ for belt and suspenders check on SendPacket
- if (src == CS_LOCAL) {
- set_secure_required(content->crypto_required() != CT_NONE);
- }
+ return network_thread_->Invoke<bool>(
+ [this, content, action, src, error_desc] {
+ if (src == CS_LOCAL) {
+ set_secure_required(content->crypto_required() != CT_NONE);
+ }
pthatcher1 2016/04/29 23:36:23 I think set_secure_required needs to be on the sig
danilchap 2016/05/02 14:50:34 Why? secure_required accessor is used in tests onl
- if (!SetSrtp_w(content->cryptos(), action, src, error_desc)) {
- return false;
- }
+ if (!SetSrtp_n(content->cryptos(), action, src, error_desc)) {
+ return false;
+ }
- if (!SetRtcpMux_w(content->rtcp_mux(), action, src, error_desc)) {
- return false;
- }
+ if (!SetRtcpMux_n(content->rtcp_mux(), action, src, error_desc)) {
+ return false;
+ }
- return true;
+ return true;
+ });
}
// |dtls| will be set to true if DTLS is active for transport channel and
@@ -1022,7 +1073,7 @@ bool BaseChannel::CheckSrtpConfig(const std::vector<CryptoParams>& cryptos,
return true;
}
-bool BaseChannel::SetSrtp_w(const std::vector<CryptoParams>& cryptos,
+bool BaseChannel::SetSrtp_n(const std::vector<CryptoParams>& cryptos,
ContentAction action,
ContentSource src,
std::string* error_desc) {
@@ -1070,11 +1121,10 @@ bool BaseChannel::SetSrtp_w(const std::vector<CryptoParams>& cryptos,
}
void BaseChannel::ActivateRtcpMux() {
- worker_thread_->Invoke<void>(Bind(
- &BaseChannel::ActivateRtcpMux_w, this));
+ network_thread_->Invoke<void>(Bind(&BaseChannel::ActivateRtcpMux_n, this));
}
-void BaseChannel::ActivateRtcpMux_w() {
+void BaseChannel::ActivateRtcpMux_n() {
if (!rtcp_mux_filter_.IsActive()) {
rtcp_mux_filter_.SetActive();
set_rtcp_transport_channel(nullptr, true);
@@ -1082,7 +1132,8 @@ void BaseChannel::ActivateRtcpMux_w() {
}
}
-bool BaseChannel::SetRtcpMux_w(bool enable, ContentAction action,
+bool BaseChannel::SetRtcpMux_n(bool enable,
+ ContentAction action,
ContentSource src,
std::string* error_desc) {
bool ret = false;
@@ -1121,7 +1172,7 @@ bool BaseChannel::SetRtcpMux_w(bool enable, ContentAction action,
if (rtcp_mux_filter_.IsActive()) {
// If the RTP transport is already writable, then so are we.
if (transport_channel_->writable()) {
- ChannelWritable_w();
+ ChannelWritable_n();
}
}
@@ -1296,40 +1347,84 @@ void BaseChannel::MaybeCacheRtpAbsSendTimeHeaderExtension(
void BaseChannel::OnMessage(rtc::Message *pmsg) {
TRACE_EVENT0("webrtc", "BaseChannel::OnMessage");
switch (pmsg->message_id) {
- case MSG_RTPPACKET:
- case MSG_RTCPPACKET: {
- PacketMessageData* data = static_cast<PacketMessageData*>(pmsg->pdata);
- SendPacket(pmsg->message_id == MSG_RTCPPACKET, &data->packet,
- data->options);
- delete data; // because it is Posted
+ case MSG_SENDING_RTP_PACKET:
+ case MSG_SENDING_RTCP_PACKET: {
+ RTC_DCHECK(network_thread_->IsCurrent());
+ SendingPacketMessageData* data =
+ static_cast<SendingPacketMessageData*>(pmsg->pdata);
+ bool rtcp = pmsg->message_id == MSG_SENDING_RTCP_PACKET;
+ SendPacket(rtcp, &data->packet, data->options);
+ delete data;
break;
}
case MSG_FIRSTPACKETRECEIVED: {
SignalFirstPacketReceived(this);
break;
}
+ case MSG_RECEIVED_RTP_PACKET: {
+ RTC_DCHECK(worker_thread_->IsCurrent());
+ ReceivedPacketMessageData* data =
+ static_cast<ReceivedPacketMessageData*>(pmsg->pdata);
+ media_channel_->OnPacketReceived(&data->packet, data->packet_time);
+ delete data;
+ break;
+ }
+ case MSG_RECEIVED_RTCP_PACKET: {
+ RTC_DCHECK(worker_thread_->IsCurrent());
+ ReceivedPacketMessageData* data =
+ static_cast<ReceivedPacketMessageData*>(pmsg->pdata);
+ media_channel_->OnRtcpReceived(&data->packet, data->packet_time);
+ delete data;
+ break;
+ }
+ case MSG_CHANGE_STATE: {
+ RTC_DCHECK(worker_thread_->IsCurrent());
+ ChangeState_w();
+ break;
+ }
+ case MSG_READY_TO_SEND: {
+ RTC_DCHECK(worker_thread_->IsCurrent());
+ media_channel_->OnReadyToSend(true);
+ break;
+ }
+ case MSG_NOT_READY_TO_SEND: {
+ RTC_DCHECK(worker_thread_->IsCurrent());
+ media_channel_->OnReadyToSend(false);
+ break;
+ }
+ case MSG_NETWORK_ROUTE_CHANGED: {
+ RTC_DCHECK(worker_thread_->IsCurrent());
+ NetworkRouteChanged* data =
+ static_cast<NetworkRouteChanged*>(pmsg->pdata);
+ media_channel()->OnNetworkRouteChanged(data->transport_name,
+ data->network_route);
+ delete data;
+ break;
+ }
}
}
void BaseChannel::FlushRtcpMessages() {
// Flush all remaining RTCP messages. This should only be called in
// destructor.
- ASSERT(rtc::Thread::Current() == worker_thread_);
- rtc::MessageList rtcp_messages;
- worker_thread_->Clear(this, MSG_RTCPPACKET, &rtcp_messages);
- for (rtc::MessageList::iterator it = rtcp_messages.begin();
- it != rtcp_messages.end(); ++it) {
- worker_thread_->Send(this, MSG_RTCPPACKET, it->pdata);
- }
+ network_thread_->Invoke<void>([this] {
+ rtc::MessageList rtcp_messages;
+ network_thread_->Clear(this, MSG_SENDING_RTCP_PACKET, &rtcp_messages);
+ for (const auto& message : rtcp_messages) {
+ network_thread_->Send(this, MSG_SENDING_RTCP_PACKET, message.pdata);
+ }
+ });
}
-VoiceChannel::VoiceChannel(rtc::Thread* thread,
+VoiceChannel::VoiceChannel(rtc::Thread* worker_thread,
+ rtc::Thread* network_thread,
MediaEngineInterface* media_engine,
VoiceMediaChannel* media_channel,
TransportController* transport_controller,
const std::string& content_name,
bool rtcp)
- : BaseChannel(thread,
+ : BaseChannel(worker_thread,
+ network_thread,
media_channel,
transport_controller,
content_name,
@@ -1487,7 +1582,12 @@ void VoiceChannel::OnChannelRead(TransportChannel* channel,
}
}
-void VoiceChannel::ChangeState() {
+void BaseChannel::ChangeState() {
+ RTC_DCHECK(network_thread_->IsCurrent());
+ worker_thread_->Post(this, MSG_CHANGE_STATE);
+}
+
+void VoiceChannel::ChangeState_w() {
// Render incoming data if we're the active call, and we have the local
// content. We receive data on the default channel and multiplexed streams.
bool recv = IsReadyToReceive();
@@ -1521,7 +1621,7 @@ bool VoiceChannel::SetLocalContent_w(const MediaContentDescription* content,
return false;
}
- if (!SetRtpTransportParameters_w(content, action, CS_LOCAL, error_desc)) {
+ if (!SetRtpTransportParameters(content, action, CS_LOCAL, error_desc)) {
return false;
}
@@ -1547,7 +1647,7 @@ bool VoiceChannel::SetLocalContent_w(const MediaContentDescription* content,
}
set_local_content_direction(content->direction());
- ChangeState();
+ ChangeState_w();
return true;
}
@@ -1566,7 +1666,7 @@ bool VoiceChannel::SetRemoteContent_w(const MediaContentDescription* content,
return false;
}
- if (!SetRtpTransportParameters_w(content, action, CS_REMOTE, error_desc)) {
+ if (!SetRtpTransportParameters(content, action, CS_REMOTE, error_desc)) {
return false;
}
@@ -1598,7 +1698,7 @@ bool VoiceChannel::SetRemoteContent_w(const MediaContentDescription* content,
}
set_remote_content_direction(content->direction());
- ChangeState();
+ ChangeState_w();
return true;
}
@@ -1656,12 +1756,14 @@ void VoiceChannel::GetSrtpCryptoSuites(std::vector<int>* crypto_suites) const {
GetSupportedAudioCryptoSuites(crypto_suites);
}
-VideoChannel::VideoChannel(rtc::Thread* thread,
+VideoChannel::VideoChannel(rtc::Thread* worker_thread,
+ rtc::Thread* network_thread,
VideoMediaChannel* media_channel,
TransportController* transport_controller,
const std::string& content_name,
bool rtcp)
- : BaseChannel(thread,
+ : BaseChannel(worker_thread,
+ network_thread,
media_channel,
transport_controller,
content_name,
@@ -1723,7 +1825,8 @@ bool VideoChannel::SetRtpParameters_w(uint32_t ssrc,
webrtc::RtpParameters parameters) {
return media_channel()->SetRtpParameters(ssrc, parameters);
}
-void VideoChannel::ChangeState() {
+
+void VideoChannel::ChangeState_w() {
// Send outgoing data if we're the active call, we have the remote content,
// and we have had some form of connectivity.
bool send = IsReadyToSend();
@@ -1775,7 +1878,7 @@ bool VideoChannel::SetLocalContent_w(const MediaContentDescription* content,
return false;
}
- if (!SetRtpTransportParameters_w(content, action, CS_LOCAL, error_desc)) {
+ if (!SetRtpTransportParameters(content, action, CS_LOCAL, error_desc)) {
return false;
}
@@ -1801,7 +1904,7 @@ bool VideoChannel::SetLocalContent_w(const MediaContentDescription* content,
}
set_local_content_direction(content->direction());
- ChangeState();
+ ChangeState_w();
return true;
}
@@ -1820,8 +1923,7 @@ bool VideoChannel::SetRemoteContent_w(const MediaContentDescription* content,
return false;
}
-
- if (!SetRtpTransportParameters_w(content, action, CS_REMOTE, error_desc)) {
+ if (!SetRtpTransportParameters(content, action, CS_REMOTE, error_desc)) {
return false;
}
@@ -1854,7 +1956,7 @@ bool VideoChannel::SetRemoteContent_w(const MediaContentDescription* content,
}
set_remote_content_direction(content->direction());
- ChangeState();
+ ChangeState_w();
return true;
}
@@ -1889,12 +1991,14 @@ void VideoChannel::GetSrtpCryptoSuites(std::vector<int>* crypto_suites) const {
GetSupportedVideoCryptoSuites(crypto_suites);
}
-DataChannel::DataChannel(rtc::Thread* thread,
+DataChannel::DataChannel(rtc::Thread* worker_thread,
+ rtc::Thread* network_thread,
DataMediaChannel* media_channel,
TransportController* transport_controller,
const std::string& content_name,
bool rtcp)
- : BaseChannel(thread,
+ : BaseChannel(worker_thread,
+ network_thread,
media_channel,
transport_controller,
content_name,
@@ -1998,7 +2102,7 @@ bool DataChannel::SetLocalContent_w(const MediaContentDescription* content,
}
if (data_channel_type_ == DCT_RTP) {
- if (!SetRtpTransportParameters_w(content, action, CS_LOCAL, error_desc)) {
+ if (!SetRtpTransportParameters(content, action, CS_LOCAL, error_desc)) {
return false;
}
}
@@ -2030,7 +2134,7 @@ bool DataChannel::SetLocalContent_w(const MediaContentDescription* content,
}
set_local_content_direction(content->direction());
- ChangeState();
+ ChangeState_w();
return true;
}
@@ -2060,7 +2164,7 @@ bool DataChannel::SetRemoteContent_w(const MediaContentDescription* content,
LOG(LS_INFO) << "Setting remote data description";
if (data_channel_type_ == DCT_RTP &&
- !SetRtpTransportParameters_w(content, action, CS_REMOTE, error_desc)) {
+ !SetRtpTransportParameters(content, action, CS_REMOTE, error_desc)) {
return false;
}
@@ -2085,11 +2189,11 @@ bool DataChannel::SetRemoteContent_w(const MediaContentDescription* content,
}
set_remote_content_direction(content->direction());
- ChangeState();
+ ChangeState_w();
return true;
}
-void DataChannel::ChangeState() {
+void DataChannel::ChangeState_w() {
// Render incoming data if we're the active call, and we have the local
// content. We receive data on the default channel and multiplexed streams.
bool recv = IsReadyToReceive();
« no previous file with comments | « webrtc/pc/channel.h ('k') | webrtc/pc/channel_unittest.cc » ('j') | webrtc/pc/channelmanager.h » ('J')

Powered by Google App Engine
This is Rietveld 408576698