Index: webrtc/modules/pacing/paced_sender.cc |
diff --git a/webrtc/modules/pacing/paced_sender.cc b/webrtc/modules/pacing/paced_sender.cc |
index 3c299f827ee1654b4bd6aa011d5abd2393a95aee..e38405a6a24faa71145f89eb36b88a6b4c988493 100644 |
--- a/webrtc/modules/pacing/paced_sender.cc |
+++ b/webrtc/modules/pacing/paced_sender.cc |
@@ -10,12 +10,11 @@ |
#include "webrtc/modules/pacing/paced_sender.h" |
-#include <assert.h> |
- |
#include <map> |
#include <queue> |
#include <set> |
+#include "webrtc/base/checks.h" |
#include "webrtc/modules/include/module_common_types.h" |
#include "webrtc/modules/pacing/bitrate_prober.h" |
#include "webrtc/system_wrappers/include/clock.h" |
@@ -86,7 +85,11 @@ struct Comparator { |
// Class encapsulating a priority queue with some extensions. |
class PacketQueue { |
public: |
- PacketQueue() : bytes_(0) {} |
+ explicit PacketQueue(Clock* clock) |
+ : bytes_(0), |
+ clock_(clock), |
+ queue_time_sum_(0), |
+ time_last_updated_(clock_->TimeInMilliseconds()) {} |
virtual ~PacketQueue() {} |
void Push(const Packet& packet) { |
@@ -114,6 +117,7 @@ class PacketQueue { |
void FinalizePop(const Packet& packet) { |
RemoveFromDupeSet(packet); |
bytes_ -= packet.bytes; |
+ queue_time_sum_ -= (time_last_updated_ - packet.enqueue_time_ms); |
packet_list_.erase(packet.this_it); |
} |
@@ -123,13 +127,22 @@ class PacketQueue { |
uint64_t SizeInBytes() const { return bytes_; } |
- int64_t OldestEnqueueTime() const { |
- std::list<Packet>::const_reverse_iterator it = packet_list_.rbegin(); |
+ int64_t OldestEnqueueTimeMs() const { |
+ auto it = packet_list_.rbegin(); |
if (it == packet_list_.rend()) |
return 0; |
return it->enqueue_time_ms; |
} |
+ int64_t AverageQueueTimeMs() { |
+ int64_t now = clock_->TimeInMilliseconds(); |
+ RTC_DCHECK_GE(now, time_last_updated_); |
+ int64_t delta = now - time_last_updated_; |
+ queue_time_sum_ += delta * prio_queue_.size(); |
+ time_last_updated_ = now; |
+ return queue_time_sum_ / prio_queue_.size(); |
+ } |
+ |
private: |
// Try to add a packet to the set of ssrc/seqno identifiers currently in the |
// queue. Return true if inserted, false if this is a duplicate. |
@@ -147,7 +160,7 @@ class PacketQueue { |
void RemoveFromDupeSet(const Packet& packet) { |
SsrcSeqNoMap::iterator it = dupe_map_.find(packet.ssrc); |
- assert(it != dupe_map_.end()); |
+ RTC_DCHECK(it != dupe_map_.end()); |
it->second.erase(packet.sequence_number); |
if (it->second.empty()) { |
dupe_map_.erase(it); |
@@ -165,6 +178,9 @@ class PacketQueue { |
// Map<ssrc, set<seq_no> >, for checking duplicates. |
typedef std::map<uint32_t, std::set<uint16_t> > SsrcSeqNoMap; |
SsrcSeqNoMap dupe_map_; |
+ Clock* const clock_; |
+ int64_t queue_time_sum_; |
+ int64_t time_last_updated_; |
}; |
class IntervalBudget { |
@@ -209,6 +225,7 @@ class IntervalBudget { |
}; |
} // namespace paced_sender |
+const int64_t PacedSender::kMaxQueueLengthMs = 2000; |
const float PacedSender::kDefaultPaceMultiplier = 2.5f; |
PacedSender::PacedSender(Clock* clock, |
@@ -225,8 +242,9 @@ PacedSender::PacedSender(Clock* clock, |
padding_budget_(new paced_sender::IntervalBudget(min_bitrate_kbps)), |
prober_(new BitrateProber()), |
bitrate_bps_(1000 * bitrate_kbps), |
+ max_bitrate_kbps_(max_bitrate_kbps), |
time_last_update_us_(clock->TimeInMicroseconds()), |
- packets_(new paced_sender::PacketQueue()), |
+ packets_(new paced_sender::PacketQueue(clock)), |
packet_counter_(0) { |
UpdateBytesPerInterval(kMinPacketLimitMs); |
} |
@@ -244,7 +262,7 @@ void PacedSender::Resume() { |
} |
void PacedSender::SetProbingEnabled(bool enabled) { |
- assert(packet_counter_ == 0); |
+ RTC_CHECK_EQ(0u, packet_counter_); |
probing_enabled_ = enabled; |
} |
@@ -252,9 +270,12 @@ void PacedSender::UpdateBitrate(int bitrate_kbps, |
int max_bitrate_kbps, |
int min_bitrate_kbps) { |
CriticalSectionScoped cs(critsect_.get()); |
- media_budget_->set_target_rate_kbps(max_bitrate_kbps); |
+ // Don't set media bitrate here as it may be boosted in order to meet max |
+ // queue time constraint. Just update max_bitrate_kbps_ and let media_budget_ |
+ // be updated in Process(). |
padding_budget_->set_target_rate_kbps(min_bitrate_kbps); |
bitrate_bps_ = 1000 * bitrate_kbps; |
+ max_bitrate_kbps_ = max_bitrate_kbps; |
} |
void PacedSender::InsertPacket(RtpPacketSender::Priority priority, |
@@ -265,14 +286,12 @@ void PacedSender::InsertPacket(RtpPacketSender::Priority priority, |
bool retransmission) { |
CriticalSectionScoped cs(critsect_.get()); |
- if (probing_enabled_ && !prober_->IsProbing()) { |
+ if (probing_enabled_ && !prober_->IsProbing()) |
prober_->SetEnabled(true); |
- } |
prober_->MaybeInitializeProbe(bitrate_bps_); |
- if (capture_time_ms < 0) { |
+ if (capture_time_ms < 0) |
capture_time_ms = clock_->TimeInMilliseconds(); |
- } |
packets_->Push(paced_sender::Packet( |
priority, ssrc, sequence_number, capture_time_ms, |
@@ -281,9 +300,8 @@ void PacedSender::InsertPacket(RtpPacketSender::Priority priority, |
int64_t PacedSender::ExpectedQueueTimeMs() const { |
CriticalSectionScoped cs(critsect_.get()); |
- int target_rate = media_budget_->target_rate_kbps(); |
- assert(target_rate > 0); |
- return static_cast<int64_t>(packets_->SizeInBytes() * 8 / target_rate); |
+ RTC_DCHECK_GT(max_bitrate_kbps_, 0); |
+ return static_cast<int64_t>(packets_->SizeInBytes() * 8 / max_bitrate_kbps_); |
} |
size_t PacedSender::QueueSizePackets() const { |
@@ -294,7 +312,7 @@ size_t PacedSender::QueueSizePackets() const { |
int64_t PacedSender::QueueInMs() const { |
CriticalSectionScoped cs(critsect_.get()); |
- int64_t oldest_packet = packets_->OldestEnqueueTime(); |
+ int64_t oldest_packet = packets_->OldestEnqueueTimeMs(); |
if (oldest_packet == 0) |
return 0; |
@@ -305,9 +323,8 @@ int64_t PacedSender::TimeUntilNextProcess() { |
CriticalSectionScoped cs(critsect_.get()); |
if (prober_->IsProbing()) { |
int64_t ret = prober_->TimeUntilNextProbe(clock_->TimeInMilliseconds()); |
- if (ret >= 0) { |
+ if (ret >= 0) |
return ret; |
- } |
} |
int64_t elapsed_time_us = clock_->TimeInMicroseconds() - time_last_update_us_; |
int64_t elapsed_time_ms = (elapsed_time_us + 500) / 1000; |
@@ -321,14 +338,29 @@ int32_t PacedSender::Process() { |
time_last_update_us_ = now_us; |
if (paused_) |
return 0; |
+ int target_bitrate_kbps = max_bitrate_kbps_; |
if (elapsed_time_ms > 0) { |
+ size_t queue_size_bytes = packets_->SizeInBytes(); |
+ if (queue_size_bytes > 0) { |
+ // Assuming equal size packets and input/output rate, the average packet |
+ // has avg_time_left_ms left to get queue_size_bytes out of the queue, if |
+ // time constraint shall be met. Determine bitrate needed for that. |
+ int64_t avg_time_left_ms = std::max<int64_t>( |
+ 1, kMaxQueueLengthMs - packets_->AverageQueueTimeMs()); |
+ int min_bitrate_needed_kbps = |
+ static_cast<int>(queue_size_bytes * 8 / avg_time_left_ms); |
+ if (min_bitrate_needed_kbps > target_bitrate_kbps) |
+ target_bitrate_kbps = min_bitrate_needed_kbps; |
+ } |
+ |
+ media_budget_->set_target_rate_kbps(target_bitrate_kbps); |
+ |
int64_t delta_time_ms = std::min(kMaxIntervalTimeMs, elapsed_time_ms); |
UpdateBytesPerInterval(delta_time_ms); |
} |
while (!packets_->Empty()) { |
- if (media_budget_->bytes_remaining() == 0 && !prober_->IsProbing()) { |
+ if (media_budget_->bytes_remaining() == 0 && !prober_->IsProbing()) |
return 0; |
- } |
// Since we need to release the lock in order to send, we first pop the |
// element from the priority queue but keep it in storage, so that we can |
@@ -337,9 +369,8 @@ int32_t PacedSender::Process() { |
if (SendPacket(packet)) { |
// Send succeeded, remove it from the queue. |
packets_->FinalizePop(packet); |
- if (prober_->IsProbing()) { |
+ if (prober_->IsProbing()) |
return 0; |
- } |
} else { |
// Send failed, put it back into the queue. |
packets_->CancelPop(packet); |
@@ -351,10 +382,11 @@ int32_t PacedSender::Process() { |
return 0; |
size_t padding_needed; |
- if (prober_->IsProbing()) |
+ if (prober_->IsProbing()) { |
padding_needed = prober_->RecommendedPacketSize(); |
- else |
+ } else { |
padding_needed = padding_budget_->bytes_remaining(); |
+ } |
if (padding_needed > 0) |
SendPadding(static_cast<size_t>(padding_needed)); |