Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(141)

Side by Side Diff: webrtc/call/call.cc

Issue 2867943003: New class RtpDemuxer and RtpPacketSinkInterface, use in Call. (Closed)
Patch Set: Address danil's comments. Created 3 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 /* 1 /*
2 * Copyright (c) 2013 The WebRTC project authors. All Rights Reserved. 2 * Copyright (c) 2013 The WebRTC project authors. All Rights Reserved.
3 * 3 *
4 * Use of this source code is governed by a BSD-style license 4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source 5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found 6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may 7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree. 8 * be found in the AUTHORS file in the root of the source tree.
9 */ 9 */
10 10
(...skipping 15 matching lines...) Expand all
26 #include "webrtc/base/location.h" 26 #include "webrtc/base/location.h"
27 #include "webrtc/base/logging.h" 27 #include "webrtc/base/logging.h"
28 #include "webrtc/base/optional.h" 28 #include "webrtc/base/optional.h"
29 #include "webrtc/base/task_queue.h" 29 #include "webrtc/base/task_queue.h"
30 #include "webrtc/base/thread_annotations.h" 30 #include "webrtc/base/thread_annotations.h"
31 #include "webrtc/base/thread_checker.h" 31 #include "webrtc/base/thread_checker.h"
32 #include "webrtc/base/trace_event.h" 32 #include "webrtc/base/trace_event.h"
33 #include "webrtc/call/bitrate_allocator.h" 33 #include "webrtc/call/bitrate_allocator.h"
34 #include "webrtc/call/call.h" 34 #include "webrtc/call/call.h"
35 #include "webrtc/call/flexfec_receive_stream_impl.h" 35 #include "webrtc/call/flexfec_receive_stream_impl.h"
36 #include "webrtc/call/rtp_demuxer.h"
36 #include "webrtc/call/rtp_transport_controller_send.h" 37 #include "webrtc/call/rtp_transport_controller_send.h"
37 #include "webrtc/config.h" 38 #include "webrtc/config.h"
38 #include "webrtc/logging/rtc_event_log/rtc_event_log.h" 39 #include "webrtc/logging/rtc_event_log/rtc_event_log.h"
39 #include "webrtc/modules/bitrate_controller/include/bitrate_controller.h" 40 #include "webrtc/modules/bitrate_controller/include/bitrate_controller.h"
40 #include "webrtc/modules/congestion_controller/include/receive_side_congestion_c ontroller.h" 41 #include "webrtc/modules/congestion_controller/include/receive_side_congestion_c ontroller.h"
41 #include "webrtc/modules/pacing/paced_sender.h" 42 #include "webrtc/modules/pacing/paced_sender.h"
42 #include "webrtc/modules/rtp_rtcp/include/flexfec_receiver.h" 43 #include "webrtc/modules/rtp_rtcp/include/flexfec_receiver.h"
43 #include "webrtc/modules/rtp_rtcp/include/rtp_header_parser.h" 44 #include "webrtc/modules/rtp_rtcp/include/rtp_header_parser.h"
44 #include "webrtc/modules/rtp_rtcp/source/byte_io.h" 45 #include "webrtc/modules/rtp_rtcp/source/byte_io.h"
45 #include "webrtc/modules/rtp_rtcp/source/rtp_header_extension.h" 46 #include "webrtc/modules/rtp_rtcp/source/rtp_header_extension.h"
(...skipping 150 matching lines...) Expand 10 before | Expand all | Expand 10 after
196 const std::unique_ptr<BitrateAllocator> bitrate_allocator_; 197 const std::unique_ptr<BitrateAllocator> bitrate_allocator_;
197 Call::Config config_; 198 Call::Config config_;
198 rtc::ThreadChecker configuration_thread_checker_; 199 rtc::ThreadChecker configuration_thread_checker_;
199 200
200 NetworkState audio_network_state_; 201 NetworkState audio_network_state_;
201 NetworkState video_network_state_; 202 NetworkState video_network_state_;
202 203
203 std::unique_ptr<RWLockWrapper> receive_crit_; 204 std::unique_ptr<RWLockWrapper> receive_crit_;
204 // Audio, Video, and FlexFEC receive streams are owned by the client that 205 // Audio, Video, and FlexFEC receive streams are owned by the client that
205 // creates them. 206 // creates them.
206 std::map<uint32_t, AudioReceiveStream*> audio_receive_ssrcs_ 207 std::set<AudioReceiveStream*> audio_receive_streams_
207 GUARDED_BY(receive_crit_);
208 std::map<uint32_t, VideoReceiveStream*> video_receive_ssrcs_
209 GUARDED_BY(receive_crit_); 208 GUARDED_BY(receive_crit_);
210 std::set<VideoReceiveStream*> video_receive_streams_ 209 std::set<VideoReceiveStream*> video_receive_streams_
211 GUARDED_BY(receive_crit_); 210 GUARDED_BY(receive_crit_);
212 // Each media stream could conceivably be protected by multiple FlexFEC 211
213 // streams.
214 std::multimap<uint32_t, FlexfecReceiveStreamImpl*>
215 flexfec_receive_ssrcs_media_ GUARDED_BY(receive_crit_);
216 std::map<uint32_t, FlexfecReceiveStreamImpl*>
217 flexfec_receive_ssrcs_protection_ GUARDED_BY(receive_crit_);
218 std::set<FlexfecReceiveStreamImpl*> flexfec_receive_streams_
219 GUARDED_BY(receive_crit_);
220 std::map<std::string, AudioReceiveStream*> sync_stream_mapping_ 212 std::map<std::string, AudioReceiveStream*> sync_stream_mapping_
221 GUARDED_BY(receive_crit_); 213 GUARDED_BY(receive_crit_);
222 214
215 // TODO(nisse): Should eventually be part of injected
216 // RtpTransportControllerReceive.
217 RtpDemuxer audio_rtp_demuxer_ GUARDED_BY(receive_crit_);
218 RtpDemuxer video_rtp_demuxer_ GUARDED_BY(receive_crit_);
219
223 // This extra map is used for receive processing which is 220 // This extra map is used for receive processing which is
224 // independent of media type. 221 // independent of media type.
225 222
226 // TODO(nisse): In the RTP transport refactoring, we should have a 223 // TODO(nisse): In the RTP transport refactoring, we should have a
227 // single mapping from ssrc to a more abstract receive stream, with 224 // single mapping from ssrc to a more abstract receive stream, with
228 // accessor methods for all configuration we need at this level. 225 // accessor methods for all configuration we need at this level.
229 struct ReceiveRtpConfig { 226 struct ReceiveRtpConfig {
230 ReceiveRtpConfig() = default; // Needed by std::map 227 ReceiveRtpConfig() = default; // Needed by std::map
231 ReceiveRtpConfig(const std::vector<RtpExtension>& extensions, 228 ReceiveRtpConfig(const std::vector<RtpExtension>& extensions,
232 bool use_send_side_bwe) 229 bool use_send_side_bwe)
(...skipping 131 matching lines...) Expand 10 before | Expand all | Expand 10 after
364 361
365 pacer_thread_->Start(); 362 pacer_thread_->Start();
366 } 363 }
367 364
368 Call::~Call() { 365 Call::~Call() {
369 RTC_DCHECK(configuration_thread_checker_.CalledOnValidThread()); 366 RTC_DCHECK(configuration_thread_checker_.CalledOnValidThread());
370 367
371 RTC_CHECK(audio_send_ssrcs_.empty()); 368 RTC_CHECK(audio_send_ssrcs_.empty());
372 RTC_CHECK(video_send_ssrcs_.empty()); 369 RTC_CHECK(video_send_ssrcs_.empty());
373 RTC_CHECK(video_send_streams_.empty()); 370 RTC_CHECK(video_send_streams_.empty());
374 RTC_CHECK(audio_receive_ssrcs_.empty()); 371 RTC_CHECK(audio_receive_streams_.empty());
375 RTC_CHECK(video_receive_ssrcs_.empty());
376 RTC_CHECK(video_receive_streams_.empty()); 372 RTC_CHECK(video_receive_streams_.empty());
377 373
378 pacer_thread_->Stop(); 374 pacer_thread_->Stop();
379 pacer_thread_->DeRegisterModule(transport_send_->send_side_cc()->pacer()); 375 pacer_thread_->DeRegisterModule(transport_send_->send_side_cc()->pacer());
380 pacer_thread_->DeRegisterModule( 376 pacer_thread_->DeRegisterModule(
381 receive_side_cc_.GetRemoteBitrateEstimator(true)); 377 receive_side_cc_.GetRemoteBitrateEstimator(true));
382 module_process_thread_->DeRegisterModule(transport_send_->send_side_cc()); 378 module_process_thread_->DeRegisterModule(transport_send_->send_side_cc());
383 module_process_thread_->DeRegisterModule(&receive_side_cc_); 379 module_process_thread_->DeRegisterModule(&receive_side_cc_);
384 module_process_thread_->DeRegisterModule(call_stats_.get()); 380 module_process_thread_->DeRegisterModule(call_stats_.get());
385 module_process_thread_->Stop(); 381 module_process_thread_->Stop();
(...skipping 121 matching lines...) Expand 10 before | Expand all | Expand 10 after
507 config, config_.audio_state, &worker_queue_, transport_send_.get(), 503 config, config_.audio_state, &worker_queue_, transport_send_.get(),
508 bitrate_allocator_.get(), event_log_, call_stats_->rtcp_rtt_stats()); 504 bitrate_allocator_.get(), event_log_, call_stats_->rtcp_rtt_stats());
509 { 505 {
510 WriteLockScoped write_lock(*send_crit_); 506 WriteLockScoped write_lock(*send_crit_);
511 RTC_DCHECK(audio_send_ssrcs_.find(config.rtp.ssrc) == 507 RTC_DCHECK(audio_send_ssrcs_.find(config.rtp.ssrc) ==
512 audio_send_ssrcs_.end()); 508 audio_send_ssrcs_.end());
513 audio_send_ssrcs_[config.rtp.ssrc] = send_stream; 509 audio_send_ssrcs_[config.rtp.ssrc] = send_stream;
514 } 510 }
515 { 511 {
516 ReadLockScoped read_lock(*receive_crit_); 512 ReadLockScoped read_lock(*receive_crit_);
517 for (const auto& kv : audio_receive_ssrcs_) { 513 for (AudioReceiveStream* stream : audio_receive_streams_) {
518 if (kv.second->config().rtp.local_ssrc == config.rtp.ssrc) { 514 if (stream->config().rtp.local_ssrc == config.rtp.ssrc) {
519 kv.second->AssociateSendStream(send_stream); 515 stream->AssociateSendStream(send_stream);
520 } 516 }
521 } 517 }
522 } 518 }
523 send_stream->SignalNetworkState(audio_network_state_); 519 send_stream->SignalNetworkState(audio_network_state_);
524 UpdateAggregateNetworkState(); 520 UpdateAggregateNetworkState();
525 return send_stream; 521 return send_stream;
526 } 522 }
527 523
528 void Call::DestroyAudioSendStream(webrtc::AudioSendStream* send_stream) { 524 void Call::DestroyAudioSendStream(webrtc::AudioSendStream* send_stream) {
529 TRACE_EVENT0("webrtc", "Call::DestroyAudioSendStream"); 525 TRACE_EVENT0("webrtc", "Call::DestroyAudioSendStream");
530 RTC_DCHECK(configuration_thread_checker_.CalledOnValidThread()); 526 RTC_DCHECK(configuration_thread_checker_.CalledOnValidThread());
531 RTC_DCHECK(send_stream != nullptr); 527 RTC_DCHECK(send_stream != nullptr);
532 528
533 send_stream->Stop(); 529 send_stream->Stop();
534 530
535 webrtc::internal::AudioSendStream* audio_send_stream = 531 webrtc::internal::AudioSendStream* audio_send_stream =
536 static_cast<webrtc::internal::AudioSendStream*>(send_stream); 532 static_cast<webrtc::internal::AudioSendStream*>(send_stream);
537 uint32_t ssrc = audio_send_stream->config().rtp.ssrc; 533 uint32_t ssrc = audio_send_stream->config().rtp.ssrc;
538 { 534 {
539 WriteLockScoped write_lock(*send_crit_); 535 WriteLockScoped write_lock(*send_crit_);
540 size_t num_deleted = audio_send_ssrcs_.erase(ssrc); 536 size_t num_deleted = audio_send_ssrcs_.erase(ssrc);
541 RTC_DCHECK_EQ(1, num_deleted); 537 RTC_DCHECK_EQ(1, num_deleted);
542 } 538 }
543 { 539 {
544 ReadLockScoped read_lock(*receive_crit_); 540 ReadLockScoped read_lock(*receive_crit_);
545 for (const auto& kv : audio_receive_ssrcs_) { 541 for (AudioReceiveStream* stream : audio_receive_streams_) {
546 if (kv.second->config().rtp.local_ssrc == ssrc) { 542 if (stream->config().rtp.local_ssrc == ssrc) {
547 kv.second->AssociateSendStream(nullptr); 543 stream->AssociateSendStream(nullptr);
548 } 544 }
549 } 545 }
550 } 546 }
551 UpdateAggregateNetworkState(); 547 UpdateAggregateNetworkState();
552 delete audio_send_stream; 548 delete audio_send_stream;
553 } 549 }
554 550
555 webrtc::AudioReceiveStream* Call::CreateAudioReceiveStream( 551 webrtc::AudioReceiveStream* Call::CreateAudioReceiveStream(
556 const webrtc::AudioReceiveStream::Config& config) { 552 const webrtc::AudioReceiveStream::Config& config) {
557 TRACE_EVENT0("webrtc", "Call::CreateAudioReceiveStream"); 553 TRACE_EVENT0("webrtc", "Call::CreateAudioReceiveStream");
558 RTC_DCHECK(configuration_thread_checker_.CalledOnValidThread()); 554 RTC_DCHECK(configuration_thread_checker_.CalledOnValidThread());
559 event_log_->LogAudioReceiveStreamConfig(config); 555 event_log_->LogAudioReceiveStreamConfig(config);
560 AudioReceiveStream* receive_stream = 556 AudioReceiveStream* receive_stream =
561 new AudioReceiveStream(transport_send_->packet_router(), config, 557 new AudioReceiveStream(transport_send_->packet_router(), config,
562 config_.audio_state, event_log_); 558 config_.audio_state, event_log_);
563 { 559 {
564 WriteLockScoped write_lock(*receive_crit_); 560 WriteLockScoped write_lock(*receive_crit_);
565 RTC_DCHECK(audio_receive_ssrcs_.find(config.rtp.remote_ssrc) == 561 audio_rtp_demuxer_.AddSink(config.rtp.remote_ssrc, receive_stream);
566 audio_receive_ssrcs_.end());
567 audio_receive_ssrcs_[config.rtp.remote_ssrc] = receive_stream;
568 receive_rtp_config_[config.rtp.remote_ssrc] = 562 receive_rtp_config_[config.rtp.remote_ssrc] =
569 ReceiveRtpConfig(config.rtp.extensions, UseSendSideBwe(config)); 563 ReceiveRtpConfig(config.rtp.extensions, UseSendSideBwe(config));
564 audio_receive_streams_.insert(receive_stream);
570 565
571 ConfigureSync(config.sync_group); 566 ConfigureSync(config.sync_group);
572 } 567 }
573 { 568 {
574 ReadLockScoped read_lock(*send_crit_); 569 ReadLockScoped read_lock(*send_crit_);
575 auto it = audio_send_ssrcs_.find(config.rtp.local_ssrc); 570 auto it = audio_send_ssrcs_.find(config.rtp.local_ssrc);
576 if (it != audio_send_ssrcs_.end()) { 571 if (it != audio_send_ssrcs_.end()) {
577 receive_stream->AssociateSendStream(it->second); 572 receive_stream->AssociateSendStream(it->second);
578 } 573 }
579 } 574 }
580 receive_stream->SignalNetworkState(audio_network_state_); 575 receive_stream->SignalNetworkState(audio_network_state_);
581 UpdateAggregateNetworkState(); 576 UpdateAggregateNetworkState();
582 return receive_stream; 577 return receive_stream;
583 } 578 }
584 579
585 void Call::DestroyAudioReceiveStream( 580 void Call::DestroyAudioReceiveStream(
586 webrtc::AudioReceiveStream* receive_stream) { 581 webrtc::AudioReceiveStream* receive_stream) {
587 TRACE_EVENT0("webrtc", "Call::DestroyAudioReceiveStream"); 582 TRACE_EVENT0("webrtc", "Call::DestroyAudioReceiveStream");
588 RTC_DCHECK(configuration_thread_checker_.CalledOnValidThread()); 583 RTC_DCHECK(configuration_thread_checker_.CalledOnValidThread());
589 RTC_DCHECK(receive_stream != nullptr); 584 RTC_DCHECK(receive_stream != nullptr);
590 webrtc::internal::AudioReceiveStream* audio_receive_stream = 585 webrtc::internal::AudioReceiveStream* audio_receive_stream =
591 static_cast<webrtc::internal::AudioReceiveStream*>(receive_stream); 586 static_cast<webrtc::internal::AudioReceiveStream*>(receive_stream);
592 { 587 {
593 WriteLockScoped write_lock(*receive_crit_); 588 WriteLockScoped write_lock(*receive_crit_);
594 const AudioReceiveStream::Config& config = audio_receive_stream->config(); 589 const AudioReceiveStream::Config& config = audio_receive_stream->config();
595 uint32_t ssrc = config.rtp.remote_ssrc; 590 uint32_t ssrc = config.rtp.remote_ssrc;
596 receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config)) 591 receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config))
597 ->RemoveStream(ssrc); 592 ->RemoveStream(ssrc);
598 size_t num_deleted = audio_receive_ssrcs_.erase(ssrc); 593 size_t num_deleted = audio_rtp_demuxer_.RemoveSink(audio_receive_stream);
599 RTC_DCHECK(num_deleted == 1); 594 RTC_DCHECK(num_deleted == 1);
595 audio_receive_streams_.erase(audio_receive_stream);
600 const std::string& sync_group = audio_receive_stream->config().sync_group; 596 const std::string& sync_group = audio_receive_stream->config().sync_group;
601 const auto it = sync_stream_mapping_.find(sync_group); 597 const auto it = sync_stream_mapping_.find(sync_group);
602 if (it != sync_stream_mapping_.end() && 598 if (it != sync_stream_mapping_.end() &&
603 it->second == audio_receive_stream) { 599 it->second == audio_receive_stream) {
604 sync_stream_mapping_.erase(it); 600 sync_stream_mapping_.erase(it);
605 ConfigureSync(sync_group); 601 ConfigureSync(sync_group);
606 } 602 }
607 receive_rtp_config_.erase(ssrc); 603 receive_rtp_config_.erase(ssrc);
608 } 604 }
609 UpdateAggregateNetworkState(); 605 UpdateAggregateNetworkState();
(...skipping 76 matching lines...) Expand 10 before | Expand all | Expand 10 after
686 VideoReceiveStream* receive_stream = 682 VideoReceiveStream* receive_stream =
687 new VideoReceiveStream(num_cpu_cores_, transport_send_->packet_router(), 683 new VideoReceiveStream(num_cpu_cores_, transport_send_->packet_router(),
688 std::move(configuration), 684 std::move(configuration),
689 module_process_thread_.get(), call_stats_.get()); 685 module_process_thread_.get(), call_stats_.get());
690 686
691 const webrtc::VideoReceiveStream::Config& config = receive_stream->config(); 687 const webrtc::VideoReceiveStream::Config& config = receive_stream->config();
692 ReceiveRtpConfig receive_config(config.rtp.extensions, 688 ReceiveRtpConfig receive_config(config.rtp.extensions,
693 UseSendSideBwe(config)); 689 UseSendSideBwe(config));
694 { 690 {
695 WriteLockScoped write_lock(*receive_crit_); 691 WriteLockScoped write_lock(*receive_crit_);
696 RTC_DCHECK(video_receive_ssrcs_.find(config.rtp.remote_ssrc) == 692 video_rtp_demuxer_.AddSink(config.rtp.remote_ssrc, receive_stream);
697 video_receive_ssrcs_.end());
698 video_receive_ssrcs_[config.rtp.remote_ssrc] = receive_stream;
699 if (config.rtp.rtx_ssrc) { 693 if (config.rtp.rtx_ssrc) {
700 video_receive_ssrcs_[config.rtp.rtx_ssrc] = receive_stream; 694 video_rtp_demuxer_.AddSink(config.rtp.rtx_ssrc, receive_stream);
701 // We record identical config for the rtx stream as for the main 695 // We record identical config for the rtx stream as for the main
702 // stream. Since the transport_send_cc negotiation is per payload 696 // stream. Since the transport_send_cc negotiation is per payload
703 // type, we may get an incorrect value for the rtx stream, but 697 // type, we may get an incorrect value for the rtx stream, but
704 // that is unlikely to matter in practice. 698 // that is unlikely to matter in practice.
705 receive_rtp_config_[config.rtp.rtx_ssrc] = receive_config; 699 receive_rtp_config_[config.rtp.rtx_ssrc] = receive_config;
706 } 700 }
707 receive_rtp_config_[config.rtp.remote_ssrc] = receive_config; 701 receive_rtp_config_[config.rtp.remote_ssrc] = receive_config;
708 video_receive_streams_.insert(receive_stream); 702 video_receive_streams_.insert(receive_stream);
709 ConfigureSync(config.sync_group); 703 ConfigureSync(config.sync_group);
710 } 704 }
711 receive_stream->SignalNetworkState(video_network_state_); 705 receive_stream->SignalNetworkState(video_network_state_);
712 UpdateAggregateNetworkState(); 706 UpdateAggregateNetworkState();
713 event_log_->LogVideoReceiveStreamConfig(config); 707 event_log_->LogVideoReceiveStreamConfig(config);
714 return receive_stream; 708 return receive_stream;
715 } 709 }
716 710
717 void Call::DestroyVideoReceiveStream( 711 void Call::DestroyVideoReceiveStream(
718 webrtc::VideoReceiveStream* receive_stream) { 712 webrtc::VideoReceiveStream* receive_stream) {
719 TRACE_EVENT0("webrtc", "Call::DestroyVideoReceiveStream"); 713 TRACE_EVENT0("webrtc", "Call::DestroyVideoReceiveStream");
720 RTC_DCHECK(configuration_thread_checker_.CalledOnValidThread()); 714 RTC_DCHECK(configuration_thread_checker_.CalledOnValidThread());
721 RTC_DCHECK(receive_stream != nullptr); 715 RTC_DCHECK(receive_stream != nullptr);
722 VideoReceiveStream* receive_stream_impl = nullptr; 716 VideoReceiveStream* receive_stream_impl =
717 static_cast<VideoReceiveStream*>(receive_stream);
718 const VideoReceiveStream::Config& config = receive_stream_impl->config();
723 { 719 {
724 WriteLockScoped write_lock(*receive_crit_); 720 WriteLockScoped write_lock(*receive_crit_);
725 // Remove all ssrcs pointing to a receive stream. As RTX retransmits on a 721 // Remove all ssrcs pointing to a receive stream. As RTX retransmits on a
726 // separate SSRC there can be either one or two. 722 // separate SSRC there can be either one or two.
727 auto it = video_receive_ssrcs_.begin(); 723 size_t num_deleted = video_rtp_demuxer_.RemoveSink(receive_stream_impl);
728 while (it != video_receive_ssrcs_.end()) { 724 RTC_DCHECK_GE(num_deleted, 1);
729 if (it->second == static_cast<VideoReceiveStream*>(receive_stream)) { 725 receive_rtp_config_.erase(config.rtp.remote_ssrc);
730 if (receive_stream_impl != nullptr) 726 if (config.rtp.rtx_ssrc) {
731 RTC_DCHECK(receive_stream_impl == it->second); 727 receive_rtp_config_.erase(config.rtp.rtx_ssrc);
732 receive_stream_impl = it->second;
733 receive_rtp_config_.erase(it->first);
734 it = video_receive_ssrcs_.erase(it);
735 } else {
736 ++it;
737 }
738 } 728 }
739 video_receive_streams_.erase(receive_stream_impl); 729 video_receive_streams_.erase(receive_stream_impl);
740 RTC_CHECK(receive_stream_impl != nullptr); 730 ConfigureSync(config.sync_group);
741 ConfigureSync(receive_stream_impl->config().sync_group);
742 } 731 }
743 const VideoReceiveStream::Config& config = receive_stream_impl->config();
744 732
745 receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config)) 733 receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config))
746 ->RemoveStream(config.rtp.remote_ssrc); 734 ->RemoveStream(config.rtp.remote_ssrc);
747 735
748 UpdateAggregateNetworkState(); 736 UpdateAggregateNetworkState();
749 delete receive_stream_impl; 737 delete receive_stream_impl;
750 } 738 }
751 739
752 FlexfecReceiveStream* Call::CreateFlexfecReceiveStream( 740 FlexfecReceiveStream* Call::CreateFlexfecReceiveStream(
753 const FlexfecReceiveStream::Config& config) { 741 const FlexfecReceiveStream::Config& config) {
754 TRACE_EVENT0("webrtc", "Call::CreateFlexfecReceiveStream"); 742 TRACE_EVENT0("webrtc", "Call::CreateFlexfecReceiveStream");
755 RTC_DCHECK(configuration_thread_checker_.CalledOnValidThread()); 743 RTC_DCHECK(configuration_thread_checker_.CalledOnValidThread());
756 744
757 RecoveredPacketReceiver* recovered_packet_receiver = this; 745 RecoveredPacketReceiver* recovered_packet_receiver = this;
758 FlexfecReceiveStreamImpl* receive_stream = new FlexfecReceiveStreamImpl( 746 FlexfecReceiveStreamImpl* receive_stream = new FlexfecReceiveStreamImpl(
759 config, recovered_packet_receiver, call_stats_->rtcp_rtt_stats(), 747 config, recovered_packet_receiver, call_stats_->rtcp_rtt_stats(),
760 module_process_thread_.get()); 748 module_process_thread_.get());
761 749
762 { 750 {
763 WriteLockScoped write_lock(*receive_crit_); 751 WriteLockScoped write_lock(*receive_crit_);
764 752 video_rtp_demuxer_.AddSink(config.remote_ssrc, receive_stream);
765 RTC_DCHECK(flexfec_receive_streams_.find(receive_stream) ==
766 flexfec_receive_streams_.end());
767 flexfec_receive_streams_.insert(receive_stream);
768 753
769 for (auto ssrc : config.protected_media_ssrcs) 754 for (auto ssrc : config.protected_media_ssrcs)
770 flexfec_receive_ssrcs_media_.insert(std::make_pair(ssrc, receive_stream)); 755 video_rtp_demuxer_.AddSink(ssrc, receive_stream);
771
772 RTC_DCHECK(flexfec_receive_ssrcs_protection_.find(config.remote_ssrc) ==
773 flexfec_receive_ssrcs_protection_.end());
774 flexfec_receive_ssrcs_protection_[config.remote_ssrc] = receive_stream;
775 756
776 RTC_DCHECK(receive_rtp_config_.find(config.remote_ssrc) == 757 RTC_DCHECK(receive_rtp_config_.find(config.remote_ssrc) ==
777 receive_rtp_config_.end()); 758 receive_rtp_config_.end());
778 receive_rtp_config_[config.remote_ssrc] = 759 receive_rtp_config_[config.remote_ssrc] =
779 ReceiveRtpConfig(config.rtp_header_extensions, UseSendSideBwe(config)); 760 ReceiveRtpConfig(config.rtp_header_extensions, UseSendSideBwe(config));
780 } 761 }
781 762
782 // TODO(brandtr): Store config in RtcEventLog here. 763 // TODO(brandtr): Store config in RtcEventLog here.
783 764
784 return receive_stream; 765 return receive_stream;
(...skipping 11 matching lines...) Expand all
796 { 777 {
797 WriteLockScoped write_lock(*receive_crit_); 778 WriteLockScoped write_lock(*receive_crit_);
798 779
799 const FlexfecReceiveStream::Config& config = 780 const FlexfecReceiveStream::Config& config =
800 receive_stream_impl->GetConfig(); 781 receive_stream_impl->GetConfig();
801 uint32_t ssrc = config.remote_ssrc; 782 uint32_t ssrc = config.remote_ssrc;
802 receive_rtp_config_.erase(ssrc); 783 receive_rtp_config_.erase(ssrc);
803 784
804 // Remove all SSRCs pointing to the FlexfecReceiveStreamImpl to be 785 // Remove all SSRCs pointing to the FlexfecReceiveStreamImpl to be
805 // destroyed. 786 // destroyed.
806 auto prot_it = flexfec_receive_ssrcs_protection_.begin(); 787 video_rtp_demuxer_.RemoveSink(receive_stream_impl);
807 while (prot_it != flexfec_receive_ssrcs_protection_.end()) {
808 if (prot_it->second == receive_stream_impl)
809 prot_it = flexfec_receive_ssrcs_protection_.erase(prot_it);
810 else
811 ++prot_it;
812 }
813 auto media_it = flexfec_receive_ssrcs_media_.begin();
814 while (media_it != flexfec_receive_ssrcs_media_.end()) {
815 if (media_it->second == receive_stream_impl)
816 media_it = flexfec_receive_ssrcs_media_.erase(media_it);
817 else
818 ++media_it;
819 }
820
821 receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config)) 788 receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config))
822 ->RemoveStream(ssrc); 789 ->RemoveStream(ssrc);
823
824 flexfec_receive_streams_.erase(receive_stream_impl);
825 } 790 }
826 791
827 delete receive_stream_impl; 792 delete receive_stream_impl;
828 } 793 }
829 794
830 Call::Stats Call::GetStats() const { 795 Call::Stats Call::GetStats() const {
831 // TODO(solenberg): Some test cases in EndToEndTest use this from a different 796 // TODO(solenberg): Some test cases in EndToEndTest use this from a different
832 // thread. Re-enable once that is fixed. 797 // thread. Re-enable once that is fixed.
833 // RTC_DCHECK(configuration_thread_checker_.CalledOnValidThread()); 798 // RTC_DCHECK(configuration_thread_checker_.CalledOnValidThread());
834 Stats stats; 799 Stats stats;
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after
901 ReadLockScoped read_lock(*send_crit_); 866 ReadLockScoped read_lock(*send_crit_);
902 for (auto& kv : audio_send_ssrcs_) { 867 for (auto& kv : audio_send_ssrcs_) {
903 kv.second->SignalNetworkState(audio_network_state_); 868 kv.second->SignalNetworkState(audio_network_state_);
904 } 869 }
905 for (auto& kv : video_send_ssrcs_) { 870 for (auto& kv : video_send_ssrcs_) {
906 kv.second->SignalNetworkState(video_network_state_); 871 kv.second->SignalNetworkState(video_network_state_);
907 } 872 }
908 } 873 }
909 { 874 {
910 ReadLockScoped read_lock(*receive_crit_); 875 ReadLockScoped read_lock(*receive_crit_);
911 for (auto& kv : audio_receive_ssrcs_) { 876 for (AudioReceiveStream* audio_receive_stream : audio_receive_streams_) {
912 kv.second->SignalNetworkState(audio_network_state_); 877 audio_receive_stream->SignalNetworkState(audio_network_state_);
913 } 878 }
914 for (auto& kv : video_receive_ssrcs_) { 879 for (VideoReceiveStream* video_receive_stream : video_receive_streams_) {
915 kv.second->SignalNetworkState(video_network_state_); 880 video_receive_stream->SignalNetworkState(video_network_state_);
916 } 881 }
917 } 882 }
918 } 883 }
919 884
920 void Call::OnTransportOverheadChanged(MediaType media, 885 void Call::OnTransportOverheadChanged(MediaType media,
921 int transport_overhead_per_packet) { 886 int transport_overhead_per_packet) {
922 switch (media) { 887 switch (media) {
923 case MediaType::AUDIO: { 888 case MediaType::AUDIO: {
924 ReadLockScoped read_lock(*send_crit_); 889 ReadLockScoped read_lock(*send_crit_);
925 for (auto& kv : audio_send_ssrcs_) { 890 for (auto& kv : audio_send_ssrcs_) {
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after
987 bool have_video = false; 952 bool have_video = false;
988 { 953 {
989 ReadLockScoped read_lock(*send_crit_); 954 ReadLockScoped read_lock(*send_crit_);
990 if (audio_send_ssrcs_.size() > 0) 955 if (audio_send_ssrcs_.size() > 0)
991 have_audio = true; 956 have_audio = true;
992 if (video_send_ssrcs_.size() > 0) 957 if (video_send_ssrcs_.size() > 0)
993 have_video = true; 958 have_video = true;
994 } 959 }
995 { 960 {
996 ReadLockScoped read_lock(*receive_crit_); 961 ReadLockScoped read_lock(*receive_crit_);
997 if (audio_receive_ssrcs_.size() > 0) 962 if (audio_receive_streams_.size() > 0)
998 have_audio = true; 963 have_audio = true;
999 if (video_receive_ssrcs_.size() > 0) 964 if (video_receive_streams_.size() > 0)
1000 have_video = true; 965 have_video = true;
1001 } 966 }
1002 967
1003 NetworkState aggregate_state = kNetworkDown; 968 NetworkState aggregate_state = kNetworkDown;
1004 if ((have_video && video_network_state_ == kNetworkUp) || 969 if ((have_video && video_network_state_ == kNetworkUp) ||
1005 (have_audio && audio_network_state_ == kNetworkUp)) { 970 (have_audio && audio_network_state_ == kNetworkUp)) {
1006 aggregate_state = kNetworkUp; 971 aggregate_state = kNetworkUp;
1007 } 972 }
1008 973
1009 LOG(LS_INFO) << "UpdateAggregateNetworkState: aggregate_state=" 974 LOG(LS_INFO) << "UpdateAggregateNetworkState: aggregate_state="
(...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after
1080 if (sync_group.empty()) 1045 if (sync_group.empty())
1081 return; 1046 return;
1082 1047
1083 AudioReceiveStream* sync_audio_stream = nullptr; 1048 AudioReceiveStream* sync_audio_stream = nullptr;
1084 // Find existing audio stream. 1049 // Find existing audio stream.
1085 const auto it = sync_stream_mapping_.find(sync_group); 1050 const auto it = sync_stream_mapping_.find(sync_group);
1086 if (it != sync_stream_mapping_.end()) { 1051 if (it != sync_stream_mapping_.end()) {
1087 sync_audio_stream = it->second; 1052 sync_audio_stream = it->second;
1088 } else { 1053 } else {
1089 // No configured audio stream, see if we can find one. 1054 // No configured audio stream, see if we can find one.
1090 for (const auto& kv : audio_receive_ssrcs_) { 1055 for (AudioReceiveStream* stream : audio_receive_streams_) {
1091 if (kv.second->config().sync_group == sync_group) { 1056 if (stream->config().sync_group == sync_group) {
1092 if (sync_audio_stream != nullptr) { 1057 if (sync_audio_stream != nullptr) {
1093 LOG(LS_WARNING) << "Attempting to sync more than one audio stream " 1058 LOG(LS_WARNING) << "Attempting to sync more than one audio stream "
1094 "within the same sync group. This is not " 1059 "within the same sync group. This is not "
1095 "supported in the current implementation."; 1060 "supported in the current implementation.";
1096 break; 1061 break;
1097 } 1062 }
1098 sync_audio_stream = kv.second; 1063 sync_audio_stream = stream;
1099 } 1064 }
1100 } 1065 }
1101 } 1066 }
1102 if (sync_audio_stream) 1067 if (sync_audio_stream)
1103 sync_stream_mapping_[sync_group] = sync_audio_stream; 1068 sync_stream_mapping_[sync_group] = sync_audio_stream;
1104 size_t num_synced_streams = 0; 1069 size_t num_synced_streams = 0;
1105 for (VideoReceiveStream* video_stream : video_receive_streams_) { 1070 for (VideoReceiveStream* video_stream : video_receive_streams_) {
1106 if (video_stream->config().sync_group != sync_group) 1071 if (video_stream->config().sync_group != sync_group)
1107 continue; 1072 continue;
1108 ++num_synced_streams; 1073 ++num_synced_streams;
(...skipping 29 matching lines...) Expand all
1138 bool rtcp_delivered = false; 1103 bool rtcp_delivered = false;
1139 if (media_type == MediaType::ANY || media_type == MediaType::VIDEO) { 1104 if (media_type == MediaType::ANY || media_type == MediaType::VIDEO) {
1140 ReadLockScoped read_lock(*receive_crit_); 1105 ReadLockScoped read_lock(*receive_crit_);
1141 for (VideoReceiveStream* stream : video_receive_streams_) { 1106 for (VideoReceiveStream* stream : video_receive_streams_) {
1142 if (stream->DeliverRtcp(packet, length)) 1107 if (stream->DeliverRtcp(packet, length))
1143 rtcp_delivered = true; 1108 rtcp_delivered = true;
1144 } 1109 }
1145 } 1110 }
1146 if (media_type == MediaType::ANY || media_type == MediaType::AUDIO) { 1111 if (media_type == MediaType::ANY || media_type == MediaType::AUDIO) {
1147 ReadLockScoped read_lock(*receive_crit_); 1112 ReadLockScoped read_lock(*receive_crit_);
1148 for (auto& kv : audio_receive_ssrcs_) { 1113 for (AudioReceiveStream* stream : audio_receive_streams_) {
1149 if (kv.second->DeliverRtcp(packet, length)) 1114 if (stream->DeliverRtcp(packet, length))
1150 rtcp_delivered = true; 1115 rtcp_delivered = true;
1151 } 1116 }
1152 } 1117 }
1153 if (media_type == MediaType::ANY || media_type == MediaType::VIDEO) { 1118 if (media_type == MediaType::ANY || media_type == MediaType::VIDEO) {
1154 ReadLockScoped read_lock(*send_crit_); 1119 ReadLockScoped read_lock(*send_crit_);
1155 for (VideoSendStream* stream : video_send_streams_) { 1120 for (VideoSendStream* stream : video_send_streams_) {
1156 if (stream->DeliverRtcp(packet, length)) 1121 if (stream->DeliverRtcp(packet, length))
1157 rtcp_delivered = true; 1122 rtcp_delivered = true;
1158 } 1123 }
1159 } 1124 }
(...skipping 23 matching lines...) Expand all
1183 // TODO(nisse): We should parse the RTP header only here, and pass 1148 // TODO(nisse): We should parse the RTP header only here, and pass
1184 // on parsed_packet to the receive streams. 1149 // on parsed_packet to the receive streams.
1185 rtc::Optional<RtpPacketReceived> parsed_packet = 1150 rtc::Optional<RtpPacketReceived> parsed_packet =
1186 ParseRtpPacket(packet, length, packet_time); 1151 ParseRtpPacket(packet, length, packet_time);
1187 1152
1188 if (!parsed_packet) 1153 if (!parsed_packet)
1189 return DELIVERY_PACKET_ERROR; 1154 return DELIVERY_PACKET_ERROR;
1190 1155
1191 NotifyBweOfReceivedPacket(*parsed_packet, media_type); 1156 NotifyBweOfReceivedPacket(*parsed_packet, media_type);
1192 1157
1193 uint32_t ssrc = parsed_packet->Ssrc();
1194
1195 if (media_type == MediaType::AUDIO) { 1158 if (media_type == MediaType::AUDIO) {
1196 auto it = audio_receive_ssrcs_.find(ssrc); 1159 if (audio_rtp_demuxer_.OnRtpPacket(*parsed_packet)) {
1197 if (it != audio_receive_ssrcs_.end()) {
1198 received_bytes_per_second_counter_.Add(static_cast<int>(length)); 1160 received_bytes_per_second_counter_.Add(static_cast<int>(length));
1199 received_audio_bytes_per_second_counter_.Add(static_cast<int>(length)); 1161 received_audio_bytes_per_second_counter_.Add(static_cast<int>(length));
1200 it->second->OnRtpPacket(*parsed_packet);
1201 event_log_->LogRtpHeader(kIncomingPacket, media_type, packet, length); 1162 event_log_->LogRtpHeader(kIncomingPacket, media_type, packet, length);
1202 return DELIVERY_OK; 1163 return DELIVERY_OK;
1203 } 1164 }
1204 } 1165 } else if (media_type == MediaType::VIDEO) {
1205 if (media_type == MediaType::VIDEO) { 1166 if (video_rtp_demuxer_.OnRtpPacket(*parsed_packet)) {
1206 auto it = video_receive_ssrcs_.find(ssrc);
1207 if (it != video_receive_ssrcs_.end()) {
1208 received_bytes_per_second_counter_.Add(static_cast<int>(length)); 1167 received_bytes_per_second_counter_.Add(static_cast<int>(length));
1209 received_video_bytes_per_second_counter_.Add(static_cast<int>(length)); 1168 received_video_bytes_per_second_counter_.Add(static_cast<int>(length));
1210 it->second->OnRtpPacket(*parsed_packet);
1211
1212 // Deliver media packets to FlexFEC subsystem.
1213 auto it_bounds = flexfec_receive_ssrcs_media_.equal_range(ssrc);
1214 for (auto it = it_bounds.first; it != it_bounds.second; ++it)
1215 it->second->OnRtpPacket(*parsed_packet);
1216
1217 event_log_->LogRtpHeader(kIncomingPacket, media_type, packet, length); 1169 event_log_->LogRtpHeader(kIncomingPacket, media_type, packet, length);
1218 return DELIVERY_OK; 1170 return DELIVERY_OK;
1219 } 1171 }
1220 }
1221 if (media_type == MediaType::VIDEO) {
1222 received_bytes_per_second_counter_.Add(static_cast<int>(length));
1223 // TODO(brandtr): Update here when FlexFEC supports protecting audio.
1224 received_video_bytes_per_second_counter_.Add(static_cast<int>(length));
1225 auto it = flexfec_receive_ssrcs_protection_.find(ssrc);
1226 if (it != flexfec_receive_ssrcs_protection_.end()) {
1227 it->second->OnRtpPacket(*parsed_packet);
1228 event_log_->LogRtpHeader(kIncomingPacket, media_type, packet, length);
1229 return DELIVERY_OK;
1230 }
1231 } 1172 }
1232 return DELIVERY_UNKNOWN_SSRC; 1173 return DELIVERY_UNKNOWN_SSRC;
1233 } 1174 }
1234 1175
1235 PacketReceiver::DeliveryStatus Call::DeliverPacket( 1176 PacketReceiver::DeliveryStatus Call::DeliverPacket(
1236 MediaType media_type, 1177 MediaType media_type,
1237 const uint8_t* packet, 1178 const uint8_t* packet,
1238 size_t length, 1179 size_t length,
1239 const PacketTime& packet_time) { 1180 const PacketTime& packet_time) {
1240 // TODO(solenberg): Tests call this function on a network thread, libjingle 1181 // TODO(solenberg): Tests call this function on a network thread, libjingle
1241 // calls on the worker thread. We should move towards always using a network 1182 // calls on the worker thread. We should move towards always using a network
1242 // thread. Then this check can be enabled. 1183 // thread. Then this check can be enabled.
1243 // RTC_DCHECK(!configuration_thread_checker_.CalledOnValidThread()); 1184 // RTC_DCHECK(!configuration_thread_checker_.CalledOnValidThread());
1244 if (RtpHeaderParser::IsRtcp(packet, length)) 1185 if (RtpHeaderParser::IsRtcp(packet, length))
1245 return DeliverRtcp(media_type, packet, length); 1186 return DeliverRtcp(media_type, packet, length);
1246 1187
1247 return DeliverRtp(media_type, packet, length, packet_time); 1188 return DeliverRtp(media_type, packet, length, packet_time);
1248 } 1189 }
1249 1190
1250 // TODO(brandtr): Update this member function when we support protecting 1191 // TODO(brandtr): Update this member function when we support protecting
1251 // audio packets with FlexFEC. 1192 // audio packets with FlexFEC.
1252 bool Call::OnRecoveredPacket(const uint8_t* packet, size_t length) { 1193 bool Call::OnRecoveredPacket(const uint8_t* packet, size_t length) {
1194 #if 0
1253 uint32_t ssrc = ByteReader<uint32_t>::ReadBigEndian(&packet[8]); 1195 uint32_t ssrc = ByteReader<uint32_t>::ReadBigEndian(&packet[8]);
1254 ReadLockScoped read_lock(*receive_crit_); 1196 ReadLockScoped read_lock(*receive_crit_);
1255 auto it = video_receive_ssrcs_.find(ssrc); 1197 auto it = video_receive_ssrcs_.find(ssrc);
1256 if (it == video_receive_ssrcs_.end()) 1198 if (it == video_receive_ssrcs_.end())
1257 return false; 1199 return false;
1258 return it->second->OnRecoveredPacket(packet, length); 1200 return it->second->OnRecoveredPacket(packet, length);
1201 #else
1202 // TODO(nisse): How should we handle this? It might make sense to
1203 // parse packets here, add a "recovered" flag to RtpPacketReceived,
1204 // and then just pass it on to video_rtp_demuxer_.OnRtpPacket?
Taylor Brandstetter 2017/05/10 20:59:43 That makes sense. Or just have an "OnRecoveredRtpP
1205 return false;
1206 #endif
1259 } 1207 }
1260 1208
1261 void Call::NotifyBweOfReceivedPacket(const RtpPacketReceived& packet, 1209 void Call::NotifyBweOfReceivedPacket(const RtpPacketReceived& packet,
1262 MediaType media_type) { 1210 MediaType media_type) {
1263 auto it = receive_rtp_config_.find(packet.Ssrc()); 1211 auto it = receive_rtp_config_.find(packet.Ssrc());
1264 bool use_send_side_bwe = 1212 bool use_send_side_bwe =
1265 (it != receive_rtp_config_.end()) && it->second.use_send_side_bwe; 1213 (it != receive_rtp_config_.end()) && it->second.use_send_side_bwe;
1266 1214
1267 RTPHeader header; 1215 RTPHeader header;
1268 packet.GetHeader(&header); 1216 packet.GetHeader(&header);
(...skipping 13 matching lines...) Expand all
1282 (use_send_side_bwe && header.extension.hasTransportSequenceNumber)) { 1230 (use_send_side_bwe && header.extension.hasTransportSequenceNumber)) {
1283 receive_side_cc_.OnReceivedPacket( 1231 receive_side_cc_.OnReceivedPacket(
1284 packet.arrival_time_ms(), packet.payload_size() + packet.padding_size(), 1232 packet.arrival_time_ms(), packet.payload_size() + packet.padding_size(),
1285 header); 1233 header);
1286 } 1234 }
1287 } 1235 }
1288 1236
1289 } // namespace internal 1237 } // namespace internal
1290 1238
1291 } // namespace webrtc 1239 } // namespace webrtc
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698