| Index: webrtc/modules/pacing/paced_sender.cc
|
| diff --git a/webrtc/modules/pacing/paced_sender.cc b/webrtc/modules/pacing/paced_sender.cc
|
| index 3c299f827ee1654b4bd6aa011d5abd2393a95aee..e38405a6a24faa71145f89eb36b88a6b4c988493 100644
|
| --- a/webrtc/modules/pacing/paced_sender.cc
|
| +++ b/webrtc/modules/pacing/paced_sender.cc
|
| @@ -10,12 +10,11 @@
|
|
|
| #include "webrtc/modules/pacing/paced_sender.h"
|
|
|
| -#include <assert.h>
|
| -
|
| #include <map>
|
| #include <queue>
|
| #include <set>
|
|
|
| +#include "webrtc/base/checks.h"
|
| #include "webrtc/modules/include/module_common_types.h"
|
| #include "webrtc/modules/pacing/bitrate_prober.h"
|
| #include "webrtc/system_wrappers/include/clock.h"
|
| @@ -86,7 +85,11 @@ struct Comparator {
|
| // Class encapsulating a priority queue with some extensions.
|
| class PacketQueue {
|
| public:
|
| - PacketQueue() : bytes_(0) {}
|
| + explicit PacketQueue(Clock* clock)
|
| + : bytes_(0),
|
| + clock_(clock),
|
| + queue_time_sum_(0),
|
| + time_last_updated_(clock_->TimeInMilliseconds()) {}
|
| virtual ~PacketQueue() {}
|
|
|
| void Push(const Packet& packet) {
|
| @@ -114,6 +117,7 @@ class PacketQueue {
|
| void FinalizePop(const Packet& packet) {
|
| RemoveFromDupeSet(packet);
|
| bytes_ -= packet.bytes;
|
| + queue_time_sum_ -= (time_last_updated_ - packet.enqueue_time_ms);
|
| packet_list_.erase(packet.this_it);
|
| }
|
|
|
| @@ -123,13 +127,22 @@ class PacketQueue {
|
|
|
| uint64_t SizeInBytes() const { return bytes_; }
|
|
|
| - int64_t OldestEnqueueTime() const {
|
| - std::list<Packet>::const_reverse_iterator it = packet_list_.rbegin();
|
| + int64_t OldestEnqueueTimeMs() const {
|
| + auto it = packet_list_.rbegin();
|
| if (it == packet_list_.rend())
|
| return 0;
|
| return it->enqueue_time_ms;
|
| }
|
|
|
| + int64_t AverageQueueTimeMs() {
|
| + int64_t now = clock_->TimeInMilliseconds();
|
| + RTC_DCHECK_GE(now, time_last_updated_);
|
| + int64_t delta = now - time_last_updated_;
|
| + queue_time_sum_ += delta * prio_queue_.size();
|
| + time_last_updated_ = now;
|
| + return queue_time_sum_ / prio_queue_.size();
|
| + }
|
| +
|
| private:
|
| // Try to add a packet to the set of ssrc/seqno identifiers currently in the
|
| // queue. Return true if inserted, false if this is a duplicate.
|
| @@ -147,7 +160,7 @@ class PacketQueue {
|
|
|
| void RemoveFromDupeSet(const Packet& packet) {
|
| SsrcSeqNoMap::iterator it = dupe_map_.find(packet.ssrc);
|
| - assert(it != dupe_map_.end());
|
| + RTC_DCHECK(it != dupe_map_.end());
|
| it->second.erase(packet.sequence_number);
|
| if (it->second.empty()) {
|
| dupe_map_.erase(it);
|
| @@ -165,6 +178,9 @@ class PacketQueue {
|
| // Map<ssrc, set<seq_no> >, for checking duplicates.
|
| typedef std::map<uint32_t, std::set<uint16_t> > SsrcSeqNoMap;
|
| SsrcSeqNoMap dupe_map_;
|
| + Clock* const clock_;
|
| + int64_t queue_time_sum_;
|
| + int64_t time_last_updated_;
|
| };
|
|
|
| class IntervalBudget {
|
| @@ -209,6 +225,7 @@ class IntervalBudget {
|
| };
|
| } // namespace paced_sender
|
|
|
| +const int64_t PacedSender::kMaxQueueLengthMs = 2000;
|
| const float PacedSender::kDefaultPaceMultiplier = 2.5f;
|
|
|
| PacedSender::PacedSender(Clock* clock,
|
| @@ -225,8 +242,9 @@ PacedSender::PacedSender(Clock* clock,
|
| padding_budget_(new paced_sender::IntervalBudget(min_bitrate_kbps)),
|
| prober_(new BitrateProber()),
|
| bitrate_bps_(1000 * bitrate_kbps),
|
| + max_bitrate_kbps_(max_bitrate_kbps),
|
| time_last_update_us_(clock->TimeInMicroseconds()),
|
| - packets_(new paced_sender::PacketQueue()),
|
| + packets_(new paced_sender::PacketQueue(clock)),
|
| packet_counter_(0) {
|
| UpdateBytesPerInterval(kMinPacketLimitMs);
|
| }
|
| @@ -244,7 +262,7 @@ void PacedSender::Resume() {
|
| }
|
|
|
| void PacedSender::SetProbingEnabled(bool enabled) {
|
| - assert(packet_counter_ == 0);
|
| + RTC_CHECK_EQ(0u, packet_counter_);
|
| probing_enabled_ = enabled;
|
| }
|
|
|
| @@ -252,9 +270,12 @@ void PacedSender::UpdateBitrate(int bitrate_kbps,
|
| int max_bitrate_kbps,
|
| int min_bitrate_kbps) {
|
| CriticalSectionScoped cs(critsect_.get());
|
| - media_budget_->set_target_rate_kbps(max_bitrate_kbps);
|
| + // Don't set media bitrate here as it may be boosted in order to meet max
|
| + // queue time constraint. Just update max_bitrate_kbps_ and let media_budget_
|
| + // be updated in Process().
|
| padding_budget_->set_target_rate_kbps(min_bitrate_kbps);
|
| bitrate_bps_ = 1000 * bitrate_kbps;
|
| + max_bitrate_kbps_ = max_bitrate_kbps;
|
| }
|
|
|
| void PacedSender::InsertPacket(RtpPacketSender::Priority priority,
|
| @@ -265,14 +286,12 @@ void PacedSender::InsertPacket(RtpPacketSender::Priority priority,
|
| bool retransmission) {
|
| CriticalSectionScoped cs(critsect_.get());
|
|
|
| - if (probing_enabled_ && !prober_->IsProbing()) {
|
| + if (probing_enabled_ && !prober_->IsProbing())
|
| prober_->SetEnabled(true);
|
| - }
|
| prober_->MaybeInitializeProbe(bitrate_bps_);
|
|
|
| - if (capture_time_ms < 0) {
|
| + if (capture_time_ms < 0)
|
| capture_time_ms = clock_->TimeInMilliseconds();
|
| - }
|
|
|
| packets_->Push(paced_sender::Packet(
|
| priority, ssrc, sequence_number, capture_time_ms,
|
| @@ -281,9 +300,8 @@ void PacedSender::InsertPacket(RtpPacketSender::Priority priority,
|
|
|
| int64_t PacedSender::ExpectedQueueTimeMs() const {
|
| CriticalSectionScoped cs(critsect_.get());
|
| - int target_rate = media_budget_->target_rate_kbps();
|
| - assert(target_rate > 0);
|
| - return static_cast<int64_t>(packets_->SizeInBytes() * 8 / target_rate);
|
| + RTC_DCHECK_GT(max_bitrate_kbps_, 0);
|
| + return static_cast<int64_t>(packets_->SizeInBytes() * 8 / max_bitrate_kbps_);
|
| }
|
|
|
| size_t PacedSender::QueueSizePackets() const {
|
| @@ -294,7 +312,7 @@ size_t PacedSender::QueueSizePackets() const {
|
| int64_t PacedSender::QueueInMs() const {
|
| CriticalSectionScoped cs(critsect_.get());
|
|
|
| - int64_t oldest_packet = packets_->OldestEnqueueTime();
|
| + int64_t oldest_packet = packets_->OldestEnqueueTimeMs();
|
| if (oldest_packet == 0)
|
| return 0;
|
|
|
| @@ -305,9 +323,8 @@ int64_t PacedSender::TimeUntilNextProcess() {
|
| CriticalSectionScoped cs(critsect_.get());
|
| if (prober_->IsProbing()) {
|
| int64_t ret = prober_->TimeUntilNextProbe(clock_->TimeInMilliseconds());
|
| - if (ret >= 0) {
|
| + if (ret >= 0)
|
| return ret;
|
| - }
|
| }
|
| int64_t elapsed_time_us = clock_->TimeInMicroseconds() - time_last_update_us_;
|
| int64_t elapsed_time_ms = (elapsed_time_us + 500) / 1000;
|
| @@ -321,14 +338,29 @@ int32_t PacedSender::Process() {
|
| time_last_update_us_ = now_us;
|
| if (paused_)
|
| return 0;
|
| + int target_bitrate_kbps = max_bitrate_kbps_;
|
| if (elapsed_time_ms > 0) {
|
| + size_t queue_size_bytes = packets_->SizeInBytes();
|
| + if (queue_size_bytes > 0) {
|
| + // Assuming equal size packets and input/output rate, the average packet
|
| + // has avg_time_left_ms left to get queue_size_bytes out of the queue, if
|
| + // time constraint shall be met. Determine bitrate needed for that.
|
| + int64_t avg_time_left_ms = std::max<int64_t>(
|
| + 1, kMaxQueueLengthMs - packets_->AverageQueueTimeMs());
|
| + int min_bitrate_needed_kbps =
|
| + static_cast<int>(queue_size_bytes * 8 / avg_time_left_ms);
|
| + if (min_bitrate_needed_kbps > target_bitrate_kbps)
|
| + target_bitrate_kbps = min_bitrate_needed_kbps;
|
| + }
|
| +
|
| + media_budget_->set_target_rate_kbps(target_bitrate_kbps);
|
| +
|
| int64_t delta_time_ms = std::min(kMaxIntervalTimeMs, elapsed_time_ms);
|
| UpdateBytesPerInterval(delta_time_ms);
|
| }
|
| while (!packets_->Empty()) {
|
| - if (media_budget_->bytes_remaining() == 0 && !prober_->IsProbing()) {
|
| + if (media_budget_->bytes_remaining() == 0 && !prober_->IsProbing())
|
| return 0;
|
| - }
|
|
|
| // Since we need to release the lock in order to send, we first pop the
|
| // element from the priority queue but keep it in storage, so that we can
|
| @@ -337,9 +369,8 @@ int32_t PacedSender::Process() {
|
| if (SendPacket(packet)) {
|
| // Send succeeded, remove it from the queue.
|
| packets_->FinalizePop(packet);
|
| - if (prober_->IsProbing()) {
|
| + if (prober_->IsProbing())
|
| return 0;
|
| - }
|
| } else {
|
| // Send failed, put it back into the queue.
|
| packets_->CancelPop(packet);
|
| @@ -351,10 +382,11 @@ int32_t PacedSender::Process() {
|
| return 0;
|
|
|
| size_t padding_needed;
|
| - if (prober_->IsProbing())
|
| + if (prober_->IsProbing()) {
|
| padding_needed = prober_->RecommendedPacketSize();
|
| - else
|
| + } else {
|
| padding_needed = padding_budget_->bytes_remaining();
|
| + }
|
|
|
| if (padding_needed > 0)
|
| SendPadding(static_cast<size_t>(padding_needed));
|
|
|