Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(775)

Side by Side Diff: webrtc/base/messagequeue.cc

Issue 1835053002: Change default timestamp to 64 bits in all webrtc directories. (Closed) Base URL: https://chromium.googlesource.com/external/webrtc@master
Patch Set: Add TODO for timestamp. Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « webrtc/base/messagequeue.h ('k') | webrtc/base/messagequeue_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 /* 1 /*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 * 3 *
4 * Use of this source code is governed by a BSD-style license 4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source 5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found 6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may 7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree. 8 * be found in the AUTHORS file in the root of the source tree.
9 */ 9 */
10 #include <algorithm> 10 #include <algorithm>
11 11
12 #include "webrtc/base/checks.h" 12 #include "webrtc/base/checks.h"
13 #include "webrtc/base/common.h" 13 #include "webrtc/base/common.h"
14 #include "webrtc/base/logging.h" 14 #include "webrtc/base/logging.h"
15 #include "webrtc/base/messagequeue.h" 15 #include "webrtc/base/messagequeue.h"
16 #include "webrtc/base/trace_event.h" 16 #include "webrtc/base/trace_event.h"
17 17
18 namespace rtc { 18 namespace rtc {
19 19
20 const uint32_t kMaxMsgLatency = 150; // 150 ms 20 const int kMaxMsgLatency = 150; // 150 ms
21 21
22 //------------------------------------------------------------------ 22 //------------------------------------------------------------------
23 // MessageQueueManager 23 // MessageQueueManager
24 24
25 MessageQueueManager* MessageQueueManager::instance_ = NULL; 25 MessageQueueManager* MessageQueueManager::instance_ = NULL;
26 26
27 MessageQueueManager* MessageQueueManager::Instance() { 27 MessageQueueManager* MessageQueueManager::Instance() {
28 // Note: This is not thread safe, but it is first called before threads are 28 // Note: This is not thread safe, but it is first called before threads are
29 // spawned. 29 // spawned.
30 if (!instance_) 30 if (!instance_)
(...skipping 177 matching lines...) Expand 10 before | Expand all | Expand 10 after
208 // Always return the peek if it exists so there is Peek/Get symmetry 208 // Always return the peek if it exists so there is Peek/Get symmetry
209 209
210 if (fPeekKeep_) { 210 if (fPeekKeep_) {
211 *pmsg = msgPeek_; 211 *pmsg = msgPeek_;
212 fPeekKeep_ = false; 212 fPeekKeep_ = false;
213 return true; 213 return true;
214 } 214 }
215 215
216 // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch 216 // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
217 217
218 int cmsTotal = cmsWait; 218 int64_t cmsTotal = cmsWait;
219 int cmsElapsed = 0; 219 int64_t cmsElapsed = 0;
220 uint32_t msStart = Time(); 220 int64_t msStart = TimeMillis();
221 uint32_t msCurrent = msStart; 221 int64_t msCurrent = msStart;
222 while (true) { 222 while (true) {
223 // Check for sent messages 223 // Check for sent messages
224 ReceiveSends(); 224 ReceiveSends();
225 225
226 // Check for posted events 226 // Check for posted events
227 int cmsDelayNext = kForever; 227 int64_t cmsDelayNext = kForever;
228 bool first_pass = true; 228 bool first_pass = true;
229 while (true) { 229 while (true) {
230 // All queue operations need to be locked, but nothing else in this loop 230 // All queue operations need to be locked, but nothing else in this loop
231 // (specifically handling disposed message) can happen inside the crit. 231 // (specifically handling disposed message) can happen inside the crit.
232 // Otherwise, disposed MessageHandlers will cause deadlocks. 232 // Otherwise, disposed MessageHandlers will cause deadlocks.
233 { 233 {
234 CritScope cs(&crit_); 234 CritScope cs(&crit_);
235 // On the first pass, check for delayed messages that have been 235 // On the first pass, check for delayed messages that have been
236 // triggered and calculate the next trigger time. 236 // triggered and calculate the next trigger time.
237 if (first_pass) { 237 if (first_pass) {
238 first_pass = false; 238 first_pass = false;
239 while (!dmsgq_.empty()) { 239 while (!dmsgq_.empty()) {
240 if (TimeIsLater(msCurrent, dmsgq_.top().msTrigger_)) { 240 if (msCurrent < dmsgq_.top().msTrigger_) {
241 cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent); 241 cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent);
242 break; 242 break;
243 } 243 }
244 msgq_.push_back(dmsgq_.top().msg_); 244 msgq_.push_back(dmsgq_.top().msg_);
245 dmsgq_.pop(); 245 dmsgq_.pop();
246 } 246 }
247 } 247 }
248 // Pull a message off the message queue, if available. 248 // Pull a message off the message queue, if available.
249 if (msgq_.empty()) { 249 if (msgq_.empty()) {
250 break; 250 break;
251 } else { 251 } else {
252 *pmsg = msgq_.front(); 252 *pmsg = msgq_.front();
253 msgq_.pop_front(); 253 msgq_.pop_front();
254 } 254 }
255 } // crit_ is released here. 255 } // crit_ is released here.
256 256
257 // Log a warning for time-sensitive messages that we're late to deliver. 257 // Log a warning for time-sensitive messages that we're late to deliver.
258 if (pmsg->ts_sensitive) { 258 if (pmsg->ts_sensitive) {
259 int32_t delay = TimeDiff(msCurrent, pmsg->ts_sensitive); 259 int64_t delay = TimeDiff(msCurrent, pmsg->ts_sensitive);
260 if (delay > 0) { 260 if (delay > 0) {
261 LOG_F(LS_WARNING) << "id: " << pmsg->message_id << " delay: " 261 LOG_F(LS_WARNING) << "id: " << pmsg->message_id << " delay: "
262 << (delay + kMaxMsgLatency) << "ms"; 262 << (delay + kMaxMsgLatency) << "ms";
263 } 263 }
264 } 264 }
265 // If this was a dispose message, delete it and skip it. 265 // If this was a dispose message, delete it and skip it.
266 if (MQID_DISPOSE == pmsg->message_id) { 266 if (MQID_DISPOSE == pmsg->message_id) {
267 ASSERT(NULL == pmsg->phandler); 267 ASSERT(NULL == pmsg->phandler);
268 delete pmsg->pdata; 268 delete pmsg->pdata;
269 *pmsg = Message(); 269 *pmsg = Message();
270 continue; 270 continue;
271 } 271 }
272 return true; 272 return true;
273 } 273 }
274 274
275 if (fStop_) 275 if (fStop_)
276 break; 276 break;
277 277
278 // Which is shorter, the delay wait or the asked wait? 278 // Which is shorter, the delay wait or the asked wait?
279 279
280 int cmsNext; 280 int64_t cmsNext;
281 if (cmsWait == kForever) { 281 if (cmsWait == kForever) {
282 cmsNext = cmsDelayNext; 282 cmsNext = cmsDelayNext;
283 } else { 283 } else {
284 cmsNext = std::max(0, cmsTotal - cmsElapsed); 284 cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed);
285 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) 285 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
286 cmsNext = cmsDelayNext; 286 cmsNext = cmsDelayNext;
287 } 287 }
288 288
289 { 289 {
290 // Wait and multiplex in the meantime 290 // Wait and multiplex in the meantime
291 SharedScope ss(&ss_lock_); 291 SharedScope ss(&ss_lock_);
292 if (!ss_->Wait(cmsNext, process_io)) 292 if (!ss_->Wait(static_cast<int>(cmsNext), process_io))
293 return false; 293 return false;
294 } 294 }
295 295
296 // If the specified timeout expired, return 296 // If the specified timeout expired, return
297 297
298 msCurrent = Time(); 298 msCurrent = TimeMillis();
299 cmsElapsed = TimeDiff(msCurrent, msStart); 299 cmsElapsed = TimeDiff(msCurrent, msStart);
300 if (cmsWait != kForever) { 300 if (cmsWait != kForever) {
301 if (cmsElapsed >= cmsWait) 301 if (cmsElapsed >= cmsWait)
302 return false; 302 return false;
303 } 303 }
304 } 304 }
305 return false; 305 return false;
306 } 306 }
307 307
308 void MessageQueue::ReceiveSends() { 308 void MessageQueue::ReceiveSends() {
(...skipping 10 matching lines...) Expand all
319 // Add the message to the end of the queue 319 // Add the message to the end of the queue
320 // Signal for the multiplexer to return 320 // Signal for the multiplexer to return
321 321
322 { 322 {
323 CritScope cs(&crit_); 323 CritScope cs(&crit_);
324 Message msg; 324 Message msg;
325 msg.phandler = phandler; 325 msg.phandler = phandler;
326 msg.message_id = id; 326 msg.message_id = id;
327 msg.pdata = pdata; 327 msg.pdata = pdata;
328 if (time_sensitive) { 328 if (time_sensitive) {
329 msg.ts_sensitive = Time() + kMaxMsgLatency; 329 msg.ts_sensitive = TimeMillis() + kMaxMsgLatency;
330 } 330 }
331 msgq_.push_back(msg); 331 msgq_.push_back(msg);
332 } 332 }
333 WakeUpSocketServer(); 333 WakeUpSocketServer();
334 } 334 }
335 335
336 void MessageQueue::PostDelayed(int cmsDelay, 336 void MessageQueue::PostDelayed(int cmsDelay,
337 MessageHandler* phandler, 337 MessageHandler* phandler,
338 uint32_t id, 338 uint32_t id,
339 MessageData* pdata) { 339 MessageData* pdata) {
340 return DoDelayPost(cmsDelay, TimeAfter(cmsDelay), phandler, id, pdata); 340 return DoDelayPost(cmsDelay, TimeAfter(cmsDelay), phandler, id, pdata);
341 } 341 }
342 342
343 void MessageQueue::PostAt(uint32_t tstamp, 343 void MessageQueue::PostAt(uint32_t tstamp,
344 MessageHandler* phandler, 344 MessageHandler* phandler,
345 uint32_t id, 345 uint32_t id,
346 MessageData* pdata) { 346 MessageData* pdata) {
347 // This should work even if it is used (unexpectedly).
348 int delay = static_cast<uint32_t>(TimeMillis()) - tstamp;
349 return DoDelayPost(delay, tstamp, phandler, id, pdata);
350 }
351
352 void MessageQueue::PostAt(int64_t tstamp,
353 MessageHandler* phandler,
354 uint32_t id,
355 MessageData* pdata) {
347 return DoDelayPost(TimeUntil(tstamp), tstamp, phandler, id, pdata); 356 return DoDelayPost(TimeUntil(tstamp), tstamp, phandler, id, pdata);
348 } 357 }
349 358
350 void MessageQueue::DoDelayPost(int cmsDelay, 359 void MessageQueue::DoDelayPost(int cmsDelay,
351 uint32_t tstamp, 360 int64_t tstamp,
352 MessageHandler* phandler, 361 MessageHandler* phandler,
353 uint32_t id, 362 uint32_t id,
354 MessageData* pdata) { 363 MessageData* pdata) {
355 if (fStop_) 364 if (fStop_)
356 return; 365 return;
357 366
358 // Keep thread safe 367 // Keep thread safe
359 // Add to the priority queue. Gets sorted soonest first. 368 // Add to the priority queue. Gets sorted soonest first.
360 // Signal for the multiplexer to return. 369 // Signal for the multiplexer to return.
361 370
(...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after
440 dmsgq_.container().erase(new_end, dmsgq_.container().end()); 449 dmsgq_.container().erase(new_end, dmsgq_.container().end());
441 dmsgq_.reheap(); 450 dmsgq_.reheap();
442 } 451 }
443 452
444 void MessageQueue::Dispatch(Message *pmsg) { 453 void MessageQueue::Dispatch(Message *pmsg) {
445 TRACE_EVENT0("webrtc", "MessageQueue::Dispatch"); 454 TRACE_EVENT0("webrtc", "MessageQueue::Dispatch");
446 pmsg->phandler->OnMessage(pmsg); 455 pmsg->phandler->OnMessage(pmsg);
447 } 456 }
448 457
449 } // namespace rtc 458 } // namespace rtc
OLDNEW
« no previous file with comments | « webrtc/base/messagequeue.h ('k') | webrtc/base/messagequeue_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698