| OLD | NEW |
| 1 /* | 1 /* |
| 2 * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved. | 2 * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved. |
| 3 * | 3 * |
| 4 * Use of this source code is governed by a BSD-style license | 4 * Use of this source code is governed by a BSD-style license |
| 5 * that can be found in the LICENSE file in the root of the source | 5 * that can be found in the LICENSE file in the root of the source |
| 6 * tree. An additional intellectual property rights grant can be found | 6 * tree. An additional intellectual property rights grant can be found |
| 7 * in the file PATENTS. All contributing project authors may | 7 * in the file PATENTS. All contributing project authors may |
| 8 * be found in the AUTHORS file in the root of the source tree. | 8 * be found in the AUTHORS file in the root of the source tree. |
| 9 */ | 9 */ |
| 10 | 10 |
| (...skipping 14 matching lines...) Expand all Loading... |
| 25 namespace { | 25 namespace { |
| 26 // Time limit in milliseconds between packet bursts. | 26 // Time limit in milliseconds between packet bursts. |
| 27 const int64_t kMinPacketLimitMs = 5; | 27 const int64_t kMinPacketLimitMs = 5; |
| 28 | 28 |
| 29 // Upper cap on process interval, in case process has not been called in a long | 29 // Upper cap on process interval, in case process has not been called in a long |
| 30 // time. | 30 // time. |
| 31 const int64_t kMaxIntervalTimeMs = 30; | 31 const int64_t kMaxIntervalTimeMs = 30; |
| 32 | 32 |
| 33 } // namespace | 33 } // namespace |
| 34 | 34 |
| 35 // TODO(sprang): Move at least PacketQueue and MediaBudget out to separate |
| 36 // files, so that we can more easily test them. |
| 37 |
| 35 namespace webrtc { | 38 namespace webrtc { |
| 36 namespace paced_sender { | 39 namespace paced_sender { |
| 37 struct Packet { | 40 struct Packet { |
| 38 Packet(RtpPacketSender::Priority priority, | 41 Packet(RtpPacketSender::Priority priority, |
| 39 uint32_t ssrc, | 42 uint32_t ssrc, |
| 40 uint16_t seq_number, | 43 uint16_t seq_number, |
| 41 int64_t capture_time_ms, | 44 int64_t capture_time_ms, |
| 42 int64_t enqueue_time_ms, | 45 int64_t enqueue_time_ms, |
| 43 size_t length_in_bytes, | 46 size_t length_in_bytes, |
| 44 bool retransmission, | 47 bool retransmission, |
| (...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 86 class PacketQueue { | 89 class PacketQueue { |
| 87 public: | 90 public: |
| 88 explicit PacketQueue(Clock* clock) | 91 explicit PacketQueue(Clock* clock) |
| 89 : bytes_(0), | 92 : bytes_(0), |
| 90 clock_(clock), | 93 clock_(clock), |
| 91 queue_time_sum_(0), | 94 queue_time_sum_(0), |
| 92 time_last_updated_(clock_->TimeInMilliseconds()) {} | 95 time_last_updated_(clock_->TimeInMilliseconds()) {} |
| 93 virtual ~PacketQueue() {} | 96 virtual ~PacketQueue() {} |
| 94 | 97 |
| 95 void Push(const Packet& packet) { | 98 void Push(const Packet& packet) { |
| 96 if (!AddToDupeSet(packet)) { | 99 if (!AddToDupeSet(packet)) |
| 97 return; | 100 return; |
| 98 } | 101 |
| 102 UpdateQueueTime(packet.enqueue_time_ms); |
| 103 |
| 99 // Store packet in list, use pointers in priority queue for cheaper moves. | 104 // Store packet in list, use pointers in priority queue for cheaper moves. |
| 100 // Packets have a handle to its own iterator in the list, for easy removal | 105 // Packets have a handle to its own iterator in the list, for easy removal |
| 101 // when popping from queue. | 106 // when popping from queue. |
| 102 packet_list_.push_front(packet); | 107 packet_list_.push_front(packet); |
| 103 std::list<Packet>::iterator it = packet_list_.begin(); | 108 std::list<Packet>::iterator it = packet_list_.begin(); |
| 104 it->this_it = it; // Handle for direct removal from list. | 109 it->this_it = it; // Handle for direct removal from list. |
| 105 prio_queue_.push(&(*it)); // Pointer into list. | 110 prio_queue_.push(&(*it)); // Pointer into list. |
| 106 bytes_ += packet.bytes; | 111 bytes_ += packet.bytes; |
| 107 } | 112 } |
| 108 | 113 |
| 109 const Packet& BeginPop() { | 114 const Packet& BeginPop() { |
| 110 const Packet& packet = *prio_queue_.top(); | 115 const Packet& packet = *prio_queue_.top(); |
| 111 prio_queue_.pop(); | 116 prio_queue_.pop(); |
| 112 return packet; | 117 return packet; |
| 113 } | 118 } |
| 114 | 119 |
| 115 void CancelPop(const Packet& packet) { prio_queue_.push(&(*packet.this_it)); } | 120 void CancelPop(const Packet& packet) { prio_queue_.push(&(*packet.this_it)); } |
| 116 | 121 |
| 117 void FinalizePop(const Packet& packet) { | 122 void FinalizePop(const Packet& packet) { |
| 118 RemoveFromDupeSet(packet); | 123 RemoveFromDupeSet(packet); |
| 119 bytes_ -= packet.bytes; | 124 bytes_ -= packet.bytes; |
| 120 queue_time_sum_ -= (time_last_updated_ - packet.enqueue_time_ms); | 125 queue_time_sum_ -= (time_last_updated_ - packet.enqueue_time_ms); |
| 121 packet_list_.erase(packet.this_it); | 126 packet_list_.erase(packet.this_it); |
| 127 RTC_DCHECK_EQ(packet_list_.size(), prio_queue_.size()); |
| 128 if (packet_list_.empty()) |
| 129 RTC_DCHECK_EQ(0u, queue_time_sum_); |
| 122 } | 130 } |
| 123 | 131 |
| 124 bool Empty() const { return prio_queue_.empty(); } | 132 bool Empty() const { return prio_queue_.empty(); } |
| 125 | 133 |
| 126 size_t SizeInPackets() const { return prio_queue_.size(); } | 134 size_t SizeInPackets() const { return prio_queue_.size(); } |
| 127 | 135 |
| 128 uint64_t SizeInBytes() const { return bytes_; } | 136 uint64_t SizeInBytes() const { return bytes_; } |
| 129 | 137 |
| 130 int64_t OldestEnqueueTimeMs() const { | 138 int64_t OldestEnqueueTimeMs() const { |
| 131 auto it = packet_list_.rbegin(); | 139 auto it = packet_list_.rbegin(); |
| 132 if (it == packet_list_.rend()) | 140 if (it == packet_list_.rend()) |
| 133 return 0; | 141 return 0; |
| 134 return it->enqueue_time_ms; | 142 return it->enqueue_time_ms; |
| 135 } | 143 } |
| 136 | 144 |
| 137 int64_t AverageQueueTimeMs() { | 145 void UpdateQueueTime(int64_t timestamp_ms) { |
| 138 int64_t now = clock_->TimeInMilliseconds(); | 146 RTC_DCHECK_GE(timestamp_ms, time_last_updated_); |
| 139 RTC_DCHECK_GE(now, time_last_updated_); | 147 int64_t delta = timestamp_ms - time_last_updated_; |
| 140 int64_t delta = now - time_last_updated_; | 148 // Use packet packet_list_.size() not prio_queue_.size() here, as there |
| 141 queue_time_sum_ += delta * prio_queue_.size(); | 149 // might be an outstanding element popped from prio_queue_ currently in the |
| 142 time_last_updated_ = now; | 150 // SendPacket() call, while packet_list_ will always be correct. |
| 143 return queue_time_sum_ / prio_queue_.size(); | 151 queue_time_sum_ += delta * packet_list_.size(); |
| 152 time_last_updated_ = timestamp_ms; |
| 153 } |
| 154 |
| 155 int64_t AverageQueueTimeMs() const { |
| 156 if (prio_queue_.empty()) |
| 157 return 0; |
| 158 return queue_time_sum_ / packet_list_.size(); |
| 144 } | 159 } |
| 145 | 160 |
| 146 private: | 161 private: |
| 147 // Try to add a packet to the set of ssrc/seqno identifiers currently in the | 162 // Try to add a packet to the set of ssrc/seqno identifiers currently in the |
| 148 // queue. Return true if inserted, false if this is a duplicate. | 163 // queue. Return true if inserted, false if this is a duplicate. |
| 149 bool AddToDupeSet(const Packet& packet) { | 164 bool AddToDupeSet(const Packet& packet) { |
| 150 SsrcSeqNoMap::iterator it = dupe_map_.find(packet.ssrc); | 165 SsrcSeqNoMap::iterator it = dupe_map_.find(packet.ssrc); |
| 151 if (it == dupe_map_.end()) { | 166 if (it == dupe_map_.end()) { |
| 152 // First for this ssrc, just insert. | 167 // First for this ssrc, just insert. |
| 153 dupe_map_[packet.ssrc].insert(packet.sequence_number); | 168 dupe_map_[packet.ssrc].insert(packet.sequence_number); |
| (...skipping 129 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 283 uint16_t sequence_number, | 298 uint16_t sequence_number, |
| 284 int64_t capture_time_ms, | 299 int64_t capture_time_ms, |
| 285 size_t bytes, | 300 size_t bytes, |
| 286 bool retransmission) { | 301 bool retransmission) { |
| 287 CriticalSectionScoped cs(critsect_.get()); | 302 CriticalSectionScoped cs(critsect_.get()); |
| 288 | 303 |
| 289 if (probing_enabled_ && !prober_->IsProbing()) | 304 if (probing_enabled_ && !prober_->IsProbing()) |
| 290 prober_->SetEnabled(true); | 305 prober_->SetEnabled(true); |
| 291 prober_->MaybeInitializeProbe(bitrate_bps_); | 306 prober_->MaybeInitializeProbe(bitrate_bps_); |
| 292 | 307 |
| 308 int64_t now_ms = clock_->TimeInMilliseconds(); |
| 293 if (capture_time_ms < 0) | 309 if (capture_time_ms < 0) |
| 294 capture_time_ms = clock_->TimeInMilliseconds(); | 310 capture_time_ms = now_ms; |
| 295 | 311 |
| 296 packets_->Push(paced_sender::Packet( | 312 packets_->Push(paced_sender::Packet(priority, ssrc, sequence_number, |
| 297 priority, ssrc, sequence_number, capture_time_ms, | 313 capture_time_ms, now_ms, bytes, |
| 298 clock_->TimeInMilliseconds(), bytes, retransmission, packet_counter_++)); | 314 retransmission, packet_counter_++)); |
| 299 } | 315 } |
| 300 | 316 |
| 301 int64_t PacedSender::ExpectedQueueTimeMs() const { | 317 int64_t PacedSender::ExpectedQueueTimeMs() const { |
| 302 CriticalSectionScoped cs(critsect_.get()); | 318 CriticalSectionScoped cs(critsect_.get()); |
| 303 RTC_DCHECK_GT(max_bitrate_kbps_, 0); | 319 RTC_DCHECK_GT(max_bitrate_kbps_, 0); |
| 304 return static_cast<int64_t>(packets_->SizeInBytes() * 8 / max_bitrate_kbps_); | 320 return static_cast<int64_t>(packets_->SizeInBytes() * 8 / max_bitrate_kbps_); |
| 305 } | 321 } |
| 306 | 322 |
| 307 size_t PacedSender::QueueSizePackets() const { | 323 size_t PacedSender::QueueSizePackets() const { |
| 308 CriticalSectionScoped cs(critsect_.get()); | 324 CriticalSectionScoped cs(critsect_.get()); |
| 309 return packets_->SizeInPackets(); | 325 return packets_->SizeInPackets(); |
| 310 } | 326 } |
| 311 | 327 |
| 312 int64_t PacedSender::QueueInMs() const { | 328 int64_t PacedSender::QueueInMs() const { |
| 313 CriticalSectionScoped cs(critsect_.get()); | 329 CriticalSectionScoped cs(critsect_.get()); |
| 314 | 330 |
| 315 int64_t oldest_packet = packets_->OldestEnqueueTimeMs(); | 331 int64_t oldest_packet = packets_->OldestEnqueueTimeMs(); |
| 316 if (oldest_packet == 0) | 332 if (oldest_packet == 0) |
| 317 return 0; | 333 return 0; |
| 318 | 334 |
| 319 return clock_->TimeInMilliseconds() - oldest_packet; | 335 return clock_->TimeInMilliseconds() - oldest_packet; |
| 320 } | 336 } |
| 321 | 337 |
| 338 int64_t PacedSender::AverageQueueTimeMs() { |
| 339 CriticalSectionScoped cs(critsect_.get()); |
| 340 packets_->UpdateQueueTime(clock_->TimeInMilliseconds()); |
| 341 return packets_->AverageQueueTimeMs(); |
| 342 } |
| 343 |
| 322 int64_t PacedSender::TimeUntilNextProcess() { | 344 int64_t PacedSender::TimeUntilNextProcess() { |
| 323 CriticalSectionScoped cs(critsect_.get()); | 345 CriticalSectionScoped cs(critsect_.get()); |
| 324 if (prober_->IsProbing()) { | 346 if (prober_->IsProbing()) { |
| 325 int64_t ret = prober_->TimeUntilNextProbe(clock_->TimeInMilliseconds()); | 347 int64_t ret = prober_->TimeUntilNextProbe(clock_->TimeInMilliseconds()); |
| 326 if (ret >= 0) | 348 if (ret >= 0) |
| 327 return ret; | 349 return ret; |
| 328 } | 350 } |
| 329 int64_t elapsed_time_us = clock_->TimeInMicroseconds() - time_last_update_us_; | 351 int64_t elapsed_time_us = clock_->TimeInMicroseconds() - time_last_update_us_; |
| 330 int64_t elapsed_time_ms = (elapsed_time_us + 500) / 1000; | 352 int64_t elapsed_time_ms = (elapsed_time_us + 500) / 1000; |
| 331 return std::max<int64_t>(kMinPacketLimitMs - elapsed_time_ms, 0); | 353 return std::max<int64_t>(kMinPacketLimitMs - elapsed_time_ms, 0); |
| 332 } | 354 } |
| 333 | 355 |
| 334 int32_t PacedSender::Process() { | 356 int32_t PacedSender::Process() { |
| 335 int64_t now_us = clock_->TimeInMicroseconds(); | 357 int64_t now_us = clock_->TimeInMicroseconds(); |
| 336 CriticalSectionScoped cs(critsect_.get()); | 358 CriticalSectionScoped cs(critsect_.get()); |
| 337 int64_t elapsed_time_ms = (now_us - time_last_update_us_ + 500) / 1000; | 359 int64_t elapsed_time_ms = (now_us - time_last_update_us_ + 500) / 1000; |
| 338 time_last_update_us_ = now_us; | 360 time_last_update_us_ = now_us; |
| 339 if (paused_) | 361 if (paused_) |
| 340 return 0; | 362 return 0; |
| 341 int target_bitrate_kbps = max_bitrate_kbps_; | 363 int target_bitrate_kbps = max_bitrate_kbps_; |
| 342 if (elapsed_time_ms > 0) { | 364 if (elapsed_time_ms > 0) { |
| 343 size_t queue_size_bytes = packets_->SizeInBytes(); | 365 size_t queue_size_bytes = packets_->SizeInBytes(); |
| 344 if (queue_size_bytes > 0) { | 366 if (queue_size_bytes > 0) { |
| 345 // Assuming equal size packets and input/output rate, the average packet | 367 // Assuming equal size packets and input/output rate, the average packet |
| 346 // has avg_time_left_ms left to get queue_size_bytes out of the queue, if | 368 // has avg_time_left_ms left to get queue_size_bytes out of the queue, if |
| 347 // time constraint shall be met. Determine bitrate needed for that. | 369 // time constraint shall be met. Determine bitrate needed for that. |
| 370 packets_->UpdateQueueTime(clock_->TimeInMilliseconds()); |
| 348 int64_t avg_time_left_ms = std::max<int64_t>( | 371 int64_t avg_time_left_ms = std::max<int64_t>( |
| 349 1, kMaxQueueLengthMs - packets_->AverageQueueTimeMs()); | 372 1, kMaxQueueLengthMs - packets_->AverageQueueTimeMs()); |
| 350 int min_bitrate_needed_kbps = | 373 int min_bitrate_needed_kbps = |
| 351 static_cast<int>(queue_size_bytes * 8 / avg_time_left_ms); | 374 static_cast<int>(queue_size_bytes * 8 / avg_time_left_ms); |
| 352 if (min_bitrate_needed_kbps > target_bitrate_kbps) | 375 if (min_bitrate_needed_kbps > target_bitrate_kbps) |
| 353 target_bitrate_kbps = min_bitrate_needed_kbps; | 376 target_bitrate_kbps = min_bitrate_needed_kbps; |
| 354 } | 377 } |
| 355 | 378 |
| 356 media_budget_->set_target_rate_kbps(target_bitrate_kbps); | 379 media_budget_->set_target_rate_kbps(target_bitrate_kbps); |
| 357 | 380 |
| (...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 421 media_budget_->UseBudget(bytes_sent); | 444 media_budget_->UseBudget(bytes_sent); |
| 422 padding_budget_->UseBudget(bytes_sent); | 445 padding_budget_->UseBudget(bytes_sent); |
| 423 } | 446 } |
| 424 } | 447 } |
| 425 | 448 |
| 426 void PacedSender::UpdateBytesPerInterval(int64_t delta_time_ms) { | 449 void PacedSender::UpdateBytesPerInterval(int64_t delta_time_ms) { |
| 427 media_budget_->IncreaseBudget(delta_time_ms); | 450 media_budget_->IncreaseBudget(delta_time_ms); |
| 428 padding_budget_->IncreaseBudget(delta_time_ms); | 451 padding_budget_->IncreaseBudget(delta_time_ms); |
| 429 } | 452 } |
| 430 } // namespace webrtc | 453 } // namespace webrtc |
| OLD | NEW |