Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(485)

Side by Side Diff: webrtc/modules/pacing/paced_sender.cc

Issue 2918323002: Add functionality which limits the number of bytes on the network. (Closed)
Patch Set: Comments addressed. Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 /* 1 /*
2 * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved. 2 * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
3 * 3 *
4 * Use of this source code is governed by a BSD-style license 4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source 5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found 6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may 7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree. 8 * be found in the AUTHORS file in the root of the source tree.
9 */ 9 */
10 10
(...skipping 11 matching lines...) Expand all
22 #include "webrtc/modules/pacing/interval_budget.h" 22 #include "webrtc/modules/pacing/interval_budget.h"
23 #include "webrtc/modules/utility/include/process_thread.h" 23 #include "webrtc/modules/utility/include/process_thread.h"
24 #include "webrtc/rtc_base/checks.h" 24 #include "webrtc/rtc_base/checks.h"
25 #include "webrtc/rtc_base/logging.h" 25 #include "webrtc/rtc_base/logging.h"
26 #include "webrtc/system_wrappers/include/clock.h" 26 #include "webrtc/system_wrappers/include/clock.h"
27 #include "webrtc/system_wrappers/include/field_trial.h" 27 #include "webrtc/system_wrappers/include/field_trial.h"
28 28
29 namespace { 29 namespace {
30 // Time limit in milliseconds between packet bursts. 30 // Time limit in milliseconds between packet bursts.
31 const int64_t kMinPacketLimitMs = 5; 31 const int64_t kMinPacketLimitMs = 5;
32 const int64_t kPausedPacketIntervalMs = 500;
32 33
33 // Upper cap on process interval, in case process has not been called in a long 34 // Upper cap on process interval, in case process has not been called in a long
34 // time. 35 // time.
35 const int64_t kMaxIntervalTimeMs = 30; 36 const int64_t kMaxIntervalTimeMs = 30;
36 37
37 } // namespace 38 } // namespace
38 39
39 // TODO(sprang): Move at least PacketQueue out to separate 40 // TODO(sprang): Move at least PacketQueue out to separate
40 // files, so that we can more easily test them. 41 // files, so that we can more easily test them.
41 42
(...skipping 190 matching lines...) Expand 10 before | Expand all | Expand 10 after
232 } 233 }
233 234
234 PacedSender::~PacedSender() {} 235 PacedSender::~PacedSender() {}
235 236
236 void PacedSender::CreateProbeCluster(int bitrate_bps) { 237 void PacedSender::CreateProbeCluster(int bitrate_bps) {
237 rtc::CritScope cs(&critsect_); 238 rtc::CritScope cs(&critsect_);
238 prober_->CreateProbeCluster(bitrate_bps, clock_->TimeInMilliseconds()); 239 prober_->CreateProbeCluster(bitrate_bps, clock_->TimeInMilliseconds());
239 } 240 }
240 241
241 void PacedSender::Pause() { 242 void PacedSender::Pause() {
242 LOG(LS_INFO) << "PacedSender paused.";
243 { 243 {
244 rtc::CritScope cs(&critsect_); 244 rtc::CritScope cs(&critsect_);
245 if (!paused_)
246 LOG(LS_INFO) << "PacedSender paused.";
245 paused_ = true; 247 paused_ = true;
246 } 248 }
247 // Tell the process thread to call our TimeUntilNextProcess() method to get 249 // Tell the process thread to call our TimeUntilNextProcess() method to get
248 // a new (longer) estimate for when to call Process(). 250 // a new (longer) estimate for when to call Process().
249 if (process_thread_) 251 if (process_thread_)
250 process_thread_->WakeUp(this); 252 process_thread_->WakeUp(this);
251 } 253 }
252 254
253 void PacedSender::Resume() { 255 void PacedSender::Resume() {
254 LOG(LS_INFO) << "PacedSender resumed.";
255 { 256 {
256 rtc::CritScope cs(&critsect_); 257 rtc::CritScope cs(&critsect_);
258 if (paused_)
259 LOG(LS_INFO) << "PacedSender resumed.";
257 paused_ = false; 260 paused_ = false;
258 } 261 }
259 // Tell the process thread to call our TimeUntilNextProcess() method to 262 // Tell the process thread to call our TimeUntilNextProcess() method to
260 // refresh the estimate for when to call Process(). 263 // refresh the estimate for when to call Process().
261 if (process_thread_) 264 if (process_thread_)
262 process_thread_->WakeUp(this); 265 process_thread_->WakeUp(this);
263 } 266 }
264 267
265 void PacedSender::SetProbingEnabled(bool enabled) { 268 void PacedSender::SetProbingEnabled(bool enabled) {
266 RTC_CHECK_EQ(0, packet_counter_); 269 RTC_CHECK_EQ(0, packet_counter_);
(...skipping 81 matching lines...) Expand 10 before | Expand all | Expand 10 after
348 } 351 }
349 352
350 int64_t PacedSender::AverageQueueTimeMs() { 353 int64_t PacedSender::AverageQueueTimeMs() {
351 rtc::CritScope cs(&critsect_); 354 rtc::CritScope cs(&critsect_);
352 packets_->UpdateQueueTime(clock_->TimeInMilliseconds()); 355 packets_->UpdateQueueTime(clock_->TimeInMilliseconds());
353 return packets_->AverageQueueTimeMs(); 356 return packets_->AverageQueueTimeMs();
354 } 357 }
355 358
356 int64_t PacedSender::TimeUntilNextProcess() { 359 int64_t PacedSender::TimeUntilNextProcess() {
357 rtc::CritScope cs(&critsect_); 360 rtc::CritScope cs(&critsect_);
361 int64_t elapsed_time_us = clock_->TimeInMicroseconds() - time_last_update_us_;
362 int64_t elapsed_time_ms = (elapsed_time_us + 500) / 1000;
363 // When paused we wake up every 500 ms to send a padding packet to ensure
364 // we won't get stuck in the paused state due to no feedback being received.
358 if (paused_) 365 if (paused_)
359 return 1000 * 60 * 60; 366 return std::max<int64_t>(kPausedPacketIntervalMs - elapsed_time_ms, 0);
360 367
361 if (prober_->IsProbing()) { 368 if (prober_->IsProbing()) {
362 int64_t ret = prober_->TimeUntilNextProbe(clock_->TimeInMilliseconds()); 369 int64_t ret = prober_->TimeUntilNextProbe(clock_->TimeInMilliseconds());
363 if (ret > 0 || (ret == 0 && !probing_send_failure_)) 370 if (ret > 0 || (ret == 0 && !probing_send_failure_))
364 return ret; 371 return ret;
365 } 372 }
366 int64_t elapsed_time_us = clock_->TimeInMicroseconds() - time_last_update_us_;
367 int64_t elapsed_time_ms = (elapsed_time_us + 500) / 1000;
368 return std::max<int64_t>(kMinPacketLimitMs - elapsed_time_ms, 0); 373 return std::max<int64_t>(kMinPacketLimitMs - elapsed_time_ms, 0);
369 } 374 }
370 375
371 void PacedSender::Process() { 376 void PacedSender::Process() {
372 int64_t now_us = clock_->TimeInMicroseconds(); 377 int64_t now_us = clock_->TimeInMicroseconds();
373 rtc::CritScope cs(&critsect_); 378 rtc::CritScope cs(&critsect_);
374 int64_t elapsed_time_ms = (now_us - time_last_update_us_ + 500) / 1000; 379 int64_t elapsed_time_ms = (now_us - time_last_update_us_ + 500) / 1000;
375 time_last_update_us_ = now_us;
376 int target_bitrate_kbps = pacing_bitrate_kbps_; 380 int target_bitrate_kbps = pacing_bitrate_kbps_;
377 if (!paused_ && elapsed_time_ms > 0) { 381
382 if (paused_) {
383 PacedPacketInfo pacing_info;
384 size_t bytes_sent = SendPadding(1, pacing_info);
385 alr_detector_->OnBytesSent(bytes_sent, now_us / 1000);
386 time_last_update_us_ = now_us;
387 return;
388 }
389
390 if (elapsed_time_ms > 0) {
378 size_t queue_size_bytes = packets_->SizeInBytes(); 391 size_t queue_size_bytes = packets_->SizeInBytes();
379 if (queue_size_bytes > 0) { 392 if (queue_size_bytes > 0) {
380 // Assuming equal size packets and input/output rate, the average packet 393 // Assuming equal size packets and input/output rate, the average packet
381 // has avg_time_left_ms left to get queue_size_bytes out of the queue, if 394 // has avg_time_left_ms left to get queue_size_bytes out of the queue, if
382 // time constraint shall be met. Determine bitrate needed for that. 395 // time constraint shall be met. Determine bitrate needed for that.
383 packets_->UpdateQueueTime(clock_->TimeInMilliseconds()); 396 packets_->UpdateQueueTime(clock_->TimeInMilliseconds());
384 int64_t avg_time_left_ms = std::max<int64_t>( 397 int64_t avg_time_left_ms = std::max<int64_t>(
385 1, queue_time_limit - packets_->AverageQueueTimeMs()); 398 1, queue_time_limit - packets_->AverageQueueTimeMs());
386 int min_bitrate_needed_kbps = 399 int min_bitrate_needed_kbps =
387 static_cast<int>(queue_size_bytes * 8 / avg_time_left_ms); 400 static_cast<int>(queue_size_bytes * 8 / avg_time_left_ms);
388 if (min_bitrate_needed_kbps > target_bitrate_kbps) 401 if (min_bitrate_needed_kbps > target_bitrate_kbps)
389 target_bitrate_kbps = min_bitrate_needed_kbps; 402 target_bitrate_kbps = min_bitrate_needed_kbps;
390 } 403 }
391 404
392 media_budget_->set_target_rate_kbps(target_bitrate_kbps); 405 media_budget_->set_target_rate_kbps(target_bitrate_kbps);
393 406
394 elapsed_time_ms = std::min(kMaxIntervalTimeMs, elapsed_time_ms); 407 elapsed_time_ms = std::min(kMaxIntervalTimeMs, elapsed_time_ms);
395 UpdateBudgetWithElapsedTime(elapsed_time_ms); 408 UpdateBudgetWithElapsedTime(elapsed_time_ms);
396 } 409 }
397 410
411 time_last_update_us_ = now_us;
412
398 bool is_probing = prober_->IsProbing(); 413 bool is_probing = prober_->IsProbing();
399 PacedPacketInfo pacing_info; 414 PacedPacketInfo pacing_info;
400 size_t bytes_sent = 0; 415 size_t bytes_sent = 0;
401 size_t recommended_probe_size = 0; 416 size_t recommended_probe_size = 0;
402 if (is_probing) { 417 if (is_probing) {
403 pacing_info = prober_->CurrentCluster(); 418 pacing_info = prober_->CurrentCluster();
404 recommended_probe_size = prober_->RecommendedMinProbeSize(); 419 recommended_probe_size = prober_->RecommendedMinProbeSize();
405 } 420 }
406 while (!packets_->Empty()) { 421 while (!packets_->Empty()) {
407 // Since we need to release the lock in order to send, we first pop the 422 // Since we need to release the lock in order to send, we first pop the
408 // element from the priority queue but keep it in storage, so that we can 423 // element from the priority queue but keep it in storage, so that we can
409 // reinsert it if send fails. 424 // reinsert it if send fails.
410 const paced_sender::Packet& packet = packets_->BeginPop(); 425 const paced_sender::Packet& packet = packets_->BeginPop();
411 426
412 if (SendPacket(packet, pacing_info)) { 427 if (SendPacket(packet, pacing_info)) {
413 // Send succeeded, remove it from the queue. 428 // Send succeeded, remove it from the queue.
414 if (first_sent_packet_ms_ == -1) 429 if (first_sent_packet_ms_ == -1)
415 first_sent_packet_ms_ = clock_->TimeInMilliseconds(); 430 first_sent_packet_ms_ = clock_->TimeInMilliseconds();
416 bytes_sent += packet.bytes; 431 bytes_sent += packet.bytes;
417 packets_->FinalizePop(packet); 432 packets_->FinalizePop(packet);
418 if (is_probing && bytes_sent > recommended_probe_size) 433 if (is_probing && bytes_sent > recommended_probe_size)
419 break; 434 break;
420 } else { 435 } else {
421 // Send failed, put it back into the queue. 436 // Send failed, put it back into the queue.
422 packets_->CancelPop(packet); 437 packets_->CancelPop(packet);
423 break; 438 break;
424 } 439 }
425 } 440 }
426 441
427 if (packets_->Empty() && !paused_) { 442 if (packets_->Empty()) {
428 // We can not send padding unless a normal packet has first been sent. If we 443 int padding_needed =
429 // do, timestamps get messed up. 444 static_cast<int>(is_probing ? (recommended_probe_size - bytes_sent)
430 if (packet_counter_ > 0) { 445 : padding_budget_->bytes_remaining());
431 int padding_needed = 446 if (padding_needed > 0)
432 static_cast<int>(is_probing ? (recommended_probe_size - bytes_sent) 447 bytes_sent += SendPadding(padding_needed, pacing_info);
433 : padding_budget_->bytes_remaining());
434
435 if (padding_needed > 0)
436 bytes_sent += SendPadding(padding_needed, pacing_info);
437 }
438 } 448 }
439 if (is_probing) { 449 if (is_probing) {
440 probing_send_failure_ = bytes_sent == 0; 450 probing_send_failure_ = bytes_sent == 0;
441 if (!probing_send_failure_) 451 if (!probing_send_failure_)
442 prober_->ProbeSent(clock_->TimeInMilliseconds(), bytes_sent); 452 prober_->ProbeSent(clock_->TimeInMilliseconds(), bytes_sent);
443 } 453 }
444 alr_detector_->OnBytesSent(bytes_sent, elapsed_time_ms); 454 alr_detector_->OnBytesSent(bytes_sent, elapsed_time_ms);
445 } 455 }
446 456
447 void PacedSender::ProcessThreadAttached(ProcessThread* process_thread) { 457 void PacedSender::ProcessThreadAttached(ProcessThread* process_thread) {
448 LOG(LS_INFO) << "ProcessThreadAttached 0x" << std::hex << process_thread; 458 LOG(LS_INFO) << "ProcessThreadAttached 0x" << std::hex << process_thread;
449 process_thread_ = process_thread; 459 process_thread_ = process_thread;
450 } 460 }
451 461
452 bool PacedSender::SendPacket(const paced_sender::Packet& packet, 462 bool PacedSender::SendPacket(const paced_sender::Packet& packet,
453 const PacedPacketInfo& pacing_info) { 463 const PacedPacketInfo& pacing_info) {
454 if (paused_) 464 RTC_DCHECK(!paused_);
455 return false;
456 if (media_budget_->bytes_remaining() == 0 && 465 if (media_budget_->bytes_remaining() == 0 &&
457 pacing_info.probe_cluster_id == PacedPacketInfo::kNotAProbe) { 466 pacing_info.probe_cluster_id == PacedPacketInfo::kNotAProbe) {
458 return false; 467 return false;
459 } 468 }
460 469
461 critsect_.Leave(); 470 critsect_.Leave();
462 const bool success = packet_sender_->TimeToSendPacket( 471 const bool success = packet_sender_->TimeToSendPacket(
463 packet.ssrc, packet.sequence_number, packet.capture_time_ms, 472 packet.ssrc, packet.sequence_number, packet.capture_time_ms,
464 packet.retransmission, pacing_info); 473 packet.retransmission, pacing_info);
465 critsect_.Enter(); 474 critsect_.Enter();
466 475
467 if (success) { 476 if (success) {
468 // TODO(holmer): High priority packets should only be accounted for if we 477 // TODO(holmer): High priority packets should only be accounted for if we
469 // are allocating bandwidth for audio. 478 // are allocating bandwidth for audio.
470 if (packet.priority != kHighPriority) { 479 if (packet.priority != kHighPriority) {
471 // Update media bytes sent. 480 // Update media bytes sent.
472 // TODO(eladalon): TimeToSendPacket() can also return |true| in some 481 // TODO(eladalon): TimeToSendPacket() can also return |true| in some
473 // situations where nothing actually ended up being sent to the network, 482 // situations where nothing actually ended up being sent to the network,
474 // and we probably don't want to update the budget in such cases. 483 // and we probably don't want to update the budget in such cases.
475 // https://bugs.chromium.org/p/webrtc/issues/detail?id=8052 484 // https://bugs.chromium.org/p/webrtc/issues/detail?id=8052
476 UpdateBudgetWithBytesSent(packet.bytes); 485 UpdateBudgetWithBytesSent(packet.bytes);
477 } 486 }
478 } 487 }
479 488
480 return success; 489 return success;
481 } 490 }
482 491
483 size_t PacedSender::SendPadding(size_t padding_needed, 492 size_t PacedSender::SendPadding(size_t padding_needed,
484 const PacedPacketInfo& pacing_info) { 493 const PacedPacketInfo& pacing_info) {
494 // We can not send padding unless a normal packet has first been sent. If we
495 // do, timestamps get messed up.
496 if (packet_counter_ == 0)
497 return 0;
485 critsect_.Leave(); 498 critsect_.Leave();
486 size_t bytes_sent = 499 size_t bytes_sent =
487 packet_sender_->TimeToSendPadding(padding_needed, pacing_info); 500 packet_sender_->TimeToSendPadding(padding_needed, pacing_info);
488 critsect_.Enter(); 501 critsect_.Enter();
489 502
490 if (bytes_sent > 0) { 503 if (bytes_sent > 0) {
491 UpdateBudgetWithBytesSent(bytes_sent); 504 UpdateBudgetWithBytesSent(bytes_sent);
492 } 505 }
493 return bytes_sent; 506 return bytes_sent;
494 } 507 }
(...skipping 12 matching lines...) Expand all
507 rtc::CritScope cs(&critsect_); 520 rtc::CritScope cs(&critsect_);
508 pacing_factor_ = pacing_factor; 521 pacing_factor_ = pacing_factor;
509 } 522 }
510 523
511 void PacedSender::SetQueueTimeLimit(int limit_ms) { 524 void PacedSender::SetQueueTimeLimit(int limit_ms) {
512 rtc::CritScope cs(&critsect_); 525 rtc::CritScope cs(&critsect_);
513 queue_time_limit = limit_ms; 526 queue_time_limit = limit_ms;
514 } 527 }
515 528
516 } // namespace webrtc 529 } // namespace webrtc
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698