 Chromium Code Reviews
 Chromium Code Reviews Issue 2867943003:
  New class RtpDemuxer and RtpPacketSinkInterface, use in Call.  (Closed)
    
  
    Issue 2867943003:
  New class RtpDemuxer and RtpPacketSinkInterface, use in Call.  (Closed) 
  | Index: webrtc/call/call.cc | 
| diff --git a/webrtc/call/call.cc b/webrtc/call/call.cc | 
| index 0e64269d4e4c70391a00be65da2440778bb14197..6328c15f7d3fff5c0e2e8521ac8a0c9bd622bb5b 100644 | 
| --- a/webrtc/call/call.cc | 
| +++ b/webrtc/call/call.cc | 
| @@ -33,6 +33,7 @@ | 
| #include "webrtc/call/bitrate_allocator.h" | 
| #include "webrtc/call/call.h" | 
| #include "webrtc/call/flexfec_receive_stream_impl.h" | 
| +#include "webrtc/call/rtp_demuxer.h" | 
| #include "webrtc/call/rtp_transport_controller_send.h" | 
| #include "webrtc/config.h" | 
| #include "webrtc/logging/rtc_event_log/rtc_event_log.h" | 
| @@ -203,23 +204,19 @@ class Call : public webrtc::Call, | 
| std::unique_ptr<RWLockWrapper> receive_crit_; | 
| // Audio, Video, and FlexFEC receive streams are owned by the client that | 
| // creates them. | 
| - std::map<uint32_t, AudioReceiveStream*> audio_receive_ssrcs_ | 
| - GUARDED_BY(receive_crit_); | 
| - std::map<uint32_t, VideoReceiveStream*> video_receive_ssrcs_ | 
| + std::set<AudioReceiveStream*> audio_receive_streams_ | 
| GUARDED_BY(receive_crit_); | 
| std::set<VideoReceiveStream*> video_receive_streams_ | 
| GUARDED_BY(receive_crit_); | 
| - // Each media stream could conceivably be protected by multiple FlexFEC | 
| - // streams. | 
| - std::multimap<uint32_t, FlexfecReceiveStreamImpl*> | 
| - flexfec_receive_ssrcs_media_ GUARDED_BY(receive_crit_); | 
| - std::map<uint32_t, FlexfecReceiveStreamImpl*> | 
| - flexfec_receive_ssrcs_protection_ GUARDED_BY(receive_crit_); | 
| - std::set<FlexfecReceiveStreamImpl*> flexfec_receive_streams_ | 
| - GUARDED_BY(receive_crit_); | 
| + | 
| std::map<std::string, AudioReceiveStream*> sync_stream_mapping_ | 
| GUARDED_BY(receive_crit_); | 
| + // TODO(nisse): Should eventually be part of injected | 
| + // RtpTransportControllerReceive. | 
| + RtpDemuxer audio_rtp_demuxer_ GUARDED_BY(receive_crit_); | 
| + RtpDemuxer video_rtp_demuxer_ GUARDED_BY(receive_crit_); | 
| + | 
| // This extra map is used for receive processing which is | 
| // independent of media type. | 
| @@ -371,8 +368,7 @@ Call::~Call() { | 
| RTC_CHECK(audio_send_ssrcs_.empty()); | 
| RTC_CHECK(video_send_ssrcs_.empty()); | 
| RTC_CHECK(video_send_streams_.empty()); | 
| - RTC_CHECK(audio_receive_ssrcs_.empty()); | 
| - RTC_CHECK(video_receive_ssrcs_.empty()); | 
| + RTC_CHECK(audio_receive_streams_.empty()); | 
| RTC_CHECK(video_receive_streams_.empty()); | 
| pacer_thread_->Stop(); | 
| @@ -514,9 +510,9 @@ webrtc::AudioSendStream* Call::CreateAudioSendStream( | 
| } | 
| { | 
| ReadLockScoped read_lock(*receive_crit_); | 
| - for (const auto& kv : audio_receive_ssrcs_) { | 
| - if (kv.second->config().rtp.local_ssrc == config.rtp.ssrc) { | 
| - kv.second->AssociateSendStream(send_stream); | 
| + for (AudioReceiveStream* stream : audio_receive_streams_) { | 
| + if (stream->config().rtp.local_ssrc == config.rtp.ssrc) { | 
| + stream->AssociateSendStream(send_stream); | 
| } | 
| } | 
| } | 
| @@ -542,9 +538,9 @@ void Call::DestroyAudioSendStream(webrtc::AudioSendStream* send_stream) { | 
| } | 
| { | 
| ReadLockScoped read_lock(*receive_crit_); | 
| - for (const auto& kv : audio_receive_ssrcs_) { | 
| - if (kv.second->config().rtp.local_ssrc == ssrc) { | 
| - kv.second->AssociateSendStream(nullptr); | 
| + for (AudioReceiveStream* stream : audio_receive_streams_) { | 
| + if (stream->config().rtp.local_ssrc == ssrc) { | 
| + stream->AssociateSendStream(nullptr); | 
| } | 
| } | 
| } | 
| @@ -562,11 +558,10 @@ webrtc::AudioReceiveStream* Call::CreateAudioReceiveStream( | 
| config_.audio_state, event_log_); | 
| { | 
| WriteLockScoped write_lock(*receive_crit_); | 
| - RTC_DCHECK(audio_receive_ssrcs_.find(config.rtp.remote_ssrc) == | 
| - audio_receive_ssrcs_.end()); | 
| - audio_receive_ssrcs_[config.rtp.remote_ssrc] = receive_stream; | 
| + audio_rtp_demuxer_.AddSink(config.rtp.remote_ssrc, receive_stream); | 
| receive_rtp_config_[config.rtp.remote_ssrc] = | 
| ReceiveRtpConfig(config.rtp.extensions, UseSendSideBwe(config)); | 
| + audio_receive_streams_.insert(receive_stream); | 
| ConfigureSync(config.sync_group); | 
| } | 
| @@ -595,8 +590,9 @@ void Call::DestroyAudioReceiveStream( | 
| uint32_t ssrc = config.rtp.remote_ssrc; | 
| receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config)) | 
| ->RemoveStream(ssrc); | 
| - size_t num_deleted = audio_receive_ssrcs_.erase(ssrc); | 
| + size_t num_deleted = audio_rtp_demuxer_.RemoveSink(audio_receive_stream); | 
| RTC_DCHECK(num_deleted == 1); | 
| + audio_receive_streams_.erase(audio_receive_stream); | 
| const std::string& sync_group = audio_receive_stream->config().sync_group; | 
| const auto it = sync_stream_mapping_.find(sync_group); | 
| if (it != sync_stream_mapping_.end() && | 
| @@ -693,11 +689,9 @@ webrtc::VideoReceiveStream* Call::CreateVideoReceiveStream( | 
| UseSendSideBwe(config)); | 
| { | 
| WriteLockScoped write_lock(*receive_crit_); | 
| - RTC_DCHECK(video_receive_ssrcs_.find(config.rtp.remote_ssrc) == | 
| - video_receive_ssrcs_.end()); | 
| - video_receive_ssrcs_[config.rtp.remote_ssrc] = receive_stream; | 
| + video_rtp_demuxer_.AddSink(config.rtp.remote_ssrc, receive_stream); | 
| if (config.rtp.rtx_ssrc) { | 
| - video_receive_ssrcs_[config.rtp.rtx_ssrc] = receive_stream; | 
| + video_rtp_demuxer_.AddSink(config.rtp.rtx_ssrc, receive_stream); | 
| // We record identical config for the rtx stream as for the main | 
| // stream. Since the transport_send_cc negotiation is per payload | 
| // type, we may get an incorrect value for the rtx stream, but | 
| @@ -719,28 +713,22 @@ void Call::DestroyVideoReceiveStream( | 
| TRACE_EVENT0("webrtc", "Call::DestroyVideoReceiveStream"); | 
| RTC_DCHECK(configuration_thread_checker_.CalledOnValidThread()); | 
| RTC_DCHECK(receive_stream != nullptr); | 
| - VideoReceiveStream* receive_stream_impl = nullptr; | 
| + VideoReceiveStream* receive_stream_impl = | 
| + static_cast<VideoReceiveStream*>(receive_stream); | 
| + const VideoReceiveStream::Config& config = receive_stream_impl->config(); | 
| { | 
| WriteLockScoped write_lock(*receive_crit_); | 
| // Remove all ssrcs pointing to a receive stream. As RTX retransmits on a | 
| // separate SSRC there can be either one or two. | 
| - auto it = video_receive_ssrcs_.begin(); | 
| - while (it != video_receive_ssrcs_.end()) { | 
| - if (it->second == static_cast<VideoReceiveStream*>(receive_stream)) { | 
| - if (receive_stream_impl != nullptr) | 
| - RTC_DCHECK(receive_stream_impl == it->second); | 
| - receive_stream_impl = it->second; | 
| - receive_rtp_config_.erase(it->first); | 
| - it = video_receive_ssrcs_.erase(it); | 
| - } else { | 
| - ++it; | 
| - } | 
| + size_t num_deleted = video_rtp_demuxer_.RemoveSink(receive_stream_impl); | 
| + RTC_DCHECK_GE(num_deleted, 1); | 
| + receive_rtp_config_.erase(config.rtp.remote_ssrc); | 
| + if (config.rtp.rtx_ssrc) { | 
| + receive_rtp_config_.erase(config.rtp.rtx_ssrc); | 
| } | 
| video_receive_streams_.erase(receive_stream_impl); | 
| - RTC_CHECK(receive_stream_impl != nullptr); | 
| - ConfigureSync(receive_stream_impl->config().sync_group); | 
| + ConfigureSync(config.sync_group); | 
| } | 
| - const VideoReceiveStream::Config& config = receive_stream_impl->config(); | 
| receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config)) | 
| ->RemoveStream(config.rtp.remote_ssrc); | 
| @@ -761,17 +749,10 @@ FlexfecReceiveStream* Call::CreateFlexfecReceiveStream( | 
| { | 
| WriteLockScoped write_lock(*receive_crit_); | 
| - | 
| - RTC_DCHECK(flexfec_receive_streams_.find(receive_stream) == | 
| - flexfec_receive_streams_.end()); | 
| - flexfec_receive_streams_.insert(receive_stream); | 
| + video_rtp_demuxer_.AddSink(config.remote_ssrc, receive_stream); | 
| for (auto ssrc : config.protected_media_ssrcs) | 
| - flexfec_receive_ssrcs_media_.insert(std::make_pair(ssrc, receive_stream)); | 
| - | 
| - RTC_DCHECK(flexfec_receive_ssrcs_protection_.find(config.remote_ssrc) == | 
| - flexfec_receive_ssrcs_protection_.end()); | 
| - flexfec_receive_ssrcs_protection_[config.remote_ssrc] = receive_stream; | 
| + video_rtp_demuxer_.AddSink(ssrc, receive_stream); | 
| RTC_DCHECK(receive_rtp_config_.find(config.remote_ssrc) == | 
| receive_rtp_config_.end()); | 
| @@ -803,25 +784,9 @@ void Call::DestroyFlexfecReceiveStream(FlexfecReceiveStream* receive_stream) { | 
| // Remove all SSRCs pointing to the FlexfecReceiveStreamImpl to be | 
| // destroyed. | 
| - auto prot_it = flexfec_receive_ssrcs_protection_.begin(); | 
| - while (prot_it != flexfec_receive_ssrcs_protection_.end()) { | 
| - if (prot_it->second == receive_stream_impl) | 
| - prot_it = flexfec_receive_ssrcs_protection_.erase(prot_it); | 
| - else | 
| - ++prot_it; | 
| - } | 
| - auto media_it = flexfec_receive_ssrcs_media_.begin(); | 
| - while (media_it != flexfec_receive_ssrcs_media_.end()) { | 
| - if (media_it->second == receive_stream_impl) | 
| - media_it = flexfec_receive_ssrcs_media_.erase(media_it); | 
| - else | 
| - ++media_it; | 
| - } | 
| - | 
| + video_rtp_demuxer_.RemoveSink(receive_stream_impl); | 
| receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config)) | 
| ->RemoveStream(ssrc); | 
| - | 
| - flexfec_receive_streams_.erase(receive_stream_impl); | 
| } | 
| delete receive_stream_impl; | 
| @@ -908,11 +873,11 @@ void Call::SignalChannelNetworkState(MediaType media, NetworkState state) { | 
| } | 
| { | 
| ReadLockScoped read_lock(*receive_crit_); | 
| - for (auto& kv : audio_receive_ssrcs_) { | 
| - kv.second->SignalNetworkState(audio_network_state_); | 
| + for (AudioReceiveStream* audio_receive_stream : audio_receive_streams_) { | 
| + audio_receive_stream->SignalNetworkState(audio_network_state_); | 
| } | 
| - for (auto& kv : video_receive_ssrcs_) { | 
| - kv.second->SignalNetworkState(video_network_state_); | 
| + for (VideoReceiveStream* video_receive_stream : video_receive_streams_) { | 
| + video_receive_stream->SignalNetworkState(video_network_state_); | 
| } | 
| } | 
| } | 
| @@ -994,9 +959,9 @@ void Call::UpdateAggregateNetworkState() { | 
| } | 
| { | 
| ReadLockScoped read_lock(*receive_crit_); | 
| - if (audio_receive_ssrcs_.size() > 0) | 
| + if (audio_receive_streams_.size() > 0) | 
| have_audio = true; | 
| - if (video_receive_ssrcs_.size() > 0) | 
| + if (video_receive_streams_.size() > 0) | 
| have_video = true; | 
| } | 
| @@ -1087,15 +1052,15 @@ void Call::ConfigureSync(const std::string& sync_group) { | 
| sync_audio_stream = it->second; | 
| } else { | 
| // No configured audio stream, see if we can find one. | 
| - for (const auto& kv : audio_receive_ssrcs_) { | 
| - if (kv.second->config().sync_group == sync_group) { | 
| + for (AudioReceiveStream* stream : audio_receive_streams_) { | 
| + if (stream->config().sync_group == sync_group) { | 
| if (sync_audio_stream != nullptr) { | 
| LOG(LS_WARNING) << "Attempting to sync more than one audio stream " | 
| "within the same sync group. This is not " | 
| "supported in the current implementation."; | 
| break; | 
| } | 
| - sync_audio_stream = kv.second; | 
| + sync_audio_stream = stream; | 
| } | 
| } | 
| } | 
| @@ -1145,8 +1110,8 @@ PacketReceiver::DeliveryStatus Call::DeliverRtcp(MediaType media_type, | 
| } | 
| if (media_type == MediaType::ANY || media_type == MediaType::AUDIO) { | 
| ReadLockScoped read_lock(*receive_crit_); | 
| - for (auto& kv : audio_receive_ssrcs_) { | 
| - if (kv.second->DeliverRtcp(packet, length)) | 
| + for (AudioReceiveStream* stream : audio_receive_streams_) { | 
| + if (stream->DeliverRtcp(packet, length)) | 
| rtcp_delivered = true; | 
| } | 
| } | 
| @@ -1190,41 +1155,17 @@ PacketReceiver::DeliveryStatus Call::DeliverRtp(MediaType media_type, | 
| NotifyBweOfReceivedPacket(*parsed_packet, media_type); | 
| - uint32_t ssrc = parsed_packet->Ssrc(); | 
| - | 
| if (media_type == MediaType::AUDIO) { | 
| - auto it = audio_receive_ssrcs_.find(ssrc); | 
| - if (it != audio_receive_ssrcs_.end()) { | 
| + if (audio_rtp_demuxer_.OnRtpPacket(*parsed_packet)) { | 
| received_bytes_per_second_counter_.Add(static_cast<int>(length)); | 
| received_audio_bytes_per_second_counter_.Add(static_cast<int>(length)); | 
| - it->second->OnRtpPacket(*parsed_packet); | 
| event_log_->LogRtpHeader(kIncomingPacket, media_type, packet, length); | 
| return DELIVERY_OK; | 
| } | 
| - } | 
| - if (media_type == MediaType::VIDEO) { | 
| - auto it = video_receive_ssrcs_.find(ssrc); | 
| - if (it != video_receive_ssrcs_.end()) { | 
| + } else if (media_type == MediaType::VIDEO) { | 
| + if (video_rtp_demuxer_.OnRtpPacket(*parsed_packet)) { | 
| received_bytes_per_second_counter_.Add(static_cast<int>(length)); | 
| received_video_bytes_per_second_counter_.Add(static_cast<int>(length)); | 
| - it->second->OnRtpPacket(*parsed_packet); | 
| - | 
| - // Deliver media packets to FlexFEC subsystem. | 
| - auto it_bounds = flexfec_receive_ssrcs_media_.equal_range(ssrc); | 
| - for (auto it = it_bounds.first; it != it_bounds.second; ++it) | 
| - it->second->OnRtpPacket(*parsed_packet); | 
| - | 
| - event_log_->LogRtpHeader(kIncomingPacket, media_type, packet, length); | 
| - return DELIVERY_OK; | 
| - } | 
| - } | 
| - if (media_type == MediaType::VIDEO) { | 
| - received_bytes_per_second_counter_.Add(static_cast<int>(length)); | 
| - // TODO(brandtr): Update here when FlexFEC supports protecting audio. | 
| - received_video_bytes_per_second_counter_.Add(static_cast<int>(length)); | 
| - auto it = flexfec_receive_ssrcs_protection_.find(ssrc); | 
| - if (it != flexfec_receive_ssrcs_protection_.end()) { | 
| - it->second->OnRtpPacket(*parsed_packet); | 
| event_log_->LogRtpHeader(kIncomingPacket, media_type, packet, length); | 
| return DELIVERY_OK; | 
| } | 
| @@ -1250,12 +1191,19 @@ PacketReceiver::DeliveryStatus Call::DeliverPacket( | 
| // TODO(brandtr): Update this member function when we support protecting | 
| // audio packets with FlexFEC. | 
| bool Call::OnRecoveredPacket(const uint8_t* packet, size_t length) { | 
| +#if 0 | 
| uint32_t ssrc = ByteReader<uint32_t>::ReadBigEndian(&packet[8]); | 
| ReadLockScoped read_lock(*receive_crit_); | 
| auto it = video_receive_ssrcs_.find(ssrc); | 
| if (it == video_receive_ssrcs_.end()) | 
| return false; | 
| return it->second->OnRecoveredPacket(packet, length); | 
| +#else | 
| + // TODO(nisse): How should we handle this? It might make sense to | 
| + // parse packets here, add a "recovered" flag to RtpPacketReceived, | 
| + // and then just pass it on to video_rtp_demuxer_.OnRtpPacket? | 
| 
Taylor Brandstetter
2017/05/10 20:59:43
That makes sense. Or just have an "OnRecoveredRtpP
 | 
| + return false; | 
| +#endif | 
| } | 
| void Call::NotifyBweOfReceivedPacket(const RtpPacketReceived& packet, |