Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(96)

Side by Side Diff: webrtc/call/call.cc

Issue 2867943003: New class RtpDemuxer and RtpPacketSinkInterface, use in Call. (Closed)
Patch Set: Fix Flexfec recursion issue in a different way. Created 3 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 /* 1 /*
2 * Copyright (c) 2013 The WebRTC project authors. All Rights Reserved. 2 * Copyright (c) 2013 The WebRTC project authors. All Rights Reserved.
3 * 3 *
4 * Use of this source code is governed by a BSD-style license 4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source 5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found 6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may 7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree. 8 * be found in the AUTHORS file in the root of the source tree.
9 */ 9 */
10 10
(...skipping 16 matching lines...) Expand all
27 #include "webrtc/base/logging.h" 27 #include "webrtc/base/logging.h"
28 #include "webrtc/base/optional.h" 28 #include "webrtc/base/optional.h"
29 #include "webrtc/base/ptr_util.h" 29 #include "webrtc/base/ptr_util.h"
30 #include "webrtc/base/task_queue.h" 30 #include "webrtc/base/task_queue.h"
31 #include "webrtc/base/thread_annotations.h" 31 #include "webrtc/base/thread_annotations.h"
32 #include "webrtc/base/thread_checker.h" 32 #include "webrtc/base/thread_checker.h"
33 #include "webrtc/base/trace_event.h" 33 #include "webrtc/base/trace_event.h"
34 #include "webrtc/call/bitrate_allocator.h" 34 #include "webrtc/call/bitrate_allocator.h"
35 #include "webrtc/call/call.h" 35 #include "webrtc/call/call.h"
36 #include "webrtc/call/flexfec_receive_stream_impl.h" 36 #include "webrtc/call/flexfec_receive_stream_impl.h"
37 #include "webrtc/call/rtp_demuxer.h"
37 #include "webrtc/call/rtp_transport_controller_send.h" 38 #include "webrtc/call/rtp_transport_controller_send.h"
38 #include "webrtc/config.h" 39 #include "webrtc/config.h"
39 #include "webrtc/logging/rtc_event_log/rtc_event_log.h" 40 #include "webrtc/logging/rtc_event_log/rtc_event_log.h"
40 #include "webrtc/modules/bitrate_controller/include/bitrate_controller.h" 41 #include "webrtc/modules/bitrate_controller/include/bitrate_controller.h"
41 #include "webrtc/modules/congestion_controller/include/receive_side_congestion_c ontroller.h" 42 #include "webrtc/modules/congestion_controller/include/receive_side_congestion_c ontroller.h"
42 #include "webrtc/modules/pacing/paced_sender.h" 43 #include "webrtc/modules/pacing/paced_sender.h"
43 #include "webrtc/modules/rtp_rtcp/include/flexfec_receiver.h" 44 #include "webrtc/modules/rtp_rtcp/include/flexfec_receiver.h"
44 #include "webrtc/modules/rtp_rtcp/include/rtp_header_parser.h" 45 #include "webrtc/modules/rtp_rtcp/include/rtp_header_parser.h"
45 #include "webrtc/modules/rtp_rtcp/source/byte_io.h" 46 #include "webrtc/modules/rtp_rtcp/source/byte_io.h"
46 #include "webrtc/modules/rtp_rtcp/source/rtp_header_extension.h" 47 #include "webrtc/modules/rtp_rtcp/source/rtp_header_extension.h"
(...skipping 150 matching lines...) Expand 10 before | Expand all | Expand 10 after
197 const std::unique_ptr<BitrateAllocator> bitrate_allocator_; 198 const std::unique_ptr<BitrateAllocator> bitrate_allocator_;
198 Call::Config config_; 199 Call::Config config_;
199 rtc::ThreadChecker configuration_thread_checker_; 200 rtc::ThreadChecker configuration_thread_checker_;
200 201
201 NetworkState audio_network_state_; 202 NetworkState audio_network_state_;
202 NetworkState video_network_state_; 203 NetworkState video_network_state_;
203 204
204 std::unique_ptr<RWLockWrapper> receive_crit_; 205 std::unique_ptr<RWLockWrapper> receive_crit_;
205 // Audio, Video, and FlexFEC receive streams are owned by the client that 206 // Audio, Video, and FlexFEC receive streams are owned by the client that
206 // creates them. 207 // creates them.
207 std::map<uint32_t, AudioReceiveStream*> audio_receive_ssrcs_ 208 std::set<AudioReceiveStream*> audio_receive_streams_
208 GUARDED_BY(receive_crit_);
209 std::map<uint32_t, VideoReceiveStream*> video_receive_ssrcs_
210 GUARDED_BY(receive_crit_); 209 GUARDED_BY(receive_crit_);
211 std::set<VideoReceiveStream*> video_receive_streams_ 210 std::set<VideoReceiveStream*> video_receive_streams_
212 GUARDED_BY(receive_crit_); 211 GUARDED_BY(receive_crit_);
213 // Each media stream could conceivably be protected by multiple FlexFEC 212
214 // streams.
215 std::multimap<uint32_t, FlexfecReceiveStreamImpl*>
216 flexfec_receive_ssrcs_media_ GUARDED_BY(receive_crit_);
217 std::map<uint32_t, FlexfecReceiveStreamImpl*>
218 flexfec_receive_ssrcs_protection_ GUARDED_BY(receive_crit_);
219 std::set<FlexfecReceiveStreamImpl*> flexfec_receive_streams_
220 GUARDED_BY(receive_crit_);
221 std::map<std::string, AudioReceiveStream*> sync_stream_mapping_ 213 std::map<std::string, AudioReceiveStream*> sync_stream_mapping_
222 GUARDED_BY(receive_crit_); 214 GUARDED_BY(receive_crit_);
223 215
216 // TODO(nisse): Should eventually be part of injected
217 // RtpTransportControllerReceive, with a single demuxer in the bundled case.
218 RtpDemuxer audio_rtp_demuxer_ GUARDED_BY(receive_crit_);
219 RtpDemuxer video_rtp_demuxer_ GUARDED_BY(receive_crit_);
220
224 // This extra map is used for receive processing which is 221 // This extra map is used for receive processing which is
225 // independent of media type. 222 // independent of media type.
226 223
227 // TODO(nisse): In the RTP transport refactoring, we should have a 224 // TODO(nisse): In the RTP transport refactoring, we should have a
228 // single mapping from ssrc to a more abstract receive stream, with 225 // single mapping from ssrc to a more abstract receive stream, with
229 // accessor methods for all configuration we need at this level. 226 // accessor methods for all configuration we need at this level.
230 struct ReceiveRtpConfig { 227 struct ReceiveRtpConfig {
231 ReceiveRtpConfig() = default; // Needed by std::map 228 ReceiveRtpConfig() = default; // Needed by std::map
232 ReceiveRtpConfig(const std::vector<RtpExtension>& extensions, 229 ReceiveRtpConfig(const std::vector<RtpExtension>& extensions,
233 bool use_send_side_bwe) 230 bool use_send_side_bwe)
(...skipping 136 matching lines...) Expand 10 before | Expand all | Expand 10 after
370 367
371 pacer_thread_->Start(); 368 pacer_thread_->Start();
372 } 369 }
373 370
374 Call::~Call() { 371 Call::~Call() {
375 RTC_DCHECK(configuration_thread_checker_.CalledOnValidThread()); 372 RTC_DCHECK(configuration_thread_checker_.CalledOnValidThread());
376 373
377 RTC_CHECK(audio_send_ssrcs_.empty()); 374 RTC_CHECK(audio_send_ssrcs_.empty());
378 RTC_CHECK(video_send_ssrcs_.empty()); 375 RTC_CHECK(video_send_ssrcs_.empty());
379 RTC_CHECK(video_send_streams_.empty()); 376 RTC_CHECK(video_send_streams_.empty());
380 RTC_CHECK(audio_receive_ssrcs_.empty()); 377 RTC_CHECK(audio_receive_streams_.empty());
381 RTC_CHECK(video_receive_ssrcs_.empty());
382 RTC_CHECK(video_receive_streams_.empty()); 378 RTC_CHECK(video_receive_streams_.empty());
383 379
384 pacer_thread_->Stop(); 380 pacer_thread_->Stop();
385 pacer_thread_->DeRegisterModule(transport_send_->send_side_cc()->pacer()); 381 pacer_thread_->DeRegisterModule(transport_send_->send_side_cc()->pacer());
386 pacer_thread_->DeRegisterModule( 382 pacer_thread_->DeRegisterModule(
387 receive_side_cc_.GetRemoteBitrateEstimator(true)); 383 receive_side_cc_.GetRemoteBitrateEstimator(true));
388 module_process_thread_->DeRegisterModule(transport_send_->send_side_cc()); 384 module_process_thread_->DeRegisterModule(transport_send_->send_side_cc());
389 module_process_thread_->DeRegisterModule(&receive_side_cc_); 385 module_process_thread_->DeRegisterModule(&receive_side_cc_);
390 module_process_thread_->DeRegisterModule(call_stats_.get()); 386 module_process_thread_->DeRegisterModule(call_stats_.get());
391 module_process_thread_->Stop(); 387 module_process_thread_->Stop();
(...skipping 121 matching lines...) Expand 10 before | Expand all | Expand 10 after
513 config, config_.audio_state, &worker_queue_, transport_send_.get(), 509 config, config_.audio_state, &worker_queue_, transport_send_.get(),
514 bitrate_allocator_.get(), event_log_, call_stats_->rtcp_rtt_stats()); 510 bitrate_allocator_.get(), event_log_, call_stats_->rtcp_rtt_stats());
515 { 511 {
516 WriteLockScoped write_lock(*send_crit_); 512 WriteLockScoped write_lock(*send_crit_);
517 RTC_DCHECK(audio_send_ssrcs_.find(config.rtp.ssrc) == 513 RTC_DCHECK(audio_send_ssrcs_.find(config.rtp.ssrc) ==
518 audio_send_ssrcs_.end()); 514 audio_send_ssrcs_.end());
519 audio_send_ssrcs_[config.rtp.ssrc] = send_stream; 515 audio_send_ssrcs_[config.rtp.ssrc] = send_stream;
520 } 516 }
521 { 517 {
522 ReadLockScoped read_lock(*receive_crit_); 518 ReadLockScoped read_lock(*receive_crit_);
523 for (const auto& kv : audio_receive_ssrcs_) { 519 for (AudioReceiveStream* stream : audio_receive_streams_) {
524 if (kv.second->config().rtp.local_ssrc == config.rtp.ssrc) { 520 if (stream->config().rtp.local_ssrc == config.rtp.ssrc) {
525 kv.second->AssociateSendStream(send_stream); 521 stream->AssociateSendStream(send_stream);
526 } 522 }
527 } 523 }
528 } 524 }
529 send_stream->SignalNetworkState(audio_network_state_); 525 send_stream->SignalNetworkState(audio_network_state_);
530 UpdateAggregateNetworkState(); 526 UpdateAggregateNetworkState();
531 return send_stream; 527 return send_stream;
532 } 528 }
533 529
534 void Call::DestroyAudioSendStream(webrtc::AudioSendStream* send_stream) { 530 void Call::DestroyAudioSendStream(webrtc::AudioSendStream* send_stream) {
535 TRACE_EVENT0("webrtc", "Call::DestroyAudioSendStream"); 531 TRACE_EVENT0("webrtc", "Call::DestroyAudioSendStream");
536 RTC_DCHECK(configuration_thread_checker_.CalledOnValidThread()); 532 RTC_DCHECK(configuration_thread_checker_.CalledOnValidThread());
537 RTC_DCHECK(send_stream != nullptr); 533 RTC_DCHECK(send_stream != nullptr);
538 534
539 send_stream->Stop(); 535 send_stream->Stop();
540 536
541 webrtc::internal::AudioSendStream* audio_send_stream = 537 webrtc::internal::AudioSendStream* audio_send_stream =
542 static_cast<webrtc::internal::AudioSendStream*>(send_stream); 538 static_cast<webrtc::internal::AudioSendStream*>(send_stream);
543 uint32_t ssrc = audio_send_stream->config().rtp.ssrc; 539 uint32_t ssrc = audio_send_stream->config().rtp.ssrc;
544 { 540 {
545 WriteLockScoped write_lock(*send_crit_); 541 WriteLockScoped write_lock(*send_crit_);
546 size_t num_deleted = audio_send_ssrcs_.erase(ssrc); 542 size_t num_deleted = audio_send_ssrcs_.erase(ssrc);
547 RTC_DCHECK_EQ(1, num_deleted); 543 RTC_DCHECK_EQ(1, num_deleted);
548 } 544 }
549 { 545 {
550 ReadLockScoped read_lock(*receive_crit_); 546 ReadLockScoped read_lock(*receive_crit_);
551 for (const auto& kv : audio_receive_ssrcs_) { 547 for (AudioReceiveStream* stream : audio_receive_streams_) {
552 if (kv.second->config().rtp.local_ssrc == ssrc) { 548 if (stream->config().rtp.local_ssrc == ssrc) {
553 kv.second->AssociateSendStream(nullptr); 549 stream->AssociateSendStream(nullptr);
554 } 550 }
555 } 551 }
556 } 552 }
557 UpdateAggregateNetworkState(); 553 UpdateAggregateNetworkState();
558 delete audio_send_stream; 554 delete audio_send_stream;
559 } 555 }
560 556
561 webrtc::AudioReceiveStream* Call::CreateAudioReceiveStream( 557 webrtc::AudioReceiveStream* Call::CreateAudioReceiveStream(
562 const webrtc::AudioReceiveStream::Config& config) { 558 const webrtc::AudioReceiveStream::Config& config) {
563 TRACE_EVENT0("webrtc", "Call::CreateAudioReceiveStream"); 559 TRACE_EVENT0("webrtc", "Call::CreateAudioReceiveStream");
564 RTC_DCHECK(configuration_thread_checker_.CalledOnValidThread()); 560 RTC_DCHECK(configuration_thread_checker_.CalledOnValidThread());
565 event_log_->LogAudioReceiveStreamConfig(config); 561 event_log_->LogAudioReceiveStreamConfig(config);
566 AudioReceiveStream* receive_stream = 562 AudioReceiveStream* receive_stream =
567 new AudioReceiveStream(transport_send_->packet_router(), config, 563 new AudioReceiveStream(transport_send_->packet_router(), config,
568 config_.audio_state, event_log_); 564 config_.audio_state, event_log_);
569 { 565 {
570 WriteLockScoped write_lock(*receive_crit_); 566 WriteLockScoped write_lock(*receive_crit_);
571 RTC_DCHECK(audio_receive_ssrcs_.find(config.rtp.remote_ssrc) == 567 audio_rtp_demuxer_.AddSink(config.rtp.remote_ssrc, receive_stream);
572 audio_receive_ssrcs_.end());
573 audio_receive_ssrcs_[config.rtp.remote_ssrc] = receive_stream;
574 receive_rtp_config_[config.rtp.remote_ssrc] = 568 receive_rtp_config_[config.rtp.remote_ssrc] =
575 ReceiveRtpConfig(config.rtp.extensions, UseSendSideBwe(config)); 569 ReceiveRtpConfig(config.rtp.extensions, UseSendSideBwe(config));
570 audio_receive_streams_.insert(receive_stream);
576 571
577 ConfigureSync(config.sync_group); 572 ConfigureSync(config.sync_group);
578 } 573 }
579 { 574 {
580 ReadLockScoped read_lock(*send_crit_); 575 ReadLockScoped read_lock(*send_crit_);
581 auto it = audio_send_ssrcs_.find(config.rtp.local_ssrc); 576 auto it = audio_send_ssrcs_.find(config.rtp.local_ssrc);
582 if (it != audio_send_ssrcs_.end()) { 577 if (it != audio_send_ssrcs_.end()) {
583 receive_stream->AssociateSendStream(it->second); 578 receive_stream->AssociateSendStream(it->second);
584 } 579 }
585 } 580 }
586 receive_stream->SignalNetworkState(audio_network_state_); 581 receive_stream->SignalNetworkState(audio_network_state_);
587 UpdateAggregateNetworkState(); 582 UpdateAggregateNetworkState();
588 return receive_stream; 583 return receive_stream;
589 } 584 }
590 585
591 void Call::DestroyAudioReceiveStream( 586 void Call::DestroyAudioReceiveStream(
592 webrtc::AudioReceiveStream* receive_stream) { 587 webrtc::AudioReceiveStream* receive_stream) {
593 TRACE_EVENT0("webrtc", "Call::DestroyAudioReceiveStream"); 588 TRACE_EVENT0("webrtc", "Call::DestroyAudioReceiveStream");
594 RTC_DCHECK(configuration_thread_checker_.CalledOnValidThread()); 589 RTC_DCHECK(configuration_thread_checker_.CalledOnValidThread());
595 RTC_DCHECK(receive_stream != nullptr); 590 RTC_DCHECK(receive_stream != nullptr);
596 webrtc::internal::AudioReceiveStream* audio_receive_stream = 591 webrtc::internal::AudioReceiveStream* audio_receive_stream =
597 static_cast<webrtc::internal::AudioReceiveStream*>(receive_stream); 592 static_cast<webrtc::internal::AudioReceiveStream*>(receive_stream);
598 { 593 {
599 WriteLockScoped write_lock(*receive_crit_); 594 WriteLockScoped write_lock(*receive_crit_);
600 const AudioReceiveStream::Config& config = audio_receive_stream->config(); 595 const AudioReceiveStream::Config& config = audio_receive_stream->config();
601 uint32_t ssrc = config.rtp.remote_ssrc; 596 uint32_t ssrc = config.rtp.remote_ssrc;
602 receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config)) 597 receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config))
603 ->RemoveStream(ssrc); 598 ->RemoveStream(ssrc);
604 size_t num_deleted = audio_receive_ssrcs_.erase(ssrc); 599 size_t num_deleted = audio_rtp_demuxer_.RemoveSink(audio_receive_stream);
605 RTC_DCHECK(num_deleted == 1); 600 RTC_DCHECK(num_deleted == 1);
601 audio_receive_streams_.erase(audio_receive_stream);
606 const std::string& sync_group = audio_receive_stream->config().sync_group; 602 const std::string& sync_group = audio_receive_stream->config().sync_group;
607 const auto it = sync_stream_mapping_.find(sync_group); 603 const auto it = sync_stream_mapping_.find(sync_group);
608 if (it != sync_stream_mapping_.end() && 604 if (it != sync_stream_mapping_.end() &&
609 it->second == audio_receive_stream) { 605 it->second == audio_receive_stream) {
610 sync_stream_mapping_.erase(it); 606 sync_stream_mapping_.erase(it);
611 ConfigureSync(sync_group); 607 ConfigureSync(sync_group);
612 } 608 }
613 receive_rtp_config_.erase(ssrc); 609 receive_rtp_config_.erase(ssrc);
614 } 610 }
615 UpdateAggregateNetworkState(); 611 UpdateAggregateNetworkState();
(...skipping 76 matching lines...) Expand 10 before | Expand all | Expand 10 after
692 VideoReceiveStream* receive_stream = 688 VideoReceiveStream* receive_stream =
693 new VideoReceiveStream(num_cpu_cores_, transport_send_->packet_router(), 689 new VideoReceiveStream(num_cpu_cores_, transport_send_->packet_router(),
694 std::move(configuration), 690 std::move(configuration),
695 module_process_thread_.get(), call_stats_.get()); 691 module_process_thread_.get(), call_stats_.get());
696 692
697 const webrtc::VideoReceiveStream::Config& config = receive_stream->config(); 693 const webrtc::VideoReceiveStream::Config& config = receive_stream->config();
698 ReceiveRtpConfig receive_config(config.rtp.extensions, 694 ReceiveRtpConfig receive_config(config.rtp.extensions,
699 UseSendSideBwe(config)); 695 UseSendSideBwe(config));
700 { 696 {
701 WriteLockScoped write_lock(*receive_crit_); 697 WriteLockScoped write_lock(*receive_crit_);
702 RTC_DCHECK(video_receive_ssrcs_.find(config.rtp.remote_ssrc) == 698 video_rtp_demuxer_.AddSink(config.rtp.remote_ssrc, receive_stream);
703 video_receive_ssrcs_.end());
704 video_receive_ssrcs_[config.rtp.remote_ssrc] = receive_stream;
705 if (config.rtp.rtx_ssrc) { 699 if (config.rtp.rtx_ssrc) {
706 video_receive_ssrcs_[config.rtp.rtx_ssrc] = receive_stream; 700 video_rtp_demuxer_.AddSink(config.rtp.rtx_ssrc, receive_stream);
707 // We record identical config for the rtx stream as for the main 701 // We record identical config for the rtx stream as for the main
708 // stream. Since the transport_send_cc negotiation is per payload 702 // stream. Since the transport_send_cc negotiation is per payload
709 // type, we may get an incorrect value for the rtx stream, but 703 // type, we may get an incorrect value for the rtx stream, but
710 // that is unlikely to matter in practice. 704 // that is unlikely to matter in practice.
711 receive_rtp_config_[config.rtp.rtx_ssrc] = receive_config; 705 receive_rtp_config_[config.rtp.rtx_ssrc] = receive_config;
712 } 706 }
713 receive_rtp_config_[config.rtp.remote_ssrc] = receive_config; 707 receive_rtp_config_[config.rtp.remote_ssrc] = receive_config;
714 video_receive_streams_.insert(receive_stream); 708 video_receive_streams_.insert(receive_stream);
715 ConfigureSync(config.sync_group); 709 ConfigureSync(config.sync_group);
716 } 710 }
717 receive_stream->SignalNetworkState(video_network_state_); 711 receive_stream->SignalNetworkState(video_network_state_);
718 UpdateAggregateNetworkState(); 712 UpdateAggregateNetworkState();
719 event_log_->LogVideoReceiveStreamConfig(config); 713 event_log_->LogVideoReceiveStreamConfig(config);
720 return receive_stream; 714 return receive_stream;
721 } 715 }
722 716
723 void Call::DestroyVideoReceiveStream( 717 void Call::DestroyVideoReceiveStream(
724 webrtc::VideoReceiveStream* receive_stream) { 718 webrtc::VideoReceiveStream* receive_stream) {
725 TRACE_EVENT0("webrtc", "Call::DestroyVideoReceiveStream"); 719 TRACE_EVENT0("webrtc", "Call::DestroyVideoReceiveStream");
726 RTC_DCHECK(configuration_thread_checker_.CalledOnValidThread()); 720 RTC_DCHECK(configuration_thread_checker_.CalledOnValidThread());
727 RTC_DCHECK(receive_stream != nullptr); 721 RTC_DCHECK(receive_stream != nullptr);
728 VideoReceiveStream* receive_stream_impl = nullptr; 722 VideoReceiveStream* receive_stream_impl =
723 static_cast<VideoReceiveStream*>(receive_stream);
724 const VideoReceiveStream::Config& config = receive_stream_impl->config();
729 { 725 {
730 WriteLockScoped write_lock(*receive_crit_); 726 WriteLockScoped write_lock(*receive_crit_);
731 // Remove all ssrcs pointing to a receive stream. As RTX retransmits on a 727 // Remove all ssrcs pointing to a receive stream. As RTX retransmits on a
732 // separate SSRC there can be either one or two. 728 // separate SSRC there can be either one or two.
733 auto it = video_receive_ssrcs_.begin(); 729 size_t num_deleted = video_rtp_demuxer_.RemoveSink(receive_stream_impl);
734 while (it != video_receive_ssrcs_.end()) { 730 RTC_DCHECK_GE(num_deleted, 1);
735 if (it->second == static_cast<VideoReceiveStream*>(receive_stream)) { 731 receive_rtp_config_.erase(config.rtp.remote_ssrc);
736 if (receive_stream_impl != nullptr) 732 if (config.rtp.rtx_ssrc) {
737 RTC_DCHECK(receive_stream_impl == it->second); 733 receive_rtp_config_.erase(config.rtp.rtx_ssrc);
738 receive_stream_impl = it->second;
739 receive_rtp_config_.erase(it->first);
740 it = video_receive_ssrcs_.erase(it);
741 } else {
742 ++it;
743 }
744 } 734 }
745 video_receive_streams_.erase(receive_stream_impl); 735 video_receive_streams_.erase(receive_stream_impl);
746 RTC_CHECK(receive_stream_impl != nullptr); 736 ConfigureSync(config.sync_group);
747 ConfigureSync(receive_stream_impl->config().sync_group);
748 } 737 }
749 const VideoReceiveStream::Config& config = receive_stream_impl->config();
750 738
751 receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config)) 739 receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config))
752 ->RemoveStream(config.rtp.remote_ssrc); 740 ->RemoveStream(config.rtp.remote_ssrc);
753 741
754 UpdateAggregateNetworkState(); 742 UpdateAggregateNetworkState();
755 delete receive_stream_impl; 743 delete receive_stream_impl;
756 } 744 }
757 745
758 FlexfecReceiveStream* Call::CreateFlexfecReceiveStream( 746 FlexfecReceiveStream* Call::CreateFlexfecReceiveStream(
759 const FlexfecReceiveStream::Config& config) { 747 const FlexfecReceiveStream::Config& config) {
760 TRACE_EVENT0("webrtc", "Call::CreateFlexfecReceiveStream"); 748 TRACE_EVENT0("webrtc", "Call::CreateFlexfecReceiveStream");
761 RTC_DCHECK(configuration_thread_checker_.CalledOnValidThread()); 749 RTC_DCHECK(configuration_thread_checker_.CalledOnValidThread());
762 750
763 RecoveredPacketReceiver* recovered_packet_receiver = this; 751 RecoveredPacketReceiver* recovered_packet_receiver = this;
764 FlexfecReceiveStreamImpl* receive_stream = new FlexfecReceiveStreamImpl( 752 FlexfecReceiveStreamImpl* receive_stream = new FlexfecReceiveStreamImpl(
765 config, recovered_packet_receiver, call_stats_->rtcp_rtt_stats(), 753 config, recovered_packet_receiver, call_stats_->rtcp_rtt_stats(),
766 module_process_thread_.get()); 754 module_process_thread_.get());
767 755
768 { 756 {
769 WriteLockScoped write_lock(*receive_crit_); 757 WriteLockScoped write_lock(*receive_crit_);
770 758 video_rtp_demuxer_.AddSink(config.remote_ssrc, receive_stream);
771 RTC_DCHECK(flexfec_receive_streams_.find(receive_stream) ==
772 flexfec_receive_streams_.end());
773 flexfec_receive_streams_.insert(receive_stream);
774 759
775 for (auto ssrc : config.protected_media_ssrcs) 760 for (auto ssrc : config.protected_media_ssrcs)
776 flexfec_receive_ssrcs_media_.insert(std::make_pair(ssrc, receive_stream)); 761 video_rtp_demuxer_.AddSink(ssrc, receive_stream);
777
778 RTC_DCHECK(flexfec_receive_ssrcs_protection_.find(config.remote_ssrc) ==
779 flexfec_receive_ssrcs_protection_.end());
780 flexfec_receive_ssrcs_protection_[config.remote_ssrc] = receive_stream;
781 762
782 RTC_DCHECK(receive_rtp_config_.find(config.remote_ssrc) == 763 RTC_DCHECK(receive_rtp_config_.find(config.remote_ssrc) ==
783 receive_rtp_config_.end()); 764 receive_rtp_config_.end());
784 receive_rtp_config_[config.remote_ssrc] = 765 receive_rtp_config_[config.remote_ssrc] =
785 ReceiveRtpConfig(config.rtp_header_extensions, UseSendSideBwe(config)); 766 ReceiveRtpConfig(config.rtp_header_extensions, UseSendSideBwe(config));
786 } 767 }
787 768
788 // TODO(brandtr): Store config in RtcEventLog here. 769 // TODO(brandtr): Store config in RtcEventLog here.
789 770
790 return receive_stream; 771 return receive_stream;
(...skipping 11 matching lines...) Expand all
802 { 783 {
803 WriteLockScoped write_lock(*receive_crit_); 784 WriteLockScoped write_lock(*receive_crit_);
804 785
805 const FlexfecReceiveStream::Config& config = 786 const FlexfecReceiveStream::Config& config =
806 receive_stream_impl->GetConfig(); 787 receive_stream_impl->GetConfig();
807 uint32_t ssrc = config.remote_ssrc; 788 uint32_t ssrc = config.remote_ssrc;
808 receive_rtp_config_.erase(ssrc); 789 receive_rtp_config_.erase(ssrc);
809 790
810 // Remove all SSRCs pointing to the FlexfecReceiveStreamImpl to be 791 // Remove all SSRCs pointing to the FlexfecReceiveStreamImpl to be
811 // destroyed. 792 // destroyed.
812 auto prot_it = flexfec_receive_ssrcs_protection_.begin(); 793 video_rtp_demuxer_.RemoveSink(receive_stream_impl);
813 while (prot_it != flexfec_receive_ssrcs_protection_.end()) {
814 if (prot_it->second == receive_stream_impl)
815 prot_it = flexfec_receive_ssrcs_protection_.erase(prot_it);
816 else
817 ++prot_it;
818 }
819 auto media_it = flexfec_receive_ssrcs_media_.begin();
820 while (media_it != flexfec_receive_ssrcs_media_.end()) {
821 if (media_it->second == receive_stream_impl)
822 media_it = flexfec_receive_ssrcs_media_.erase(media_it);
823 else
824 ++media_it;
825 }
826
827 receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config)) 794 receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config))
828 ->RemoveStream(ssrc); 795 ->RemoveStream(ssrc);
829
830 flexfec_receive_streams_.erase(receive_stream_impl);
831 } 796 }
832 797
833 delete receive_stream_impl; 798 delete receive_stream_impl;
834 } 799 }
835 800
836 Call::Stats Call::GetStats() const { 801 Call::Stats Call::GetStats() const {
837 // TODO(solenberg): Some test cases in EndToEndTest use this from a different 802 // TODO(solenberg): Some test cases in EndToEndTest use this from a different
838 // thread. Re-enable once that is fixed. 803 // thread. Re-enable once that is fixed.
839 // RTC_DCHECK(configuration_thread_checker_.CalledOnValidThread()); 804 // RTC_DCHECK(configuration_thread_checker_.CalledOnValidThread());
840 Stats stats; 805 Stats stats;
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after
907 ReadLockScoped read_lock(*send_crit_); 872 ReadLockScoped read_lock(*send_crit_);
908 for (auto& kv : audio_send_ssrcs_) { 873 for (auto& kv : audio_send_ssrcs_) {
909 kv.second->SignalNetworkState(audio_network_state_); 874 kv.second->SignalNetworkState(audio_network_state_);
910 } 875 }
911 for (auto& kv : video_send_ssrcs_) { 876 for (auto& kv : video_send_ssrcs_) {
912 kv.second->SignalNetworkState(video_network_state_); 877 kv.second->SignalNetworkState(video_network_state_);
913 } 878 }
914 } 879 }
915 { 880 {
916 ReadLockScoped read_lock(*receive_crit_); 881 ReadLockScoped read_lock(*receive_crit_);
917 for (auto& kv : audio_receive_ssrcs_) { 882 for (AudioReceiveStream* audio_receive_stream : audio_receive_streams_) {
918 kv.second->SignalNetworkState(audio_network_state_); 883 audio_receive_stream->SignalNetworkState(audio_network_state_);
919 } 884 }
920 for (auto& kv : video_receive_ssrcs_) { 885 for (VideoReceiveStream* video_receive_stream : video_receive_streams_) {
921 kv.second->SignalNetworkState(video_network_state_); 886 video_receive_stream->SignalNetworkState(video_network_state_);
922 } 887 }
923 } 888 }
924 } 889 }
925 890
926 void Call::OnTransportOverheadChanged(MediaType media, 891 void Call::OnTransportOverheadChanged(MediaType media,
927 int transport_overhead_per_packet) { 892 int transport_overhead_per_packet) {
928 switch (media) { 893 switch (media) {
929 case MediaType::AUDIO: { 894 case MediaType::AUDIO: {
930 ReadLockScoped read_lock(*send_crit_); 895 ReadLockScoped read_lock(*send_crit_);
931 for (auto& kv : audio_send_ssrcs_) { 896 for (auto& kv : audio_send_ssrcs_) {
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after
993 bool have_video = false; 958 bool have_video = false;
994 { 959 {
995 ReadLockScoped read_lock(*send_crit_); 960 ReadLockScoped read_lock(*send_crit_);
996 if (audio_send_ssrcs_.size() > 0) 961 if (audio_send_ssrcs_.size() > 0)
997 have_audio = true; 962 have_audio = true;
998 if (video_send_ssrcs_.size() > 0) 963 if (video_send_ssrcs_.size() > 0)
999 have_video = true; 964 have_video = true;
1000 } 965 }
1001 { 966 {
1002 ReadLockScoped read_lock(*receive_crit_); 967 ReadLockScoped read_lock(*receive_crit_);
1003 if (audio_receive_ssrcs_.size() > 0) 968 if (audio_receive_streams_.size() > 0)
1004 have_audio = true; 969 have_audio = true;
1005 if (video_receive_ssrcs_.size() > 0) 970 if (video_receive_streams_.size() > 0)
1006 have_video = true; 971 have_video = true;
1007 } 972 }
1008 973
1009 NetworkState aggregate_state = kNetworkDown; 974 NetworkState aggregate_state = kNetworkDown;
1010 if ((have_video && video_network_state_ == kNetworkUp) || 975 if ((have_video && video_network_state_ == kNetworkUp) ||
1011 (have_audio && audio_network_state_ == kNetworkUp)) { 976 (have_audio && audio_network_state_ == kNetworkUp)) {
1012 aggregate_state = kNetworkUp; 977 aggregate_state = kNetworkUp;
1013 } 978 }
1014 979
1015 LOG(LS_INFO) << "UpdateAggregateNetworkState: aggregate_state=" 980 LOG(LS_INFO) << "UpdateAggregateNetworkState: aggregate_state="
(...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after
1086 if (sync_group.empty()) 1051 if (sync_group.empty())
1087 return; 1052 return;
1088 1053
1089 AudioReceiveStream* sync_audio_stream = nullptr; 1054 AudioReceiveStream* sync_audio_stream = nullptr;
1090 // Find existing audio stream. 1055 // Find existing audio stream.
1091 const auto it = sync_stream_mapping_.find(sync_group); 1056 const auto it = sync_stream_mapping_.find(sync_group);
1092 if (it != sync_stream_mapping_.end()) { 1057 if (it != sync_stream_mapping_.end()) {
1093 sync_audio_stream = it->second; 1058 sync_audio_stream = it->second;
1094 } else { 1059 } else {
1095 // No configured audio stream, see if we can find one. 1060 // No configured audio stream, see if we can find one.
1096 for (const auto& kv : audio_receive_ssrcs_) { 1061 for (AudioReceiveStream* stream : audio_receive_streams_) {
1097 if (kv.second->config().sync_group == sync_group) { 1062 if (stream->config().sync_group == sync_group) {
1098 if (sync_audio_stream != nullptr) { 1063 if (sync_audio_stream != nullptr) {
1099 LOG(LS_WARNING) << "Attempting to sync more than one audio stream " 1064 LOG(LS_WARNING) << "Attempting to sync more than one audio stream "
1100 "within the same sync group. This is not " 1065 "within the same sync group. This is not "
1101 "supported in the current implementation."; 1066 "supported in the current implementation.";
1102 break; 1067 break;
1103 } 1068 }
1104 sync_audio_stream = kv.second; 1069 sync_audio_stream = stream;
1105 } 1070 }
1106 } 1071 }
1107 } 1072 }
1108 if (sync_audio_stream) 1073 if (sync_audio_stream)
1109 sync_stream_mapping_[sync_group] = sync_audio_stream; 1074 sync_stream_mapping_[sync_group] = sync_audio_stream;
1110 size_t num_synced_streams = 0; 1075 size_t num_synced_streams = 0;
1111 for (VideoReceiveStream* video_stream : video_receive_streams_) { 1076 for (VideoReceiveStream* video_stream : video_receive_streams_) {
1112 if (video_stream->config().sync_group != sync_group) 1077 if (video_stream->config().sync_group != sync_group)
1113 continue; 1078 continue;
1114 ++num_synced_streams; 1079 ++num_synced_streams;
(...skipping 29 matching lines...) Expand all
1144 bool rtcp_delivered = false; 1109 bool rtcp_delivered = false;
1145 if (media_type == MediaType::ANY || media_type == MediaType::VIDEO) { 1110 if (media_type == MediaType::ANY || media_type == MediaType::VIDEO) {
1146 ReadLockScoped read_lock(*receive_crit_); 1111 ReadLockScoped read_lock(*receive_crit_);
1147 for (VideoReceiveStream* stream : video_receive_streams_) { 1112 for (VideoReceiveStream* stream : video_receive_streams_) {
1148 if (stream->DeliverRtcp(packet, length)) 1113 if (stream->DeliverRtcp(packet, length))
1149 rtcp_delivered = true; 1114 rtcp_delivered = true;
1150 } 1115 }
1151 } 1116 }
1152 if (media_type == MediaType::ANY || media_type == MediaType::AUDIO) { 1117 if (media_type == MediaType::ANY || media_type == MediaType::AUDIO) {
1153 ReadLockScoped read_lock(*receive_crit_); 1118 ReadLockScoped read_lock(*receive_crit_);
1154 for (auto& kv : audio_receive_ssrcs_) { 1119 for (AudioReceiveStream* stream : audio_receive_streams_) {
1155 if (kv.second->DeliverRtcp(packet, length)) 1120 if (stream->DeliverRtcp(packet, length))
1156 rtcp_delivered = true; 1121 rtcp_delivered = true;
1157 } 1122 }
1158 } 1123 }
1159 if (media_type == MediaType::ANY || media_type == MediaType::VIDEO) { 1124 if (media_type == MediaType::ANY || media_type == MediaType::VIDEO) {
1160 ReadLockScoped read_lock(*send_crit_); 1125 ReadLockScoped read_lock(*send_crit_);
1161 for (VideoSendStream* stream : video_send_streams_) { 1126 for (VideoSendStream* stream : video_send_streams_) {
1162 if (stream->DeliverRtcp(packet, length)) 1127 if (stream->DeliverRtcp(packet, length))
1163 rtcp_delivered = true; 1128 rtcp_delivered = true;
1164 } 1129 }
1165 } 1130 }
(...skipping 23 matching lines...) Expand all
1189 // TODO(nisse): We should parse the RTP header only here, and pass 1154 // TODO(nisse): We should parse the RTP header only here, and pass
1190 // on parsed_packet to the receive streams. 1155 // on parsed_packet to the receive streams.
1191 rtc::Optional<RtpPacketReceived> parsed_packet = 1156 rtc::Optional<RtpPacketReceived> parsed_packet =
1192 ParseRtpPacket(packet, length, &packet_time); 1157 ParseRtpPacket(packet, length, &packet_time);
1193 1158
1194 if (!parsed_packet) 1159 if (!parsed_packet)
1195 return DELIVERY_PACKET_ERROR; 1160 return DELIVERY_PACKET_ERROR;
1196 1161
1197 NotifyBweOfReceivedPacket(*parsed_packet, media_type); 1162 NotifyBweOfReceivedPacket(*parsed_packet, media_type);
1198 1163
1199 uint32_t ssrc = parsed_packet->Ssrc();
1200
1201 if (media_type == MediaType::AUDIO) { 1164 if (media_type == MediaType::AUDIO) {
1202 auto it = audio_receive_ssrcs_.find(ssrc); 1165 if (audio_rtp_demuxer_.OnRtpPacket(*parsed_packet)) {
1203 if (it != audio_receive_ssrcs_.end()) {
1204 received_bytes_per_second_counter_.Add(static_cast<int>(length)); 1166 received_bytes_per_second_counter_.Add(static_cast<int>(length));
1205 received_audio_bytes_per_second_counter_.Add(static_cast<int>(length)); 1167 received_audio_bytes_per_second_counter_.Add(static_cast<int>(length));
1206 it->second->OnRtpPacket(*parsed_packet);
1207 event_log_->LogRtpHeader(kIncomingPacket, media_type, packet, length); 1168 event_log_->LogRtpHeader(kIncomingPacket, media_type, packet, length);
1208 return DELIVERY_OK; 1169 return DELIVERY_OK;
1209 } 1170 }
1210 } 1171 } else if (media_type == MediaType::VIDEO) {
1211 if (media_type == MediaType::VIDEO) { 1172 if (video_rtp_demuxer_.OnRtpPacket(*parsed_packet)) {
1212 auto it = video_receive_ssrcs_.find(ssrc);
1213 if (it != video_receive_ssrcs_.end()) {
1214 received_bytes_per_second_counter_.Add(static_cast<int>(length)); 1173 received_bytes_per_second_counter_.Add(static_cast<int>(length));
1215 received_video_bytes_per_second_counter_.Add(static_cast<int>(length)); 1174 received_video_bytes_per_second_counter_.Add(static_cast<int>(length));
1216 it->second->OnRtpPacket(*parsed_packet);
1217
1218 // Deliver media packets to FlexFEC subsystem.
1219 auto it_bounds = flexfec_receive_ssrcs_media_.equal_range(ssrc);
1220 for (auto it = it_bounds.first; it != it_bounds.second; ++it)
1221 it->second->OnRtpPacket(*parsed_packet);
1222
1223 event_log_->LogRtpHeader(kIncomingPacket, media_type, packet, length); 1175 event_log_->LogRtpHeader(kIncomingPacket, media_type, packet, length);
1224 return DELIVERY_OK; 1176 return DELIVERY_OK;
1225 } 1177 }
1226 }
1227 if (media_type == MediaType::VIDEO) {
1228 received_bytes_per_second_counter_.Add(static_cast<int>(length));
1229 // TODO(brandtr): Update here when FlexFEC supports protecting audio.
1230 received_video_bytes_per_second_counter_.Add(static_cast<int>(length));
1231 auto it = flexfec_receive_ssrcs_protection_.find(ssrc);
1232 if (it != flexfec_receive_ssrcs_protection_.end()) {
1233 it->second->OnRtpPacket(*parsed_packet);
1234 event_log_->LogRtpHeader(kIncomingPacket, media_type, packet, length);
1235 return DELIVERY_OK;
1236 }
1237 } 1178 }
1238 return DELIVERY_UNKNOWN_SSRC; 1179 return DELIVERY_UNKNOWN_SSRC;
1239 } 1180 }
1240 1181
1241 PacketReceiver::DeliveryStatus Call::DeliverPacket( 1182 PacketReceiver::DeliveryStatus Call::DeliverPacket(
1242 MediaType media_type, 1183 MediaType media_type,
1243 const uint8_t* packet, 1184 const uint8_t* packet,
1244 size_t length, 1185 size_t length,
1245 const PacketTime& packet_time) { 1186 const PacketTime& packet_time) {
1246 // TODO(solenberg): Tests call this function on a network thread, libjingle 1187 // TODO(solenberg): Tests call this function on a network thread, libjingle
(...skipping 10 matching lines...) Expand all
1257 // audio packets with FlexFEC. 1198 // audio packets with FlexFEC.
1258 void Call::OnRecoveredPacket(const uint8_t* packet, size_t length) { 1199 void Call::OnRecoveredPacket(const uint8_t* packet, size_t length) {
1259 ReadLockScoped read_lock(*receive_crit_); 1200 ReadLockScoped read_lock(*receive_crit_);
1260 rtc::Optional<RtpPacketReceived> parsed_packet = 1201 rtc::Optional<RtpPacketReceived> parsed_packet =
1261 ParseRtpPacket(packet, length, nullptr); 1202 ParseRtpPacket(packet, length, nullptr);
1262 if (!parsed_packet) 1203 if (!parsed_packet)
1263 return; 1204 return;
1264 1205
1265 parsed_packet->set_recovered(true); 1206 parsed_packet->set_recovered(true);
1266 1207
1267 auto it = video_receive_ssrcs_.find(parsed_packet->Ssrc()); 1208 video_rtp_demuxer_.OnRtpPacket(*parsed_packet);
1268 if (it == video_receive_ssrcs_.end())
1269 return;
1270
1271 it->second->OnRtpPacket(*parsed_packet);
1272 } 1209 }
1273 1210
1274 void Call::NotifyBweOfReceivedPacket(const RtpPacketReceived& packet, 1211 void Call::NotifyBweOfReceivedPacket(const RtpPacketReceived& packet,
1275 MediaType media_type) { 1212 MediaType media_type) {
1276 auto it = receive_rtp_config_.find(packet.Ssrc()); 1213 auto it = receive_rtp_config_.find(packet.Ssrc());
1277 bool use_send_side_bwe = 1214 bool use_send_side_bwe =
1278 (it != receive_rtp_config_.end()) && it->second.use_send_side_bwe; 1215 (it != receive_rtp_config_.end()) && it->second.use_send_side_bwe;
1279 1216
1280 RTPHeader header; 1217 RTPHeader header;
1281 packet.GetHeader(&header); 1218 packet.GetHeader(&header);
(...skipping 13 matching lines...) Expand all
1295 (use_send_side_bwe && header.extension.hasTransportSequenceNumber)) { 1232 (use_send_side_bwe && header.extension.hasTransportSequenceNumber)) {
1296 receive_side_cc_.OnReceivedPacket( 1233 receive_side_cc_.OnReceivedPacket(
1297 packet.arrival_time_ms(), packet.payload_size() + packet.padding_size(), 1234 packet.arrival_time_ms(), packet.payload_size() + packet.padding_size(),
1298 header); 1235 header);
1299 } 1236 }
1300 } 1237 }
1301 1238
1302 } // namespace internal 1239 } // namespace internal
1303 1240
1304 } // namespace webrtc 1241 } // namespace webrtc
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698