OLD | NEW |
1 /* | 1 /* |
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. | 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. |
3 * | 3 * |
4 * Use of this source code is governed by a BSD-style license | 4 * Use of this source code is governed by a BSD-style license |
5 * that can be found in the LICENSE file in the root of the source | 5 * that can be found in the LICENSE file in the root of the source |
6 * tree. An additional intellectual property rights grant can be found | 6 * tree. An additional intellectual property rights grant can be found |
7 * in the file PATENTS. All contributing project authors may | 7 * in the file PATENTS. All contributing project authors may |
8 * be found in the AUTHORS file in the root of the source tree. | 8 * be found in the AUTHORS file in the root of the source tree. |
9 */ | 9 */ |
10 #include <algorithm> | 10 #include <algorithm> |
11 | 11 |
12 #include "webrtc/base/checks.h" | 12 #include "webrtc/base/checks.h" |
13 #include "webrtc/base/common.h" | 13 #include "webrtc/base/common.h" |
14 #include "webrtc/base/logging.h" | 14 #include "webrtc/base/logging.h" |
15 #include "webrtc/base/messagequeue.h" | 15 #include "webrtc/base/messagequeue.h" |
16 #include "webrtc/base/trace_event.h" | 16 #include "webrtc/base/trace_event.h" |
17 | 17 |
18 namespace rtc { | 18 namespace rtc { |
19 | 19 |
20 const uint32_t kMaxMsgLatency = 150; // 150 ms | 20 const int kMaxMsgLatency = 150; // 150 ms |
21 | 21 |
22 //------------------------------------------------------------------ | 22 //------------------------------------------------------------------ |
23 // MessageQueueManager | 23 // MessageQueueManager |
24 | 24 |
25 MessageQueueManager* MessageQueueManager::instance_ = NULL; | 25 MessageQueueManager* MessageQueueManager::instance_ = NULL; |
26 | 26 |
27 MessageQueueManager* MessageQueueManager::Instance() { | 27 MessageQueueManager* MessageQueueManager::Instance() { |
28 // Note: This is not thread safe, but it is first called before threads are | 28 // Note: This is not thread safe, but it is first called before threads are |
29 // spawned. | 29 // spawned. |
30 if (!instance_) | 30 if (!instance_) |
(...skipping 177 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
208 // Always return the peek if it exists so there is Peek/Get symmetry | 208 // Always return the peek if it exists so there is Peek/Get symmetry |
209 | 209 |
210 if (fPeekKeep_) { | 210 if (fPeekKeep_) { |
211 *pmsg = msgPeek_; | 211 *pmsg = msgPeek_; |
212 fPeekKeep_ = false; | 212 fPeekKeep_ = false; |
213 return true; | 213 return true; |
214 } | 214 } |
215 | 215 |
216 // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch | 216 // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch |
217 | 217 |
218 int cmsTotal = cmsWait; | 218 int64_t cmsTotal = cmsWait; |
219 int cmsElapsed = 0; | 219 int64_t cmsElapsed = 0; |
220 uint32_t msStart = Time(); | 220 int64_t msStart = TimeMillis(); |
221 uint32_t msCurrent = msStart; | 221 int64_t msCurrent = msStart; |
222 while (true) { | 222 while (true) { |
223 // Check for sent messages | 223 // Check for sent messages |
224 ReceiveSends(); | 224 ReceiveSends(); |
225 | 225 |
226 // Check for posted events | 226 // Check for posted events |
227 int cmsDelayNext = kForever; | 227 int64_t cmsDelayNext = kForever; |
228 bool first_pass = true; | 228 bool first_pass = true; |
229 while (true) { | 229 while (true) { |
230 // All queue operations need to be locked, but nothing else in this loop | 230 // All queue operations need to be locked, but nothing else in this loop |
231 // (specifically handling disposed message) can happen inside the crit. | 231 // (specifically handling disposed message) can happen inside the crit. |
232 // Otherwise, disposed MessageHandlers will cause deadlocks. | 232 // Otherwise, disposed MessageHandlers will cause deadlocks. |
233 { | 233 { |
234 CritScope cs(&crit_); | 234 CritScope cs(&crit_); |
235 // On the first pass, check for delayed messages that have been | 235 // On the first pass, check for delayed messages that have been |
236 // triggered and calculate the next trigger time. | 236 // triggered and calculate the next trigger time. |
237 if (first_pass) { | 237 if (first_pass) { |
238 first_pass = false; | 238 first_pass = false; |
239 while (!dmsgq_.empty()) { | 239 while (!dmsgq_.empty()) { |
240 if (TimeIsLater(msCurrent, dmsgq_.top().msTrigger_)) { | 240 if (msCurrent < dmsgq_.top().msTrigger_) { |
241 cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent); | 241 cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent); |
242 break; | 242 break; |
243 } | 243 } |
244 msgq_.push_back(dmsgq_.top().msg_); | 244 msgq_.push_back(dmsgq_.top().msg_); |
245 dmsgq_.pop(); | 245 dmsgq_.pop(); |
246 } | 246 } |
247 } | 247 } |
248 // Pull a message off the message queue, if available. | 248 // Pull a message off the message queue, if available. |
249 if (msgq_.empty()) { | 249 if (msgq_.empty()) { |
250 break; | 250 break; |
251 } else { | 251 } else { |
252 *pmsg = msgq_.front(); | 252 *pmsg = msgq_.front(); |
253 msgq_.pop_front(); | 253 msgq_.pop_front(); |
254 } | 254 } |
255 } // crit_ is released here. | 255 } // crit_ is released here. |
256 | 256 |
257 // Log a warning for time-sensitive messages that we're late to deliver. | 257 // Log a warning for time-sensitive messages that we're late to deliver. |
258 if (pmsg->ts_sensitive) { | 258 if (pmsg->ts_sensitive) { |
259 int32_t delay = TimeDiff(msCurrent, pmsg->ts_sensitive); | 259 int64_t delay = TimeDiff(msCurrent, pmsg->ts_sensitive); |
260 if (delay > 0) { | 260 if (delay > 0) { |
261 LOG_F(LS_WARNING) << "id: " << pmsg->message_id << " delay: " | 261 LOG_F(LS_WARNING) << "id: " << pmsg->message_id << " delay: " |
262 << (delay + kMaxMsgLatency) << "ms"; | 262 << (delay + kMaxMsgLatency) << "ms"; |
263 } | 263 } |
264 } | 264 } |
265 // If this was a dispose message, delete it and skip it. | 265 // If this was a dispose message, delete it and skip it. |
266 if (MQID_DISPOSE == pmsg->message_id) { | 266 if (MQID_DISPOSE == pmsg->message_id) { |
267 ASSERT(NULL == pmsg->phandler); | 267 ASSERT(NULL == pmsg->phandler); |
268 delete pmsg->pdata; | 268 delete pmsg->pdata; |
269 *pmsg = Message(); | 269 *pmsg = Message(); |
270 continue; | 270 continue; |
271 } | 271 } |
272 return true; | 272 return true; |
273 } | 273 } |
274 | 274 |
275 if (fStop_) | 275 if (fStop_) |
276 break; | 276 break; |
277 | 277 |
278 // Which is shorter, the delay wait or the asked wait? | 278 // Which is shorter, the delay wait or the asked wait? |
279 | 279 |
280 int cmsNext; | 280 int64_t cmsNext; |
281 if (cmsWait == kForever) { | 281 if (cmsWait == kForever) { |
282 cmsNext = cmsDelayNext; | 282 cmsNext = cmsDelayNext; |
283 } else { | 283 } else { |
284 cmsNext = std::max(0, cmsTotal - cmsElapsed); | 284 cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed); |
285 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) | 285 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) |
286 cmsNext = cmsDelayNext; | 286 cmsNext = cmsDelayNext; |
287 } | 287 } |
288 | 288 |
289 { | 289 { |
290 // Wait and multiplex in the meantime | 290 // Wait and multiplex in the meantime |
291 SharedScope ss(&ss_lock_); | 291 SharedScope ss(&ss_lock_); |
292 if (!ss_->Wait(cmsNext, process_io)) | 292 if (!ss_->Wait(static_cast<int>(cmsNext), process_io)) |
293 return false; | 293 return false; |
294 } | 294 } |
295 | 295 |
296 // If the specified timeout expired, return | 296 // If the specified timeout expired, return |
297 | 297 |
298 msCurrent = Time(); | 298 msCurrent = TimeMillis(); |
299 cmsElapsed = TimeDiff(msCurrent, msStart); | 299 cmsElapsed = TimeDiff(msCurrent, msStart); |
300 if (cmsWait != kForever) { | 300 if (cmsWait != kForever) { |
301 if (cmsElapsed >= cmsWait) | 301 if (cmsElapsed >= cmsWait) |
302 return false; | 302 return false; |
303 } | 303 } |
304 } | 304 } |
305 return false; | 305 return false; |
306 } | 306 } |
307 | 307 |
308 void MessageQueue::ReceiveSends() { | 308 void MessageQueue::ReceiveSends() { |
(...skipping 10 matching lines...) Expand all Loading... |
319 // Add the message to the end of the queue | 319 // Add the message to the end of the queue |
320 // Signal for the multiplexer to return | 320 // Signal for the multiplexer to return |
321 | 321 |
322 { | 322 { |
323 CritScope cs(&crit_); | 323 CritScope cs(&crit_); |
324 Message msg; | 324 Message msg; |
325 msg.phandler = phandler; | 325 msg.phandler = phandler; |
326 msg.message_id = id; | 326 msg.message_id = id; |
327 msg.pdata = pdata; | 327 msg.pdata = pdata; |
328 if (time_sensitive) { | 328 if (time_sensitive) { |
329 msg.ts_sensitive = Time() + kMaxMsgLatency; | 329 msg.ts_sensitive = TimeMillis() + kMaxMsgLatency; |
330 } | 330 } |
331 msgq_.push_back(msg); | 331 msgq_.push_back(msg); |
332 } | 332 } |
333 WakeUpSocketServer(); | 333 WakeUpSocketServer(); |
334 } | 334 } |
335 | 335 |
336 void MessageQueue::PostDelayed(int cmsDelay, | 336 void MessageQueue::PostDelayed(int cmsDelay, |
337 MessageHandler* phandler, | 337 MessageHandler* phandler, |
338 uint32_t id, | 338 uint32_t id, |
339 MessageData* pdata) { | 339 MessageData* pdata) { |
340 return DoDelayPost(cmsDelay, TimeAfter(cmsDelay), phandler, id, pdata); | 340 return DoDelayPost(cmsDelay, TimeAfter(cmsDelay), phandler, id, pdata); |
341 } | 341 } |
342 | 342 |
343 void MessageQueue::PostAt(uint32_t tstamp, | 343 void MessageQueue::PostAt(uint32_t tstamp, |
344 MessageHandler* phandler, | 344 MessageHandler* phandler, |
345 uint32_t id, | 345 uint32_t id, |
346 MessageData* pdata) { | 346 MessageData* pdata) { |
| 347 // This should work even if it is used (unexpectedly). |
| 348 int delay = static_cast<uint32_t>(TimeMillis()) - tstamp; |
| 349 return DoDelayPost(delay, tstamp, phandler, id, pdata); |
| 350 } |
| 351 |
| 352 void MessageQueue::PostAt(int64_t tstamp, |
| 353 MessageHandler* phandler, |
| 354 uint32_t id, |
| 355 MessageData* pdata) { |
347 return DoDelayPost(TimeUntil(tstamp), tstamp, phandler, id, pdata); | 356 return DoDelayPost(TimeUntil(tstamp), tstamp, phandler, id, pdata); |
348 } | 357 } |
349 | 358 |
350 void MessageQueue::DoDelayPost(int cmsDelay, | 359 void MessageQueue::DoDelayPost(int cmsDelay, |
351 uint32_t tstamp, | 360 int64_t tstamp, |
352 MessageHandler* phandler, | 361 MessageHandler* phandler, |
353 uint32_t id, | 362 uint32_t id, |
354 MessageData* pdata) { | 363 MessageData* pdata) { |
355 if (fStop_) | 364 if (fStop_) |
356 return; | 365 return; |
357 | 366 |
358 // Keep thread safe | 367 // Keep thread safe |
359 // Add to the priority queue. Gets sorted soonest first. | 368 // Add to the priority queue. Gets sorted soonest first. |
360 // Signal for the multiplexer to return. | 369 // Signal for the multiplexer to return. |
361 | 370 |
(...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
440 dmsgq_.container().erase(new_end, dmsgq_.container().end()); | 449 dmsgq_.container().erase(new_end, dmsgq_.container().end()); |
441 dmsgq_.reheap(); | 450 dmsgq_.reheap(); |
442 } | 451 } |
443 | 452 |
444 void MessageQueue::Dispatch(Message *pmsg) { | 453 void MessageQueue::Dispatch(Message *pmsg) { |
445 TRACE_EVENT0("webrtc", "MessageQueue::Dispatch"); | 454 TRACE_EVENT0("webrtc", "MessageQueue::Dispatch"); |
446 pmsg->phandler->OnMessage(pmsg); | 455 pmsg->phandler->OnMessage(pmsg); |
447 } | 456 } |
448 | 457 |
449 } // namespace rtc | 458 } // namespace rtc |
OLD | NEW |