Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(116)

Side by Side Diff: webrtc/modules/pacing/paced_sender.cc

Issue 2918323002: Add functionality which limits the number of bytes on the network. (Closed)
Patch Set: . Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 /* 1 /*
2 * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved. 2 * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
3 * 3 *
4 * Use of this source code is governed by a BSD-style license 4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source 5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found 6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may 7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree. 8 * be found in the AUTHORS file in the root of the source tree.
9 */ 9 */
10 10
(...skipping 10 matching lines...) Expand all
21 #include "webrtc/modules/pacing/bitrate_prober.h" 21 #include "webrtc/modules/pacing/bitrate_prober.h"
22 #include "webrtc/modules/utility/include/process_thread.h" 22 #include "webrtc/modules/utility/include/process_thread.h"
23 #include "webrtc/rtc_base/checks.h" 23 #include "webrtc/rtc_base/checks.h"
24 #include "webrtc/rtc_base/logging.h" 24 #include "webrtc/rtc_base/logging.h"
25 #include "webrtc/system_wrappers/include/clock.h" 25 #include "webrtc/system_wrappers/include/clock.h"
26 #include "webrtc/system_wrappers/include/field_trial.h" 26 #include "webrtc/system_wrappers/include/field_trial.h"
27 27
28 namespace { 28 namespace {
29 // Time limit in milliseconds between packet bursts. 29 // Time limit in milliseconds between packet bursts.
30 const int64_t kMinPacketLimitMs = 5; 30 const int64_t kMinPacketLimitMs = 5;
31 const int64_t kPausedPacketIntervalMs = 500;
31 32
32 // Upper cap on process interval, in case process has not been called in a long 33 // Upper cap on process interval, in case process has not been called in a long
33 // time. 34 // time.
34 const int64_t kMaxIntervalTimeMs = 30; 35 const int64_t kMaxIntervalTimeMs = 30;
35 36
36 } // namespace 37 } // namespace
37 38
38 // TODO(sprang): Move at least PacketQueue and MediaBudget out to separate 39 // TODO(sprang): Move at least PacketQueue and MediaBudget out to separate
39 // files, so that we can more easily test them. 40 // files, so that we can more easily test them.
40 41
(...skipping 230 matching lines...) Expand 10 before | Expand all | Expand 10 after
271 } 272 }
272 273
273 PacedSender::~PacedSender() {} 274 PacedSender::~PacedSender() {}
274 275
275 void PacedSender::CreateProbeCluster(int bitrate_bps) { 276 void PacedSender::CreateProbeCluster(int bitrate_bps) {
276 rtc::CritScope cs(&critsect_); 277 rtc::CritScope cs(&critsect_);
277 prober_->CreateProbeCluster(bitrate_bps, clock_->TimeInMilliseconds()); 278 prober_->CreateProbeCluster(bitrate_bps, clock_->TimeInMilliseconds());
278 } 279 }
279 280
280 void PacedSender::Pause() { 281 void PacedSender::Pause() {
281 LOG(LS_INFO) << "PacedSender paused.";
282 { 282 {
283 rtc::CritScope cs(&critsect_); 283 rtc::CritScope cs(&critsect_);
284 if (!paused_)
285 LOG(LS_INFO) << "PacedSender paused.";
284 paused_ = true; 286 paused_ = true;
285 } 287 }
286 // Tell the process thread to call our TimeUntilNextProcess() method to get 288 // Tell the process thread to call our TimeUntilNextProcess() method to get
287 // a new (longer) estimate for when to call Process(). 289 // a new (longer) estimate for when to call Process().
288 if (process_thread_) 290 if (process_thread_)
289 process_thread_->WakeUp(this); 291 process_thread_->WakeUp(this);
290 } 292 }
291 293
292 void PacedSender::Resume() { 294 void PacedSender::Resume() {
293 LOG(LS_INFO) << "PacedSender resumed.";
294 { 295 {
295 rtc::CritScope cs(&critsect_); 296 rtc::CritScope cs(&critsect_);
297 if (paused_)
298 LOG(LS_INFO) << "PacedSender resumed.";
296 paused_ = false; 299 paused_ = false;
297 } 300 }
298 // Tell the process thread to call our TimeUntilNextProcess() method to 301 // Tell the process thread to call our TimeUntilNextProcess() method to
299 // refresh the estimate for when to call Process(). 302 // refresh the estimate for when to call Process().
300 if (process_thread_) 303 if (process_thread_)
301 process_thread_->WakeUp(this); 304 process_thread_->WakeUp(this);
302 } 305 }
303 306
304 void PacedSender::SetProbingEnabled(bool enabled) { 307 void PacedSender::SetProbingEnabled(bool enabled) {
305 RTC_CHECK_EQ(0, packet_counter_); 308 RTC_CHECK_EQ(0, packet_counter_);
(...skipping 81 matching lines...) Expand 10 before | Expand all | Expand 10 after
387 } 390 }
388 391
389 int64_t PacedSender::AverageQueueTimeMs() { 392 int64_t PacedSender::AverageQueueTimeMs() {
390 rtc::CritScope cs(&critsect_); 393 rtc::CritScope cs(&critsect_);
391 packets_->UpdateQueueTime(clock_->TimeInMilliseconds()); 394 packets_->UpdateQueueTime(clock_->TimeInMilliseconds());
392 return packets_->AverageQueueTimeMs(); 395 return packets_->AverageQueueTimeMs();
393 } 396 }
394 397
395 int64_t PacedSender::TimeUntilNextProcess() { 398 int64_t PacedSender::TimeUntilNextProcess() {
396 rtc::CritScope cs(&critsect_); 399 rtc::CritScope cs(&critsect_);
400 int64_t elapsed_time_us = clock_->TimeInMicroseconds() - time_last_update_us_;
401 int64_t elapsed_time_ms = (elapsed_time_us + 500) / 1000;
402 // When paused we wake up every 500 ms to send a padding packet to ensure
403 // we won't get stuck in the paused state due to no feedback being received.
397 if (paused_) 404 if (paused_)
398 return 1000 * 60 * 60; 405 return std::max<int64_t>(kPausedPacketIntervalMs - elapsed_time_ms, 0);
399 406
400 if (prober_->IsProbing()) { 407 if (prober_->IsProbing()) {
401 int64_t ret = prober_->TimeUntilNextProbe(clock_->TimeInMilliseconds()); 408 int64_t ret = prober_->TimeUntilNextProbe(clock_->TimeInMilliseconds());
402 if (ret > 0 || (ret == 0 && !probing_send_failure_)) 409 if (ret > 0 || (ret == 0 && !probing_send_failure_))
403 return ret; 410 return ret;
404 } 411 }
405 int64_t elapsed_time_us = clock_->TimeInMicroseconds() - time_last_update_us_;
406 int64_t elapsed_time_ms = (elapsed_time_us + 500) / 1000;
407 return std::max<int64_t>(kMinPacketLimitMs - elapsed_time_ms, 0); 412 return std::max<int64_t>(kMinPacketLimitMs - elapsed_time_ms, 0);
408 } 413 }
409 414
410 void PacedSender::Process() { 415 void PacedSender::Process() {
411 int64_t now_us = clock_->TimeInMicroseconds(); 416 int64_t now_us = clock_->TimeInMicroseconds();
412 rtc::CritScope cs(&critsect_); 417 rtc::CritScope cs(&critsect_);
413 int64_t elapsed_time_ms = (now_us - time_last_update_us_ + 500) / 1000; 418 int64_t elapsed_time_ms = (now_us - time_last_update_us_ + 500) / 1000;
414 time_last_update_us_ = now_us;
415 int target_bitrate_kbps = pacing_bitrate_kbps_; 419 int target_bitrate_kbps = pacing_bitrate_kbps_;
416 if (!paused_ && elapsed_time_ms > 0) { 420
421 if (paused_) {
422 PacedPacketInfo pacing_info;
423 size_t bytes_sent = SendPadding(1, pacing_info);
philipel 2017/08/02 15:35:16 Is there a way to completely stop the pacer, not j
stefan-webrtc 2017/08/04 09:50:00 We could, but the problem is that if we lose some
424 alr_detector_->OnBytesSent(bytes_sent, now_us / 1000);
425 time_last_update_us_ = now_us;
426 return;
427 }
428
429 if (elapsed_time_ms > 0) {
417 size_t queue_size_bytes = packets_->SizeInBytes(); 430 size_t queue_size_bytes = packets_->SizeInBytes();
418 if (queue_size_bytes > 0) { 431 if (queue_size_bytes > 0) {
419 // Assuming equal size packets and input/output rate, the average packet 432 // Assuming equal size packets and input/output rate, the average packet
420 // has avg_time_left_ms left to get queue_size_bytes out of the queue, if 433 // has avg_time_left_ms left to get queue_size_bytes out of the queue, if
421 // time constraint shall be met. Determine bitrate needed for that. 434 // time constraint shall be met. Determine bitrate needed for that.
422 packets_->UpdateQueueTime(clock_->TimeInMilliseconds()); 435 packets_->UpdateQueueTime(clock_->TimeInMilliseconds());
423 int64_t avg_time_left_ms = std::max<int64_t>( 436 int64_t avg_time_left_ms = std::max<int64_t>(
424 1, queue_time_limit - packets_->AverageQueueTimeMs()); 437 1, queue_time_limit - packets_->AverageQueueTimeMs());
425 int min_bitrate_needed_kbps = 438 int min_bitrate_needed_kbps =
426 static_cast<int>(queue_size_bytes * 8 / avg_time_left_ms); 439 static_cast<int>(queue_size_bytes * 8 / avg_time_left_ms);
427 if (min_bitrate_needed_kbps > target_bitrate_kbps) 440 if (min_bitrate_needed_kbps > target_bitrate_kbps)
428 target_bitrate_kbps = min_bitrate_needed_kbps; 441 target_bitrate_kbps = min_bitrate_needed_kbps;
429 } 442 }
430 443
431 media_budget_->set_target_rate_kbps(target_bitrate_kbps); 444 media_budget_->set_target_rate_kbps(target_bitrate_kbps);
432 445
433 elapsed_time_ms = std::min(kMaxIntervalTimeMs, elapsed_time_ms); 446 elapsed_time_ms = std::min(kMaxIntervalTimeMs, elapsed_time_ms);
434 UpdateBudgetWithElapsedTime(elapsed_time_ms); 447 UpdateBudgetWithElapsedTime(elapsed_time_ms);
435 } 448 }
436 449
450 time_last_update_us_ = now_us;
451
437 bool is_probing = prober_->IsProbing(); 452 bool is_probing = prober_->IsProbing();
438 PacedPacketInfo pacing_info; 453 PacedPacketInfo pacing_info;
439 size_t bytes_sent = 0; 454 size_t bytes_sent = 0;
440 size_t recommended_probe_size = 0; 455 size_t recommended_probe_size = 0;
441 if (is_probing) { 456 if (is_probing) {
442 pacing_info = prober_->CurrentCluster(); 457 pacing_info = prober_->CurrentCluster();
443 recommended_probe_size = prober_->RecommendedMinProbeSize(); 458 recommended_probe_size = prober_->RecommendedMinProbeSize();
444 } 459 }
445 while (!packets_->Empty()) { 460 while (!packets_->Empty()) {
446 // Since we need to release the lock in order to send, we first pop the 461 // Since we need to release the lock in order to send, we first pop the
447 // element from the priority queue but keep it in storage, so that we can 462 // element from the priority queue but keep it in storage, so that we can
448 // reinsert it if send fails. 463 // reinsert it if send fails.
449 const paced_sender::Packet& packet = packets_->BeginPop(); 464 const paced_sender::Packet& packet = packets_->BeginPop();
450 465
451 if (SendPacket(packet, pacing_info)) { 466 if (SendPacket(packet, pacing_info)) {
452 // Send succeeded, remove it from the queue. 467 // Send succeeded, remove it from the queue.
453 if (first_sent_packet_ms_ == -1) 468 if (first_sent_packet_ms_ == -1)
454 first_sent_packet_ms_ = clock_->TimeInMilliseconds(); 469 first_sent_packet_ms_ = clock_->TimeInMilliseconds();
455 bytes_sent += packet.bytes; 470 bytes_sent += packet.bytes;
456 packets_->FinalizePop(packet); 471 packets_->FinalizePop(packet);
457 if (is_probing && bytes_sent > recommended_probe_size) 472 if (is_probing && bytes_sent > recommended_probe_size)
458 break; 473 break;
459 } else { 474 } else {
460 // Send failed, put it back into the queue. 475 // Send failed, put it back into the queue.
461 packets_->CancelPop(packet); 476 packets_->CancelPop(packet);
462 break; 477 break;
463 } 478 }
464 } 479 }
465 480
466 if (packets_->Empty() && !paused_) { 481 if (packets_->Empty()) {
philipel 2017/08/02 15:35:16 Is this correct? I assume you want to be able to
stefan-webrtc 2017/08/04 09:50:00 Why would we do that if there are packets in the q
philipel 2017/08/04 14:13:04 Ah, you're right.
467 // We can not send padding unless a normal packet has first been sent. If we 482 int padding_needed =
468 // do, timestamps get messed up. 483 static_cast<int>(is_probing ? (recommended_probe_size - bytes_sent)
469 if (packet_counter_ > 0) { 484 : padding_budget_->bytes_remaining());
470 int padding_needed = 485 if (padding_needed > 0)
471 static_cast<int>(is_probing ? (recommended_probe_size - bytes_sent) 486 bytes_sent += SendPadding(padding_needed, pacing_info);
472 : padding_budget_->bytes_remaining());
473
474 if (padding_needed > 0)
475 bytes_sent += SendPadding(padding_needed, pacing_info);
476 }
477 } 487 }
478 if (is_probing) { 488 if (is_probing) {
479 probing_send_failure_ = bytes_sent == 0; 489 probing_send_failure_ = bytes_sent == 0;
480 if (!probing_send_failure_) 490 if (!probing_send_failure_)
481 prober_->ProbeSent(clock_->TimeInMilliseconds(), bytes_sent); 491 prober_->ProbeSent(clock_->TimeInMilliseconds(), bytes_sent);
482 } 492 }
483 alr_detector_->OnBytesSent(bytes_sent, now_us / 1000); 493 alr_detector_->OnBytesSent(bytes_sent, now_us / 1000);
484 } 494 }
485 495
486 void PacedSender::ProcessThreadAttached(ProcessThread* process_thread) { 496 void PacedSender::ProcessThreadAttached(ProcessThread* process_thread) {
487 LOG(LS_INFO) << "ProcessThreadAttached 0x" << std::hex << process_thread; 497 LOG(LS_INFO) << "ProcessThreadAttached 0x" << std::hex << process_thread;
488 process_thread_ = process_thread; 498 process_thread_ = process_thread;
489 } 499 }
490 500
491 bool PacedSender::SendPacket(const paced_sender::Packet& packet, 501 bool PacedSender::SendPacket(const paced_sender::Packet& packet,
492 const PacedPacketInfo& pacing_info) { 502 const PacedPacketInfo& pacing_info) {
493 if (paused_)
494 return false;
philipel 2017/08/02 15:35:16 Make it into an RTC_DCHECK
stefan-webrtc 2017/08/04 09:50:00 Done.
495 if (media_budget_->bytes_remaining() == 0 && 503 if (media_budget_->bytes_remaining() == 0 &&
496 pacing_info.probe_cluster_id == PacedPacketInfo::kNotAProbe) { 504 pacing_info.probe_cluster_id == PacedPacketInfo::kNotAProbe) {
497 return false; 505 return false;
498 } 506 }
499 507
500 critsect_.Leave(); 508 critsect_.Leave();
501 const bool success = packet_sender_->TimeToSendPacket( 509 const bool success = packet_sender_->TimeToSendPacket(
502 packet.ssrc, packet.sequence_number, packet.capture_time_ms, 510 packet.ssrc, packet.sequence_number, packet.capture_time_ms,
503 packet.retransmission, pacing_info); 511 packet.retransmission, pacing_info);
504 critsect_.Enter(); 512 critsect_.Enter();
505 513
506 if (success) { 514 if (success) {
507 // TODO(holmer): High priority packets should only be accounted for if we 515 // TODO(holmer): High priority packets should only be accounted for if we
508 // are allocating bandwidth for audio. 516 // are allocating bandwidth for audio.
509 if (packet.priority != kHighPriority) { 517 if (packet.priority != kHighPriority) {
510 // Update media bytes sent. 518 // Update media bytes sent.
511 UpdateBudgetWithBytesSent(packet.bytes); 519 UpdateBudgetWithBytesSent(packet.bytes);
512 } 520 }
513 } 521 }
514 522
515 return success; 523 return success;
516 } 524 }
517 525
518 size_t PacedSender::SendPadding(size_t padding_needed, 526 size_t PacedSender::SendPadding(size_t padding_needed,
519 const PacedPacketInfo& pacing_info) { 527 const PacedPacketInfo& pacing_info) {
528 // We can not send padding unless a normal packet has first been sent. If we
529 // do, timestamps get messed up.
530 if (packet_counter_ == 0)
philipel 2017/08/02 15:35:16 I think it's somewhat clearer to keep this check w
stefan-webrtc 2017/08/04 09:50:00 Then the code will have to be duplicated since we'
philipel 2017/08/04 14:13:04 I still think that would be preferable.
531 return 0;
520 critsect_.Leave(); 532 critsect_.Leave();
521 size_t bytes_sent = 533 size_t bytes_sent =
522 packet_sender_->TimeToSendPadding(padding_needed, pacing_info); 534 packet_sender_->TimeToSendPadding(padding_needed, pacing_info);
523 critsect_.Enter(); 535 critsect_.Enter();
524 536
525 if (bytes_sent > 0) { 537 if (bytes_sent > 0) {
526 UpdateBudgetWithBytesSent(bytes_sent); 538 UpdateBudgetWithBytesSent(bytes_sent);
527 } 539 }
528 return bytes_sent; 540 return bytes_sent;
529 } 541 }
(...skipping 12 matching lines...) Expand all
542 rtc::CritScope cs(&critsect_); 554 rtc::CritScope cs(&critsect_);
543 pacing_factor_ = pacing_factor; 555 pacing_factor_ = pacing_factor;
544 } 556 }
545 557
546 void PacedSender::SetQueueTimeLimit(int limit_ms) { 558 void PacedSender::SetQueueTimeLimit(int limit_ms) {
547 rtc::CritScope cs(&critsect_); 559 rtc::CritScope cs(&critsect_);
548 queue_time_limit = limit_ms; 560 queue_time_limit = limit_ms;
549 } 561 }
550 562
551 } // namespace webrtc 563 } // namespace webrtc
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698