OLD | NEW |
1 /* | 1 /* |
2 * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved. | 2 * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved. |
3 * | 3 * |
4 * Use of this source code is governed by a BSD-style license | 4 * Use of this source code is governed by a BSD-style license |
5 * that can be found in the LICENSE file in the root of the source | 5 * that can be found in the LICENSE file in the root of the source |
6 * tree. An additional intellectual property rights grant can be found | 6 * tree. An additional intellectual property rights grant can be found |
7 * in the file PATENTS. All contributing project authors may | 7 * in the file PATENTS. All contributing project authors may |
8 * be found in the AUTHORS file in the root of the source tree. | 8 * be found in the AUTHORS file in the root of the source tree. |
9 */ | 9 */ |
10 | 10 |
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
48 int64_t capture_time_ms, | 48 int64_t capture_time_ms, |
49 int64_t enqueue_time_ms, | 49 int64_t enqueue_time_ms, |
50 size_t length_in_bytes, | 50 size_t length_in_bytes, |
51 bool retransmission, | 51 bool retransmission, |
52 uint64_t enqueue_order) | 52 uint64_t enqueue_order) |
53 : priority(priority), | 53 : priority(priority), |
54 ssrc(ssrc), | 54 ssrc(ssrc), |
55 sequence_number(seq_number), | 55 sequence_number(seq_number), |
56 capture_time_ms(capture_time_ms), | 56 capture_time_ms(capture_time_ms), |
57 enqueue_time_ms(enqueue_time_ms), | 57 enqueue_time_ms(enqueue_time_ms), |
| 58 sum_paused_ms(0), |
58 bytes(length_in_bytes), | 59 bytes(length_in_bytes), |
59 retransmission(retransmission), | 60 retransmission(retransmission), |
60 enqueue_order(enqueue_order) {} | 61 enqueue_order(enqueue_order) {} |
61 | 62 |
62 RtpPacketSender::Priority priority; | 63 RtpPacketSender::Priority priority; |
63 uint32_t ssrc; | 64 uint32_t ssrc; |
64 uint16_t sequence_number; | 65 uint16_t sequence_number; |
65 int64_t capture_time_ms; | 66 int64_t capture_time_ms; // Absolute time of frame capture. |
66 int64_t enqueue_time_ms; | 67 int64_t enqueue_time_ms; // Absolute time of pacer queue entry. |
| 68 int64_t sum_paused_ms; // Sum of time spent in queue while pacer is paused. |
67 size_t bytes; | 69 size_t bytes; |
68 bool retransmission; | 70 bool retransmission; |
69 uint64_t enqueue_order; | 71 uint64_t enqueue_order; |
70 std::list<Packet>::iterator this_it; | 72 std::list<Packet>::iterator this_it; |
71 }; | 73 }; |
72 | 74 |
73 // Used by priority queue to sort packets. | 75 // Used by priority queue to sort packets. |
74 struct Comparator { | 76 struct Comparator { |
75 bool operator()(const Packet* first, const Packet* second) { | 77 bool operator()(const Packet* first, const Packet* second) { |
76 // Highest prio = 0. | 78 // Highest prio = 0. |
(...skipping 12 matching lines...) Expand all Loading... |
89 } | 91 } |
90 }; | 92 }; |
91 | 93 |
92 // Class encapsulating a priority queue with some extensions. | 94 // Class encapsulating a priority queue with some extensions. |
93 class PacketQueue { | 95 class PacketQueue { |
94 public: | 96 public: |
95 explicit PacketQueue(const Clock* clock) | 97 explicit PacketQueue(const Clock* clock) |
96 : bytes_(0), | 98 : bytes_(0), |
97 clock_(clock), | 99 clock_(clock), |
98 queue_time_sum_(0), | 100 queue_time_sum_(0), |
99 time_last_updated_(clock_->TimeInMilliseconds()) {} | 101 time_last_updated_(clock_->TimeInMilliseconds()), |
| 102 paused_(false) {} |
100 virtual ~PacketQueue() {} | 103 virtual ~PacketQueue() {} |
101 | 104 |
102 void Push(const Packet& packet) { | 105 void Push(const Packet& packet) { |
103 if (!AddToDupeSet(packet)) | 106 if (!AddToDupeSet(packet)) |
104 return; | 107 return; |
105 | 108 |
106 UpdateQueueTime(packet.enqueue_time_ms); | 109 UpdateQueueTime(packet.enqueue_time_ms); |
107 | 110 |
108 // Store packet in list, use pointers in priority queue for cheaper moves. | 111 // Store packet in list, use pointers in priority queue for cheaper moves. |
109 // Packets have a handle to its own iterator in the list, for easy removal | 112 // Packets have a handle to its own iterator in the list, for easy removal |
110 // when popping from queue. | 113 // when popping from queue. |
111 packet_list_.push_front(packet); | 114 packet_list_.push_front(packet); |
112 std::list<Packet>::iterator it = packet_list_.begin(); | 115 std::list<Packet>::iterator it = packet_list_.begin(); |
113 it->this_it = it; // Handle for direct removal from list. | 116 it->this_it = it; // Handle for direct removal from list. |
114 prio_queue_.push(&(*it)); // Pointer into list. | 117 prio_queue_.push(&(*it)); // Pointer into list. |
115 bytes_ += packet.bytes; | 118 bytes_ += packet.bytes; |
116 } | 119 } |
117 | 120 |
118 const Packet& BeginPop() { | 121 const Packet& BeginPop() { |
119 const Packet& packet = *prio_queue_.top(); | 122 const Packet& packet = *prio_queue_.top(); |
120 prio_queue_.pop(); | 123 prio_queue_.pop(); |
121 return packet; | 124 return packet; |
122 } | 125 } |
123 | 126 |
124 void CancelPop(const Packet& packet) { prio_queue_.push(&(*packet.this_it)); } | 127 void CancelPop(const Packet& packet) { prio_queue_.push(&(*packet.this_it)); } |
125 | 128 |
126 void FinalizePop(const Packet& packet) { | 129 void FinalizePop(const Packet& packet) { |
127 RemoveFromDupeSet(packet); | 130 RemoveFromDupeSet(packet); |
128 bytes_ -= packet.bytes; | 131 bytes_ -= packet.bytes; |
129 queue_time_sum_ -= (time_last_updated_ - packet.enqueue_time_ms); | 132 int64_t packet_queue_time_ms = time_last_updated_ - packet.enqueue_time_ms; |
| 133 RTC_DCHECK_LE(packet.sum_paused_ms, packet_queue_time_ms); |
| 134 packet_queue_time_ms -= packet.sum_paused_ms; |
| 135 RTC_DCHECK_LE(packet_queue_time_ms, queue_time_sum_); |
| 136 queue_time_sum_ -= packet_queue_time_ms; |
130 packet_list_.erase(packet.this_it); | 137 packet_list_.erase(packet.this_it); |
131 RTC_DCHECK_EQ(packet_list_.size(), prio_queue_.size()); | 138 RTC_DCHECK_EQ(packet_list_.size(), prio_queue_.size()); |
132 if (packet_list_.empty()) | 139 if (packet_list_.empty()) |
133 RTC_DCHECK_EQ(0, queue_time_sum_); | 140 RTC_DCHECK_EQ(0, queue_time_sum_); |
134 } | 141 } |
135 | 142 |
136 bool Empty() const { return prio_queue_.empty(); } | 143 bool Empty() const { return prio_queue_.empty(); } |
137 | 144 |
138 size_t SizeInPackets() const { return prio_queue_.size(); } | 145 size_t SizeInPackets() const { return prio_queue_.size(); } |
139 | 146 |
140 uint64_t SizeInBytes() const { return bytes_; } | 147 uint64_t SizeInBytes() const { return bytes_; } |
141 | 148 |
142 int64_t OldestEnqueueTimeMs() const { | 149 int64_t OldestEnqueueTimeMs() const { |
143 auto it = packet_list_.rbegin(); | 150 auto it = packet_list_.rbegin(); |
144 if (it == packet_list_.rend()) | 151 if (it == packet_list_.rend()) |
145 return 0; | 152 return 0; |
146 return it->enqueue_time_ms; | 153 return it->enqueue_time_ms; |
147 } | 154 } |
148 | 155 |
149 void UpdateQueueTime(int64_t timestamp_ms) { | 156 void UpdateQueueTime(int64_t timestamp_ms) { |
150 RTC_DCHECK_GE(timestamp_ms, time_last_updated_); | 157 RTC_DCHECK_GE(timestamp_ms, time_last_updated_); |
151 int64_t delta = timestamp_ms - time_last_updated_; | 158 if (timestamp_ms == time_last_updated_) |
152 // Use packet packet_list_.size() not prio_queue_.size() here, as there | 159 return; |
153 // might be an outstanding element popped from prio_queue_ currently in the | 160 |
154 // SendPacket() call, while packet_list_ will always be correct. | 161 int64_t delta_ms = timestamp_ms - time_last_updated_; |
155 queue_time_sum_ += delta * packet_list_.size(); | 162 |
| 163 if (paused_) { |
| 164 // Increase per-packet accumulators of time spent in queue while paused, |
| 165 // so that we can disregard that when subtracting main accumulator when |
| 166 // popping packet from the queue. |
| 167 for (auto& it : packet_list_) { |
| 168 it.sum_paused_ms += delta_ms; |
| 169 } |
| 170 } else { |
| 171 // Use packet packet_list_.size() not prio_queue_.size() here, as there |
| 172 // might be an outstanding element popped from prio_queue_ currently in |
| 173 // the SendPacket() call, while packet_list_ will always be correct. |
| 174 queue_time_sum_ += delta_ms * packet_list_.size(); |
| 175 } |
156 time_last_updated_ = timestamp_ms; | 176 time_last_updated_ = timestamp_ms; |
157 } | 177 } |
158 | 178 |
| 179 void SetPauseState(bool paused, int64_t timestamp_ms) { |
| 180 if (paused_ == paused) |
| 181 return; |
| 182 UpdateQueueTime(timestamp_ms); |
| 183 paused_ = paused; |
| 184 } |
| 185 |
159 int64_t AverageQueueTimeMs() const { | 186 int64_t AverageQueueTimeMs() const { |
160 if (prio_queue_.empty()) | 187 if (prio_queue_.empty()) |
161 return 0; | 188 return 0; |
162 return queue_time_sum_ / packet_list_.size(); | 189 return queue_time_sum_ / packet_list_.size(); |
163 } | 190 } |
164 | 191 |
165 private: | 192 private: |
166 // Try to add a packet to the set of ssrc/seqno identifiers currently in the | 193 // Try to add a packet to the set of ssrc/seqno identifiers currently in the |
167 // queue. Return true if inserted, false if this is a duplicate. | 194 // queue. Return true if inserted, false if this is a duplicate. |
168 bool AddToDupeSet(const Packet& packet) { | 195 bool AddToDupeSet(const Packet& packet) { |
(...skipping 24 matching lines...) Expand all Loading... |
193 // Use pointers into list, to avoid moving whole struct within heap. | 220 // Use pointers into list, to avoid moving whole struct within heap. |
194 std::priority_queue<Packet*, std::vector<Packet*>, Comparator> prio_queue_; | 221 std::priority_queue<Packet*, std::vector<Packet*>, Comparator> prio_queue_; |
195 // Total number of bytes in the queue. | 222 // Total number of bytes in the queue. |
196 uint64_t bytes_; | 223 uint64_t bytes_; |
197 // Map<ssrc, std::set<seq_no> >, for checking duplicates. | 224 // Map<ssrc, std::set<seq_no> >, for checking duplicates. |
198 typedef std::map<uint32_t, std::set<uint16_t> > SsrcSeqNoMap; | 225 typedef std::map<uint32_t, std::set<uint16_t> > SsrcSeqNoMap; |
199 SsrcSeqNoMap dupe_map_; | 226 SsrcSeqNoMap dupe_map_; |
200 const Clock* const clock_; | 227 const Clock* const clock_; |
201 int64_t queue_time_sum_; | 228 int64_t queue_time_sum_; |
202 int64_t time_last_updated_; | 229 int64_t time_last_updated_; |
| 230 bool paused_; |
203 }; | 231 }; |
204 | 232 |
205 } // namespace paced_sender | 233 } // namespace paced_sender |
206 | 234 |
207 const int64_t PacedSender::kMaxQueueLengthMs = 2000; | 235 const int64_t PacedSender::kMaxQueueLengthMs = 2000; |
208 const float PacedSender::kDefaultPaceMultiplier = 2.5f; | 236 const float PacedSender::kDefaultPaceMultiplier = 2.5f; |
209 | 237 |
210 PacedSender::PacedSender(const Clock* clock, | 238 PacedSender::PacedSender(const Clock* clock, |
211 PacketSender* packet_sender, | 239 PacketSender* packet_sender, |
212 RtcEventLog* event_log) | 240 RtcEventLog* event_log) |
(...skipping 23 matching lines...) Expand all Loading... |
236 void PacedSender::CreateProbeCluster(int bitrate_bps) { | 264 void PacedSender::CreateProbeCluster(int bitrate_bps) { |
237 rtc::CritScope cs(&critsect_); | 265 rtc::CritScope cs(&critsect_); |
238 prober_->CreateProbeCluster(bitrate_bps, clock_->TimeInMilliseconds()); | 266 prober_->CreateProbeCluster(bitrate_bps, clock_->TimeInMilliseconds()); |
239 } | 267 } |
240 | 268 |
241 void PacedSender::Pause() { | 269 void PacedSender::Pause() { |
242 LOG(LS_INFO) << "PacedSender paused."; | 270 LOG(LS_INFO) << "PacedSender paused."; |
243 { | 271 { |
244 rtc::CritScope cs(&critsect_); | 272 rtc::CritScope cs(&critsect_); |
245 paused_ = true; | 273 paused_ = true; |
| 274 packets_->SetPauseState(true, clock_->TimeInMilliseconds()); |
246 } | 275 } |
247 // Tell the process thread to call our TimeUntilNextProcess() method to get | 276 // Tell the process thread to call our TimeUntilNextProcess() method to get |
248 // a new (longer) estimate for when to call Process(). | 277 // a new (longer) estimate for when to call Process(). |
249 if (process_thread_) | 278 if (process_thread_) |
250 process_thread_->WakeUp(this); | 279 process_thread_->WakeUp(this); |
251 } | 280 } |
252 | 281 |
253 void PacedSender::Resume() { | 282 void PacedSender::Resume() { |
254 LOG(LS_INFO) << "PacedSender resumed."; | 283 LOG(LS_INFO) << "PacedSender resumed."; |
255 { | 284 { |
256 rtc::CritScope cs(&critsect_); | 285 rtc::CritScope cs(&critsect_); |
257 paused_ = false; | 286 paused_ = false; |
| 287 packets_->SetPauseState(false, clock_->TimeInMilliseconds()); |
258 } | 288 } |
259 // Tell the process thread to call our TimeUntilNextProcess() method to | 289 // Tell the process thread to call our TimeUntilNextProcess() method to |
260 // refresh the estimate for when to call Process(). | 290 // refresh the estimate for when to call Process(). |
261 if (process_thread_) | 291 if (process_thread_) |
262 process_thread_->WakeUp(this); | 292 process_thread_->WakeUp(this); |
263 } | 293 } |
264 | 294 |
265 void PacedSender::SetProbingEnabled(bool enabled) { | 295 void PacedSender::SetProbingEnabled(bool enabled) { |
266 RTC_CHECK_EQ(0, packet_counter_); | 296 RTC_CHECK_EQ(0, packet_counter_); |
267 rtc::CritScope cs(&critsect_); | 297 rtc::CritScope cs(&critsect_); |
(...skipping 239 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
507 rtc::CritScope cs(&critsect_); | 537 rtc::CritScope cs(&critsect_); |
508 pacing_factor_ = pacing_factor; | 538 pacing_factor_ = pacing_factor; |
509 } | 539 } |
510 | 540 |
511 void PacedSender::SetQueueTimeLimit(int limit_ms) { | 541 void PacedSender::SetQueueTimeLimit(int limit_ms) { |
512 rtc::CritScope cs(&critsect_); | 542 rtc::CritScope cs(&critsect_); |
513 queue_time_limit = limit_ms; | 543 queue_time_limit = limit_ms; |
514 } | 544 } |
515 | 545 |
516 } // namespace webrtc | 546 } // namespace webrtc |
OLD | NEW |