Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(350)

Side by Side Diff: webrtc/modules/pacing/paced_sender.cc

Issue 2918323002: Add functionality which limits the number of bytes on the network. (Closed)
Patch Set: Switch to max-rtt within a feedback message and fix race. Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 /* 1 /*
2 * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved. 2 * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
3 * 3 *
4 * Use of this source code is governed by a BSD-style license 4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source 5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found 6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may 7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree. 8 * be found in the AUTHORS file in the root of the source tree.
9 */ 9 */
10 10
(...skipping 11 matching lines...) Expand all
22 #include "webrtc/modules/pacing/interval_budget.h" 22 #include "webrtc/modules/pacing/interval_budget.h"
23 #include "webrtc/modules/utility/include/process_thread.h" 23 #include "webrtc/modules/utility/include/process_thread.h"
24 #include "webrtc/rtc_base/checks.h" 24 #include "webrtc/rtc_base/checks.h"
25 #include "webrtc/rtc_base/logging.h" 25 #include "webrtc/rtc_base/logging.h"
26 #include "webrtc/system_wrappers/include/clock.h" 26 #include "webrtc/system_wrappers/include/clock.h"
27 #include "webrtc/system_wrappers/include/field_trial.h" 27 #include "webrtc/system_wrappers/include/field_trial.h"
28 28
29 namespace { 29 namespace {
30 // Time limit in milliseconds between packet bursts. 30 // Time limit in milliseconds between packet bursts.
31 const int64_t kMinPacketLimitMs = 5; 31 const int64_t kMinPacketLimitMs = 5;
32 const int64_t kPausedPacketIntervalMs = 500;
32 33
33 // Upper cap on process interval, in case process has not been called in a long 34 // Upper cap on process interval, in case process has not been called in a long
34 // time. 35 // time.
35 const int64_t kMaxIntervalTimeMs = 30; 36 const int64_t kMaxIntervalTimeMs = 30;
36 37
37 } // namespace 38 } // namespace
38 39
39 // TODO(sprang): Move at least PacketQueue out to separate 40 // TODO(sprang): Move at least PacketQueue out to separate
40 // files, so that we can more easily test them. 41 // files, so that we can more easily test them.
41 42
(...skipping 190 matching lines...) Expand 10 before | Expand all | Expand 10 after
232 } 233 }
233 234
234 PacedSender::~PacedSender() {} 235 PacedSender::~PacedSender() {}
235 236
236 void PacedSender::CreateProbeCluster(int bitrate_bps) { 237 void PacedSender::CreateProbeCluster(int bitrate_bps) {
237 rtc::CritScope cs(&critsect_); 238 rtc::CritScope cs(&critsect_);
238 prober_->CreateProbeCluster(bitrate_bps, clock_->TimeInMilliseconds()); 239 prober_->CreateProbeCluster(bitrate_bps, clock_->TimeInMilliseconds());
239 } 240 }
240 241
241 void PacedSender::Pause() { 242 void PacedSender::Pause() {
242 LOG(LS_INFO) << "PacedSender paused.";
243 { 243 {
244 rtc::CritScope cs(&critsect_); 244 rtc::CritScope cs(&critsect_);
245 if (!paused_)
246 LOG(LS_INFO) << "PacedSender paused.";
245 paused_ = true; 247 paused_ = true;
246 } 248 }
247 // Tell the process thread to call our TimeUntilNextProcess() method to get 249 // Tell the process thread to call our TimeUntilNextProcess() method to get
248 // a new (longer) estimate for when to call Process(). 250 // a new (longer) estimate for when to call Process().
249 if (process_thread_) 251 if (process_thread_)
250 process_thread_->WakeUp(this); 252 process_thread_->WakeUp(this);
251 } 253 }
252 254
253 void PacedSender::Resume() { 255 void PacedSender::Resume() {
254 LOG(LS_INFO) << "PacedSender resumed.";
255 { 256 {
256 rtc::CritScope cs(&critsect_); 257 rtc::CritScope cs(&critsect_);
258 if (paused_)
259 LOG(LS_INFO) << "PacedSender resumed.";
257 paused_ = false; 260 paused_ = false;
258 } 261 }
259 // Tell the process thread to call our TimeUntilNextProcess() method to 262 // Tell the process thread to call our TimeUntilNextProcess() method to
260 // refresh the estimate for when to call Process(). 263 // refresh the estimate for when to call Process().
261 if (process_thread_) 264 if (process_thread_)
262 process_thread_->WakeUp(this); 265 process_thread_->WakeUp(this);
263 } 266 }
264 267
265 void PacedSender::SetProbingEnabled(bool enabled) { 268 void PacedSender::SetProbingEnabled(bool enabled) {
266 RTC_CHECK_EQ(0, packet_counter_); 269 RTC_CHECK_EQ(0, packet_counter_);
(...skipping 81 matching lines...) Expand 10 before | Expand all | Expand 10 after
348 } 351 }
349 352
350 int64_t PacedSender::AverageQueueTimeMs() { 353 int64_t PacedSender::AverageQueueTimeMs() {
351 rtc::CritScope cs(&critsect_); 354 rtc::CritScope cs(&critsect_);
352 packets_->UpdateQueueTime(clock_->TimeInMilliseconds()); 355 packets_->UpdateQueueTime(clock_->TimeInMilliseconds());
353 return packets_->AverageQueueTimeMs(); 356 return packets_->AverageQueueTimeMs();
354 } 357 }
355 358
356 int64_t PacedSender::TimeUntilNextProcess() { 359 int64_t PacedSender::TimeUntilNextProcess() {
357 rtc::CritScope cs(&critsect_); 360 rtc::CritScope cs(&critsect_);
361 int64_t elapsed_time_us = clock_->TimeInMicroseconds() - time_last_update_us_;
362 int64_t elapsed_time_ms = (elapsed_time_us + 500) / 1000;
363 // When paused we wake up every 500 ms to send a padding packet to ensure
364 // we won't get stuck in the paused state due to no feedback being received.
358 if (paused_) 365 if (paused_)
359 return 1000 * 60 * 60; 366 return std::max<int64_t>(kPausedPacketIntervalMs - elapsed_time_ms, 0);
360 367
361 if (prober_->IsProbing()) { 368 if (prober_->IsProbing()) {
362 int64_t ret = prober_->TimeUntilNextProbe(clock_->TimeInMilliseconds()); 369 int64_t ret = prober_->TimeUntilNextProbe(clock_->TimeInMilliseconds());
363 if (ret > 0 || (ret == 0 && !probing_send_failure_)) 370 if (ret > 0 || (ret == 0 && !probing_send_failure_))
364 return ret; 371 return ret;
365 } 372 }
366 int64_t elapsed_time_us = clock_->TimeInMicroseconds() - time_last_update_us_;
367 int64_t elapsed_time_ms = (elapsed_time_us + 500) / 1000;
368 return std::max<int64_t>(kMinPacketLimitMs - elapsed_time_ms, 0); 373 return std::max<int64_t>(kMinPacketLimitMs - elapsed_time_ms, 0);
369 } 374 }
370 375
371 void PacedSender::Process() { 376 void PacedSender::Process() {
372 int64_t now_us = clock_->TimeInMicroseconds(); 377 int64_t now_us = clock_->TimeInMicroseconds();
373 rtc::CritScope cs(&critsect_); 378 rtc::CritScope cs(&critsect_);
374 int64_t elapsed_time_ms = (now_us - time_last_update_us_ + 500) / 1000; 379 int64_t elapsed_time_ms = (now_us - time_last_update_us_ + 500) / 1000;
375 time_last_update_us_ = now_us;
376 int target_bitrate_kbps = pacing_bitrate_kbps_; 380 int target_bitrate_kbps = pacing_bitrate_kbps_;
377 if (!paused_ && elapsed_time_ms > 0) { 381
382 if (paused_) {
383 PacedPacketInfo pacing_info;
384 time_last_update_us_ = now_us;
385 // We can not send padding unless a normal packet has first been sent. If we
386 // do, timestamps get messed up.
387 if (packet_counter_ == 0)
388 return;
389 size_t bytes_sent = SendPadding(1, pacing_info);
390 alr_detector_->OnBytesSent(bytes_sent, now_us / 1000);
391 return;
392 }
393
394 if (elapsed_time_ms > 0) {
378 size_t queue_size_bytes = packets_->SizeInBytes(); 395 size_t queue_size_bytes = packets_->SizeInBytes();
379 if (queue_size_bytes > 0) { 396 if (queue_size_bytes > 0) {
380 // Assuming equal size packets and input/output rate, the average packet 397 // Assuming equal size packets and input/output rate, the average packet
381 // has avg_time_left_ms left to get queue_size_bytes out of the queue, if 398 // has avg_time_left_ms left to get queue_size_bytes out of the queue, if
382 // time constraint shall be met. Determine bitrate needed for that. 399 // time constraint shall be met. Determine bitrate needed for that.
383 packets_->UpdateQueueTime(clock_->TimeInMilliseconds()); 400 packets_->UpdateQueueTime(clock_->TimeInMilliseconds());
384 int64_t avg_time_left_ms = std::max<int64_t>( 401 int64_t avg_time_left_ms = std::max<int64_t>(
385 1, queue_time_limit - packets_->AverageQueueTimeMs()); 402 1, queue_time_limit - packets_->AverageQueueTimeMs());
386 int min_bitrate_needed_kbps = 403 int min_bitrate_needed_kbps =
387 static_cast<int>(queue_size_bytes * 8 / avg_time_left_ms); 404 static_cast<int>(queue_size_bytes * 8 / avg_time_left_ms);
388 if (min_bitrate_needed_kbps > target_bitrate_kbps) 405 if (min_bitrate_needed_kbps > target_bitrate_kbps)
389 target_bitrate_kbps = min_bitrate_needed_kbps; 406 target_bitrate_kbps = min_bitrate_needed_kbps;
390 } 407 }
391 408
392 media_budget_->set_target_rate_kbps(target_bitrate_kbps); 409 media_budget_->set_target_rate_kbps(target_bitrate_kbps);
393 410
394 elapsed_time_ms = std::min(kMaxIntervalTimeMs, elapsed_time_ms); 411 elapsed_time_ms = std::min(kMaxIntervalTimeMs, elapsed_time_ms);
395 UpdateBudgetWithElapsedTime(elapsed_time_ms); 412 UpdateBudgetWithElapsedTime(elapsed_time_ms);
396 } 413 }
397 414
415 time_last_update_us_ = now_us;
416
398 bool is_probing = prober_->IsProbing(); 417 bool is_probing = prober_->IsProbing();
399 PacedPacketInfo pacing_info; 418 PacedPacketInfo pacing_info;
400 size_t bytes_sent = 0; 419 size_t bytes_sent = 0;
401 size_t recommended_probe_size = 0; 420 size_t recommended_probe_size = 0;
402 if (is_probing) { 421 if (is_probing) {
403 pacing_info = prober_->CurrentCluster(); 422 pacing_info = prober_->CurrentCluster();
404 recommended_probe_size = prober_->RecommendedMinProbeSize(); 423 recommended_probe_size = prober_->RecommendedMinProbeSize();
405 } 424 }
406 while (!packets_->Empty()) { 425 while (!packets_->Empty()) {
407 // Since we need to release the lock in order to send, we first pop the 426 // Since we need to release the lock in order to send, we first pop the
408 // element from the priority queue but keep it in storage, so that we can 427 // element from the priority queue but keep it in storage, so that we can
409 // reinsert it if send fails. 428 // reinsert it if send fails.
410 const paced_sender::Packet& packet = packets_->BeginPop(); 429 const paced_sender::Packet& packet = packets_->BeginPop();
411 430
412 if (SendPacket(packet, pacing_info)) { 431 if (SendPacket(packet, pacing_info)) {
413 // Send succeeded, remove it from the queue. 432 // Send succeeded, remove it from the queue.
414 if (first_sent_packet_ms_ == -1) 433 if (first_sent_packet_ms_ == -1)
415 first_sent_packet_ms_ = clock_->TimeInMilliseconds(); 434 first_sent_packet_ms_ = clock_->TimeInMilliseconds();
416 bytes_sent += packet.bytes; 435 bytes_sent += packet.bytes;
417 packets_->FinalizePop(packet); 436 packets_->FinalizePop(packet);
418 if (is_probing && bytes_sent > recommended_probe_size) 437 if (is_probing && bytes_sent > recommended_probe_size)
419 break; 438 break;
420 } else { 439 } else {
421 // Send failed, put it back into the queue. 440 // Send failed, put it back into the queue.
422 packets_->CancelPop(packet); 441 packets_->CancelPop(packet);
423 break; 442 break;
424 } 443 }
425 } 444 }
426 445
427 if (packets_->Empty() && !paused_) { 446 if (packets_->Empty()) {
428 // We can not send padding unless a normal packet has first been sent. If we 447 // We can not send padding unless a normal packet has first been sent. If we
429 // do, timestamps get messed up. 448 // do, timestamps get messed up.
430 if (packet_counter_ > 0) { 449 if (packet_counter_ > 0) {
431 int padding_needed = 450 int padding_needed =
432 static_cast<int>(is_probing ? (recommended_probe_size - bytes_sent) 451 static_cast<int>(is_probing ? (recommended_probe_size - bytes_sent)
433 : padding_budget_->bytes_remaining()); 452 : padding_budget_->bytes_remaining());
434
435 if (padding_needed > 0) 453 if (padding_needed > 0)
436 bytes_sent += SendPadding(padding_needed, pacing_info); 454 bytes_sent += SendPadding(padding_needed, pacing_info);
437 } 455 }
438 } 456 }
439 if (is_probing) { 457 if (is_probing) {
440 probing_send_failure_ = bytes_sent == 0; 458 probing_send_failure_ = bytes_sent == 0;
441 if (!probing_send_failure_) 459 if (!probing_send_failure_)
442 prober_->ProbeSent(clock_->TimeInMilliseconds(), bytes_sent); 460 prober_->ProbeSent(clock_->TimeInMilliseconds(), bytes_sent);
443 } 461 }
444 alr_detector_->OnBytesSent(bytes_sent, elapsed_time_ms); 462 alr_detector_->OnBytesSent(bytes_sent, elapsed_time_ms);
445 } 463 }
446 464
447 void PacedSender::ProcessThreadAttached(ProcessThread* process_thread) { 465 void PacedSender::ProcessThreadAttached(ProcessThread* process_thread) {
448 LOG(LS_INFO) << "ProcessThreadAttached 0x" << std::hex << process_thread; 466 LOG(LS_INFO) << "ProcessThreadAttached 0x" << std::hex << process_thread;
449 process_thread_ = process_thread; 467 process_thread_ = process_thread;
450 } 468 }
451 469
452 bool PacedSender::SendPacket(const paced_sender::Packet& packet, 470 bool PacedSender::SendPacket(const paced_sender::Packet& packet,
453 const PacedPacketInfo& pacing_info) { 471 const PacedPacketInfo& pacing_info) {
454 if (paused_) 472 RTC_DCHECK(!paused_);
455 return false;
456 if (media_budget_->bytes_remaining() == 0 && 473 if (media_budget_->bytes_remaining() == 0 &&
457 pacing_info.probe_cluster_id == PacedPacketInfo::kNotAProbe) { 474 pacing_info.probe_cluster_id == PacedPacketInfo::kNotAProbe) {
458 return false; 475 return false;
459 } 476 }
460 477
461 critsect_.Leave(); 478 critsect_.Leave();
462 const bool success = packet_sender_->TimeToSendPacket( 479 const bool success = packet_sender_->TimeToSendPacket(
463 packet.ssrc, packet.sequence_number, packet.capture_time_ms, 480 packet.ssrc, packet.sequence_number, packet.capture_time_ms,
464 packet.retransmission, pacing_info); 481 packet.retransmission, pacing_info);
465 critsect_.Enter(); 482 critsect_.Enter();
466 483
467 if (success) { 484 if (success) {
468 // TODO(holmer): High priority packets should only be accounted for if we 485 // TODO(holmer): High priority packets should only be accounted for if we
469 // are allocating bandwidth for audio. 486 // are allocating bandwidth for audio.
470 if (packet.priority != kHighPriority) { 487 if (packet.priority != kHighPriority) {
471 // Update media bytes sent. 488 // Update media bytes sent.
472 // TODO(eladalon): TimeToSendPacket() can also return |true| in some 489 // TODO(eladalon): TimeToSendPacket() can also return |true| in some
473 // situations where nothing actually ended up being sent to the network, 490 // situations where nothing actually ended up being sent to the network,
474 // and we probably don't want to update the budget in such cases. 491 // and we probably don't want to update the budget in such cases.
475 // https://bugs.chromium.org/p/webrtc/issues/detail?id=8052 492 // https://bugs.chromium.org/p/webrtc/issues/detail?id=8052
476 UpdateBudgetWithBytesSent(packet.bytes); 493 UpdateBudgetWithBytesSent(packet.bytes);
477 } 494 }
478 } 495 }
479 496
480 return success; 497 return success;
481 } 498 }
482 499
483 size_t PacedSender::SendPadding(size_t padding_needed, 500 size_t PacedSender::SendPadding(size_t padding_needed,
484 const PacedPacketInfo& pacing_info) { 501 const PacedPacketInfo& pacing_info) {
502 RTC_DCHECK_GT(packet_counter_, 0);
485 critsect_.Leave(); 503 critsect_.Leave();
486 size_t bytes_sent = 504 size_t bytes_sent =
487 packet_sender_->TimeToSendPadding(padding_needed, pacing_info); 505 packet_sender_->TimeToSendPadding(padding_needed, pacing_info);
488 critsect_.Enter(); 506 critsect_.Enter();
489 507
490 if (bytes_sent > 0) { 508 if (bytes_sent > 0) {
491 UpdateBudgetWithBytesSent(bytes_sent); 509 UpdateBudgetWithBytesSent(bytes_sent);
492 } 510 }
493 return bytes_sent; 511 return bytes_sent;
494 } 512 }
(...skipping 12 matching lines...) Expand all
507 rtc::CritScope cs(&critsect_); 525 rtc::CritScope cs(&critsect_);
508 pacing_factor_ = pacing_factor; 526 pacing_factor_ = pacing_factor;
509 } 527 }
510 528
511 void PacedSender::SetQueueTimeLimit(int limit_ms) { 529 void PacedSender::SetQueueTimeLimit(int limit_ms) {
512 rtc::CritScope cs(&critsect_); 530 rtc::CritScope cs(&critsect_);
513 queue_time_limit = limit_ms; 531 queue_time_limit = limit_ms;
514 } 532 }
515 533
516 } // namespace webrtc 534 } // namespace webrtc
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698