Index: webrtc/modules/pacing/paced_sender.cc |
diff --git a/webrtc/modules/pacing/paced_sender.cc b/webrtc/modules/pacing/paced_sender.cc |
index e38405a6a24faa71145f89eb36b88a6b4c988493..dcdf64e97e9bd8cfdf0e80891d2ef991354e08dd 100644 |
--- a/webrtc/modules/pacing/paced_sender.cc |
+++ b/webrtc/modules/pacing/paced_sender.cc |
@@ -32,6 +32,9 @@ const int64_t kMaxIntervalTimeMs = 30; |
} // namespace |
+// TODO(sprang): Move at least PacketQueue and MediaBudget out to separate |
+// files, so that we can more easily test them. |
+ |
namespace webrtc { |
namespace paced_sender { |
struct Packet { |
@@ -93,9 +96,11 @@ class PacketQueue { |
virtual ~PacketQueue() {} |
void Push(const Packet& packet) { |
- if (!AddToDupeSet(packet)) { |
+ if (!AddToDupeSet(packet)) |
return; |
- } |
+ |
+ UpdateQueueTime(packet.enqueue_time_ms); |
+ |
// Store packet in list, use pointers in priority queue for cheaper moves. |
// Packets have a handle to its own iterator in the list, for easy removal |
// when popping from queue. |
@@ -119,6 +124,9 @@ class PacketQueue { |
bytes_ -= packet.bytes; |
queue_time_sum_ -= (time_last_updated_ - packet.enqueue_time_ms); |
packet_list_.erase(packet.this_it); |
+ RTC_DCHECK_EQ(packet_list_.size(), prio_queue_.size()); |
+ if (packet_list_.empty()) |
+ RTC_DCHECK_EQ(0u, queue_time_sum_); |
} |
bool Empty() const { return prio_queue_.empty(); } |
@@ -134,13 +142,20 @@ class PacketQueue { |
return it->enqueue_time_ms; |
} |
- int64_t AverageQueueTimeMs() { |
- int64_t now = clock_->TimeInMilliseconds(); |
- RTC_DCHECK_GE(now, time_last_updated_); |
- int64_t delta = now - time_last_updated_; |
- queue_time_sum_ += delta * prio_queue_.size(); |
- time_last_updated_ = now; |
- return queue_time_sum_ / prio_queue_.size(); |
+ void UpdateQueueTime(int64_t timestamp_ms) { |
+ RTC_DCHECK_GE(timestamp_ms, time_last_updated_); |
+ int64_t delta = timestamp_ms - time_last_updated_; |
+ // Use packet packet_list_.size() not prio_queue_.size() here, as there |
+ // might be an outstanding element popped from prio_queue_ currently in the |
+ // SendPacket() call, while packet_list_ will always be correct. |
+ queue_time_sum_ += delta * packet_list_.size(); |
+ time_last_updated_ = timestamp_ms; |
+ } |
+ |
+ int64_t AverageQueueTimeMs() const { |
+ if (prio_queue_.empty()) |
+ return 0; |
+ return queue_time_sum_ / packet_list_.size(); |
} |
private: |
@@ -290,12 +305,13 @@ void PacedSender::InsertPacket(RtpPacketSender::Priority priority, |
prober_->SetEnabled(true); |
prober_->MaybeInitializeProbe(bitrate_bps_); |
+ int64_t now_ms = clock_->TimeInMilliseconds(); |
if (capture_time_ms < 0) |
- capture_time_ms = clock_->TimeInMilliseconds(); |
+ capture_time_ms = now_ms; |
- packets_->Push(paced_sender::Packet( |
- priority, ssrc, sequence_number, capture_time_ms, |
- clock_->TimeInMilliseconds(), bytes, retransmission, packet_counter_++)); |
+ packets_->Push(paced_sender::Packet(priority, ssrc, sequence_number, |
+ capture_time_ms, now_ms, bytes, |
+ retransmission, packet_counter_++)); |
} |
int64_t PacedSender::ExpectedQueueTimeMs() const { |
@@ -319,6 +335,12 @@ int64_t PacedSender::QueueInMs() const { |
return clock_->TimeInMilliseconds() - oldest_packet; |
} |
+int64_t PacedSender::AverageQueueTimeMs() { |
+ CriticalSectionScoped cs(critsect_.get()); |
+ packets_->UpdateQueueTime(clock_->TimeInMilliseconds()); |
+ return packets_->AverageQueueTimeMs(); |
+} |
+ |
int64_t PacedSender::TimeUntilNextProcess() { |
CriticalSectionScoped cs(critsect_.get()); |
if (prober_->IsProbing()) { |
@@ -345,6 +367,7 @@ int32_t PacedSender::Process() { |
// Assuming equal size packets and input/output rate, the average packet |
// has avg_time_left_ms left to get queue_size_bytes out of the queue, if |
// time constraint shall be met. Determine bitrate needed for that. |
+ packets_->UpdateQueueTime(clock_->TimeInMilliseconds()); |
int64_t avg_time_left_ms = std::max<int64_t>( |
1, kMaxQueueLengthMs - packets_->AverageQueueTimeMs()); |
int min_bitrate_needed_kbps = |