Index: webrtc/modules/remote_bitrate_estimator/remote_bitrate_estimator_abs_send_time.cc |
diff --git a/webrtc/modules/remote_bitrate_estimator/remote_bitrate_estimator_abs_send_time.cc b/webrtc/modules/remote_bitrate_estimator/remote_bitrate_estimator_abs_send_time.cc |
index ee749f77133baf537a125899e33318517d795ba9..db54cab43c2311c8424ff94b26899469fa5f67db 100644 |
--- a/webrtc/modules/remote_bitrate_estimator/remote_bitrate_estimator_abs_send_time.cc |
+++ b/webrtc/modules/remote_bitrate_estimator/remote_bitrate_estimator_abs_send_time.cc |
@@ -37,24 +37,9 @@ enum { |
kExpectedNumberOfProbes = 3 |
}; |
-static const size_t kPropagationDeltaQueueMaxSize = 1000; |
-static const int64_t kPropagationDeltaQueueMaxTimeMs = 1000; |
static const double kTimestampToMs = 1000.0 / |
static_cast<double>(1 << kInterArrivalShift); |
-// Removes the entries at |index| of |time| and |value|, if time[index] is |
-// smaller than or equal to |deadline|. |time| must be sorted ascendingly. |
-static void RemoveStaleEntries( |
- std::vector<int64_t>* time, std::vector<int>* value, int64_t deadline) { |
- assert(time->size() == value->size()); |
- std::vector<int64_t>::iterator end_of_removal = std::upper_bound( |
- time->begin(), time->end(), deadline); |
- size_t end_of_removal_index = end_of_removal - time->begin(); |
- |
- time->erase(time->begin(), end_of_removal); |
- value->erase(value->begin(), value->begin() + end_of_removal_index); |
-} |
- |
template<typename K, typename V> |
std::vector<K> Keys(const std::map<K, V>& map) { |
std::vector<K> keys; |
@@ -94,29 +79,24 @@ bool RemoteBitrateEstimatorAbsSendTime::IsWithinClusterBounds( |
clusters->push_back(*cluster); |
} |
- int RemoteBitrateEstimatorAbsSendTime::Id() const { |
- return static_cast<int>(reinterpret_cast<uint64_t>(this)); |
- } |
- |
RemoteBitrateEstimatorAbsSendTime::RemoteBitrateEstimatorAbsSendTime( |
RemoteBitrateObserver* observer, |
Clock* clock) |
- : crit_sect_(CriticalSectionWrapper::CreateCriticalSection()), |
- observer_(observer), |
- clock_(clock), |
- ssrcs_(), |
+ : observer_(observer), |
inter_arrival_(), |
estimator_(OverUseDetectorOptions()), |
detector_(OverUseDetectorOptions()), |
incoming_bitrate_(kBitrateWindowMs, 8000), |
- last_process_time_(-1), |
- process_interval_ms_(kProcessIntervalMs), |
- total_propagation_delta_ms_(0), |
total_probes_received_(0), |
- first_packet_time_ms_(-1) { |
- assert(observer_); |
- assert(clock_); |
- LOG(LS_INFO) << "RemoteBitrateEstimatorAbsSendTime: Instantiating."; |
+ first_packet_time_ms_(-1), |
+ last_update_ms_(-1), |
+ ssrcs_(), |
+ clock_(clock) { |
+ RTC_DCHECK(observer_); |
+ RTC_DCHECK(clock_); |
+ LOG(LS_INFO) << "RemoteBitrateEstimatorAbsSendTime: Instantiating."; |
+ network_thread_.DetachFromThread(); |
+ process_thread_.DetachFromThread(); |
} |
void RemoteBitrateEstimatorAbsSendTime::ComputeClusters( |
@@ -183,7 +163,8 @@ RemoteBitrateEstimatorAbsSendTime::FindBestProbe( |
return best_it; |
} |
-void RemoteBitrateEstimatorAbsSendTime::ProcessClusters(int64_t now_ms) { |
+RemoteBitrateEstimatorAbsSendTime::ProbeResult |
+RemoteBitrateEstimatorAbsSendTime::ProcessClusters(int64_t now_ms) { |
std::list<Cluster> clusters; |
ComputeClusters(&clusters); |
if (clusters.empty()) { |
@@ -191,7 +172,7 @@ void RemoteBitrateEstimatorAbsSendTime::ProcessClusters(int64_t now_ms) { |
// we will remove the oldest one. |
if (probes_.size() >= kMaxProbePackets) |
probes_.pop_front(); |
- return; |
+ return ProbeResult::kNoUpdate; |
} |
std::list<Cluster>::const_iterator best_it = FindBestProbe(clusters); |
@@ -209,6 +190,7 @@ void RemoteBitrateEstimatorAbsSendTime::ProcessClusters(int64_t now_ms) { |
<< " ms, mean recv delta: " << best_it->recv_mean_ms |
<< " ms, num probes: " << best_it->count; |
remote_rate_.SetEstimate(probe_bitrate_bps, now_ms); |
+ return ProbeResult::kBitrateUpdated; |
} |
} |
@@ -216,6 +198,7 @@ void RemoteBitrateEstimatorAbsSendTime::ProcessClusters(int64_t now_ms) { |
// of probes. |
if (clusters.size() >= kExpectedNumberOfProbes) |
probes_.clear(); |
+ return ProbeResult::kNoUpdate; |
} |
bool RemoteBitrateEstimatorAbsSendTime::IsBitrateImproving( |
@@ -229,6 +212,7 @@ bool RemoteBitrateEstimatorAbsSendTime::IsBitrateImproving( |
void RemoteBitrateEstimatorAbsSendTime::IncomingPacketFeedbackVector( |
const std::vector<PacketInfo>& packet_feedback_vector) { |
+ RTC_DCHECK(network_thread_.CalledOnValidThread()); |
for (const auto& packet_info : packet_feedback_vector) { |
IncomingPacketInfo(packet_info.arrival_time_ms, |
ConvertMsTo24Bits(packet_info.send_time_ms), |
@@ -240,6 +224,7 @@ void RemoteBitrateEstimatorAbsSendTime::IncomingPacket(int64_t arrival_time_ms, |
size_t payload_size, |
const RTPHeader& header, |
bool was_paced) { |
+ RTC_DCHECK(network_thread_.CalledOnValidThread()); |
if (!header.extension.hasAbsoluteSendTime) { |
LOG(LS_WARNING) << "RemoteBitrateEstimatorAbsSendTimeImpl: Incoming packet " |
"is missing absolute send time extension!"; |
@@ -261,13 +246,10 @@ void RemoteBitrateEstimatorAbsSendTime::IncomingPacketInfo( |
uint32_t timestamp = send_time_24bits << kAbsSendTimeInterArrivalUpshift; |
int64_t send_time_ms = static_cast<int64_t>(timestamp) * kTimestampToMs; |
- CriticalSectionScoped cs(crit_sect_.get()); |
int64_t now_ms = clock_->TimeInMilliseconds(); |
// TODO(holmer): SSRCs are only needed for REMB, should be broken out from |
// here. |
- ssrcs_[ssrc] = now_ms; |
incoming_bitrate_.Update(payload_size, now_ms); |
- const BandwidthUsage prior_state = detector_.State(); |
if (first_packet_time_ms_ == -1) |
first_packet_time_ms_ = clock_->TimeInMilliseconds(); |
@@ -279,79 +261,88 @@ void RemoteBitrateEstimatorAbsSendTime::IncomingPacketInfo( |
// make sure the packet was paced. We currently assume that only packets |
// larger than 200 bytes are paced by the sender. |
was_paced = was_paced && payload_size > PacedSender::kMinProbePacketSize; |
- if (was_paced && |
- (!remote_rate_.ValidEstimate() || |
- now_ms - first_packet_time_ms_ < kInitialProbingIntervalMs)) { |
- // TODO(holmer): Use a map instead to get correct order? |
- if (total_probes_received_ < kMaxProbePackets) { |
- int send_delta_ms = -1; |
- int recv_delta_ms = -1; |
- if (!probes_.empty()) { |
- send_delta_ms = send_time_ms - probes_.back().send_time_ms; |
- recv_delta_ms = arrival_time_ms - probes_.back().recv_time_ms; |
+ bool update_estimate = false; |
+ uint32_t target_bitrate_bps = 0; |
+ std::vector<uint32_t> ssrcs; |
+ { |
+ rtc::CritScope lock(&crit_); |
+ |
+ TimeoutStreams(now_ms); |
+ ssrcs_[ssrc] = now_ms; |
+ |
+ if (was_paced && |
+ (!remote_rate_.ValidEstimate() || |
+ now_ms - first_packet_time_ms_ < kInitialProbingIntervalMs)) { |
+ // TODO(holmer): Use a map instead to get correct order? |
+ if (total_probes_received_ < kMaxProbePackets) { |
+ int send_delta_ms = -1; |
+ int recv_delta_ms = -1; |
+ if (!probes_.empty()) { |
+ send_delta_ms = send_time_ms - probes_.back().send_time_ms; |
+ recv_delta_ms = arrival_time_ms - probes_.back().recv_time_ms; |
+ } |
+ LOG(LS_INFO) << "Probe packet received: send time=" << send_time_ms |
+ << " ms, recv time=" << arrival_time_ms |
+ << " ms, send delta=" << send_delta_ms |
+ << " ms, recv delta=" << recv_delta_ms << " ms."; |
} |
- LOG(LS_INFO) << "Probe packet received: send time=" << send_time_ms |
- << " ms, recv time=" << arrival_time_ms |
- << " ms, send delta=" << send_delta_ms |
- << " ms, recv delta=" << recv_delta_ms << " ms."; |
+ probes_.push_back(Probe(send_time_ms, arrival_time_ms, payload_size)); |
+ ++total_probes_received_; |
+ // Make sure that a probe which updated the bitrate immediately has an |
+ // effect by calling the OnReceiveBitrateChanged callback. |
+ if (ProcessClusters(now_ms) == ProbeResult::kBitrateUpdated) |
+ update_estimate = true; |
} |
- probes_.push_back(Probe(send_time_ms, arrival_time_ms, payload_size)); |
- ++total_probes_received_; |
- ProcessClusters(now_ms); |
- } |
- if (!inter_arrival_.get()) { |
- inter_arrival_.reset( |
- new InterArrival((kTimestampGroupLengthMs << kInterArrivalShift) / 1000, |
- kTimestampToMs, true)); |
- } |
- if (inter_arrival_->ComputeDeltas(timestamp, arrival_time_ms, payload_size, |
- &ts_delta, &t_delta, &size_delta)) { |
- double ts_delta_ms = (1000.0 * ts_delta) / (1 << kInterArrivalShift); |
- estimator_.Update(t_delta, ts_delta_ms, size_delta, detector_.State()); |
- detector_.Detect(estimator_.offset(), ts_delta_ms, |
- estimator_.num_of_deltas(), arrival_time_ms); |
- UpdateStats(static_cast<int>(t_delta - ts_delta_ms), now_ms); |
- } |
- if (detector_.State() == kBwOverusing) { |
- uint32_t incoming_bitrate_bps = incoming_bitrate_.Rate(now_ms); |
- if (prior_state != kBwOverusing || |
- remote_rate_.TimeToReduceFurther(now_ms, incoming_bitrate_bps)) { |
+ if (inter_arrival_->ComputeDeltas(timestamp, arrival_time_ms, payload_size, |
+ &ts_delta, &t_delta, &size_delta)) { |
+ double ts_delta_ms = (1000.0 * ts_delta) / (1 << kInterArrivalShift); |
+ estimator_.Update(t_delta, ts_delta_ms, size_delta, detector_.State()); |
+ detector_.Detect(estimator_.offset(), ts_delta_ms, |
+ estimator_.num_of_deltas(), arrival_time_ms); |
+ } |
+ |
+ if (!update_estimate) { |
+ // Check if it's time for a periodic update or if we should update because |
+ // of an over-use. |
+ if (last_update_ms_ == -1 || |
+ now_ms - last_update_ms_ > remote_rate_.GetFeedbackInterval()) { |
+ update_estimate = true; |
+ } else if (detector_.State() == kBwOverusing && |
+ remote_rate_.TimeToReduceFurther( |
+ now_ms, incoming_bitrate_.Rate(now_ms))) { |
+ update_estimate = true; |
+ } |
+ } |
+ |
+ if (update_estimate) { |
// The first overuse should immediately trigger a new estimate. |
// We also have to update the estimate immediately if we are overusing |
// and the target bitrate is too high compared to what we are receiving. |
- UpdateEstimate(now_ms); |
+ const RateControlInput input(detector_.State(), |
+ incoming_bitrate_.Rate(now_ms), |
+ estimator_.var_noise()); |
+ remote_rate_.Update(&input, now_ms); |
+ target_bitrate_bps = remote_rate_.UpdateBandwidthEstimate(now_ms); |
+ update_estimate = remote_rate_.ValidEstimate(); |
+ ssrcs = Keys(ssrcs_); |
} |
} |
+ if (update_estimate) { |
+ last_update_ms_ = now_ms; |
+ observer_->OnReceiveBitrateChanged(ssrcs, target_bitrate_bps); |
+ } |
} |
int32_t RemoteBitrateEstimatorAbsSendTime::Process() { |
- if (TimeUntilNextProcess() > 0) { |
- return 0; |
- } |
- { |
- CriticalSectionScoped cs(crit_sect_.get()); |
- UpdateEstimate(clock_->TimeInMilliseconds()); |
- } |
- last_process_time_ = clock_->TimeInMilliseconds(); |
return 0; |
} |
int64_t RemoteBitrateEstimatorAbsSendTime::TimeUntilNextProcess() { |
- if (last_process_time_ < 0) { |
- return 0; |
- } |
- { |
- CriticalSectionScoped cs(crit_sect_.get()); |
- return last_process_time_ + process_interval_ms_ - |
- clock_->TimeInMilliseconds(); |
- } |
+ const int64_t kDisabledModuleTime = 1000; |
+ return kDisabledModuleTime; |
} |
-void RemoteBitrateEstimatorAbsSendTime::UpdateEstimate(int64_t now_ms) { |
- if (!inter_arrival_.get()) { |
- // No packets have been received on the active streams. |
- return; |
- } |
+void RemoteBitrateEstimatorAbsSendTime::TimeoutStreams(int64_t now_ms) { |
for (Ssrcs::iterator it = ssrcs_.begin(); it != ssrcs_.end();) { |
if ((now_ms - it->second) > kStreamTimeOutMs) { |
ssrcs_.erase(it++); |
@@ -361,40 +352,36 @@ void RemoteBitrateEstimatorAbsSendTime::UpdateEstimate(int64_t now_ms) { |
} |
if (ssrcs_.empty()) { |
// We can't update the estimate if we don't have any active streams. |
- inter_arrival_.reset(); |
+ inter_arrival_.reset( |
+ new InterArrival((kTimestampGroupLengthMs << kInterArrivalShift) / 1000, |
+ kTimestampToMs, true)); |
// We deliberately don't reset the first_packet_time_ms_ here for now since |
// we only probe for bandwidth in the beginning of a call right now. |
- return; |
- } |
- |
- const RateControlInput input(detector_.State(), |
- incoming_bitrate_.Rate(now_ms), |
- estimator_.var_noise()); |
- remote_rate_.Update(&input, now_ms); |
- uint32_t target_bitrate = remote_rate_.UpdateBandwidthEstimate(now_ms); |
- if (remote_rate_.ValidEstimate()) { |
- process_interval_ms_ = remote_rate_.GetFeedbackInterval(); |
- observer_->OnReceiveBitrateChanged(Keys(ssrcs_), target_bitrate); |
} |
} |
void RemoteBitrateEstimatorAbsSendTime::OnRttUpdate(int64_t avg_rtt_ms, |
int64_t max_rtt_ms) { |
- CriticalSectionScoped cs(crit_sect_.get()); |
+ RTC_DCHECK(process_thread_.CalledOnValidThread()); |
+ rtc::CritScope lock(&crit_); |
remote_rate_.SetRtt(avg_rtt_ms); |
} |
void RemoteBitrateEstimatorAbsSendTime::RemoveStream(uint32_t ssrc) { |
- CriticalSectionScoped cs(crit_sect_.get()); |
+ rtc::CritScope lock(&crit_); |
ssrcs_.erase(ssrc); |
} |
bool RemoteBitrateEstimatorAbsSendTime::LatestEstimate( |
std::vector<uint32_t>* ssrcs, |
uint32_t* bitrate_bps) const { |
- CriticalSectionScoped cs(crit_sect_.get()); |
- assert(ssrcs); |
- assert(bitrate_bps); |
+ // Currently accessed from both the process thread (see |
+ // ModuleRtpRtcpImpl::Process()) and the configuration thread (see |
+ // Call::GetStats()). Should in the future only be accessed from a single |
+ // thread. |
+ RTC_DCHECK(ssrcs); |
+ RTC_DCHECK(bitrate_bps); |
+ rtc::CritScope lock(&crit_); |
if (!remote_rate_.ValidEstimate()) { |
return false; |
} |
@@ -407,45 +394,10 @@ bool RemoteBitrateEstimatorAbsSendTime::LatestEstimate( |
return true; |
} |
-bool RemoteBitrateEstimatorAbsSendTime::GetStats( |
- ReceiveBandwidthEstimatorStats* output) const { |
- { |
- CriticalSectionScoped cs(crit_sect_.get()); |
- output->recent_propagation_time_delta_ms = recent_propagation_delta_ms_; |
- output->recent_arrival_time_ms = recent_update_time_ms_; |
- output->total_propagation_time_delta_ms = total_propagation_delta_ms_; |
- } |
- RemoveStaleEntries( |
- &output->recent_arrival_time_ms, |
- &output->recent_propagation_time_delta_ms, |
- clock_->TimeInMilliseconds() - kPropagationDeltaQueueMaxTimeMs); |
- return true; |
-} |
- |
-void RemoteBitrateEstimatorAbsSendTime::UpdateStats(int propagation_delta_ms, |
- int64_t now_ms) { |
- // The caller must enter crit_sect_ before the call. |
- |
- // Remove the oldest entry if the size limit is reached. |
- if (recent_update_time_ms_.size() == kPropagationDeltaQueueMaxSize) { |
- recent_update_time_ms_.erase(recent_update_time_ms_.begin()); |
- recent_propagation_delta_ms_.erase(recent_propagation_delta_ms_.begin()); |
- } |
- |
- recent_propagation_delta_ms_.push_back(propagation_delta_ms); |
- recent_update_time_ms_.push_back(now_ms); |
- |
- RemoveStaleEntries( |
- &recent_update_time_ms_, |
- &recent_propagation_delta_ms_, |
- now_ms - kPropagationDeltaQueueMaxTimeMs); |
- |
- total_propagation_delta_ms_ = |
- std::max(total_propagation_delta_ms_ + propagation_delta_ms, 0); |
-} |
- |
void RemoteBitrateEstimatorAbsSendTime::SetMinBitrate(int min_bitrate_bps) { |
- CriticalSectionScoped cs(crit_sect_.get()); |
+ // Called from both the configuration thread and the network thread. Shouldn't |
+ // be called from the network thread in the future. |
+ rtc::CritScope lock(&crit_); |
remote_rate_.SetMinBitrate(min_bitrate_bps); |
} |
} // namespace webrtc |