| OLD | NEW |
| 1 /* | 1 /* |
| 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. | 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. |
| 3 * | 3 * |
| 4 * Use of this source code is governed by a BSD-style license | 4 * Use of this source code is governed by a BSD-style license |
| 5 * that can be found in the LICENSE file in the root of the source | 5 * that can be found in the LICENSE file in the root of the source |
| 6 * tree. An additional intellectual property rights grant can be found | 6 * tree. An additional intellectual property rights grant can be found |
| 7 * in the file PATENTS. All contributing project authors may | 7 * in the file PATENTS. All contributing project authors may |
| 8 * be found in the AUTHORS file in the root of the source tree. | 8 * be found in the AUTHORS file in the root of the source tree. |
| 9 */ | 9 */ |
| 10 #include <algorithm> | 10 #include <algorithm> |
| 11 | 11 |
| 12 #include "webrtc/base/checks.h" | 12 #include "webrtc/base/checks.h" |
| 13 #include "webrtc/base/common.h" | 13 #include "webrtc/base/common.h" |
| 14 #include "webrtc/base/logging.h" | 14 #include "webrtc/base/logging.h" |
| 15 #include "webrtc/base/messagequeue.h" | 15 #include "webrtc/base/messagequeue.h" |
| 16 #include "webrtc/base/trace_event.h" | 16 #include "webrtc/base/trace_event.h" |
| 17 | 17 |
| 18 namespace rtc { | 18 namespace rtc { |
| 19 | 19 |
| 20 const uint32_t kMaxMsgLatency = 150; // 150 ms | 20 const int kMaxMsgLatency = 150; // 150 ms |
| 21 | 21 |
| 22 //------------------------------------------------------------------ | 22 //------------------------------------------------------------------ |
| 23 // MessageQueueManager | 23 // MessageQueueManager |
| 24 | 24 |
| 25 MessageQueueManager* MessageQueueManager::instance_ = NULL; | 25 MessageQueueManager* MessageQueueManager::instance_ = NULL; |
| 26 | 26 |
| 27 MessageQueueManager* MessageQueueManager::Instance() { | 27 MessageQueueManager* MessageQueueManager::Instance() { |
| 28 // Note: This is not thread safe, but it is first called before threads are | 28 // Note: This is not thread safe, but it is first called before threads are |
| 29 // spawned. | 29 // spawned. |
| 30 if (!instance_) | 30 if (!instance_) |
| (...skipping 177 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 208 // Always return the peek if it exists so there is Peek/Get symmetry | 208 // Always return the peek if it exists so there is Peek/Get symmetry |
| 209 | 209 |
| 210 if (fPeekKeep_) { | 210 if (fPeekKeep_) { |
| 211 *pmsg = msgPeek_; | 211 *pmsg = msgPeek_; |
| 212 fPeekKeep_ = false; | 212 fPeekKeep_ = false; |
| 213 return true; | 213 return true; |
| 214 } | 214 } |
| 215 | 215 |
| 216 // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch | 216 // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch |
| 217 | 217 |
| 218 int cmsTotal = cmsWait; | 218 int64_t cmsTotal = cmsWait; |
| 219 int cmsElapsed = 0; | 219 int64_t cmsElapsed = 0; |
| 220 uint32_t msStart = Time(); | 220 int64_t msStart = TimeMillis(); |
| 221 uint32_t msCurrent = msStart; | 221 int64_t msCurrent = msStart; |
| 222 while (true) { | 222 while (true) { |
| 223 // Check for sent messages | 223 // Check for sent messages |
| 224 ReceiveSends(); | 224 ReceiveSends(); |
| 225 | 225 |
| 226 // Check for posted events | 226 // Check for posted events |
| 227 int cmsDelayNext = kForever; | 227 int64_t cmsDelayNext = kForever; |
| 228 bool first_pass = true; | 228 bool first_pass = true; |
| 229 while (true) { | 229 while (true) { |
| 230 // All queue operations need to be locked, but nothing else in this loop | 230 // All queue operations need to be locked, but nothing else in this loop |
| 231 // (specifically handling disposed message) can happen inside the crit. | 231 // (specifically handling disposed message) can happen inside the crit. |
| 232 // Otherwise, disposed MessageHandlers will cause deadlocks. | 232 // Otherwise, disposed MessageHandlers will cause deadlocks. |
| 233 { | 233 { |
| 234 CritScope cs(&crit_); | 234 CritScope cs(&crit_); |
| 235 // On the first pass, check for delayed messages that have been | 235 // On the first pass, check for delayed messages that have been |
| 236 // triggered and calculate the next trigger time. | 236 // triggered and calculate the next trigger time. |
| 237 if (first_pass) { | 237 if (first_pass) { |
| 238 first_pass = false; | 238 first_pass = false; |
| 239 while (!dmsgq_.empty()) { | 239 while (!dmsgq_.empty()) { |
| 240 if (TimeIsLater(msCurrent, dmsgq_.top().msTrigger_)) { | 240 if (msCurrent < dmsgq_.top().msTrigger_) { |
| 241 cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent); | 241 cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent); |
| 242 break; | 242 break; |
| 243 } | 243 } |
| 244 msgq_.push_back(dmsgq_.top().msg_); | 244 msgq_.push_back(dmsgq_.top().msg_); |
| 245 dmsgq_.pop(); | 245 dmsgq_.pop(); |
| 246 } | 246 } |
| 247 } | 247 } |
| 248 // Pull a message off the message queue, if available. | 248 // Pull a message off the message queue, if available. |
| 249 if (msgq_.empty()) { | 249 if (msgq_.empty()) { |
| 250 break; | 250 break; |
| 251 } else { | 251 } else { |
| 252 *pmsg = msgq_.front(); | 252 *pmsg = msgq_.front(); |
| 253 msgq_.pop_front(); | 253 msgq_.pop_front(); |
| 254 } | 254 } |
| 255 } // crit_ is released here. | 255 } // crit_ is released here. |
| 256 | 256 |
| 257 // Log a warning for time-sensitive messages that we're late to deliver. | 257 // Log a warning for time-sensitive messages that we're late to deliver. |
| 258 if (pmsg->ts_sensitive) { | 258 if (pmsg->ts_sensitive) { |
| 259 int32_t delay = TimeDiff(msCurrent, pmsg->ts_sensitive); | 259 int64_t delay = TimeDiff(msCurrent, pmsg->ts_sensitive); |
| 260 if (delay > 0) { | 260 if (delay > 0) { |
| 261 LOG_F(LS_WARNING) << "id: " << pmsg->message_id << " delay: " | 261 LOG_F(LS_WARNING) << "id: " << pmsg->message_id << " delay: " |
| 262 << (delay + kMaxMsgLatency) << "ms"; | 262 << (delay + kMaxMsgLatency) << "ms"; |
| 263 } | 263 } |
| 264 } | 264 } |
| 265 // If this was a dispose message, delete it and skip it. | 265 // If this was a dispose message, delete it and skip it. |
| 266 if (MQID_DISPOSE == pmsg->message_id) { | 266 if (MQID_DISPOSE == pmsg->message_id) { |
| 267 ASSERT(NULL == pmsg->phandler); | 267 ASSERT(NULL == pmsg->phandler); |
| 268 delete pmsg->pdata; | 268 delete pmsg->pdata; |
| 269 *pmsg = Message(); | 269 *pmsg = Message(); |
| 270 continue; | 270 continue; |
| 271 } | 271 } |
| 272 return true; | 272 return true; |
| 273 } | 273 } |
| 274 | 274 |
| 275 if (fStop_) | 275 if (fStop_) |
| 276 break; | 276 break; |
| 277 | 277 |
| 278 // Which is shorter, the delay wait or the asked wait? | 278 // Which is shorter, the delay wait or the asked wait? |
| 279 | 279 |
| 280 int cmsNext; | 280 int64_t cmsNext; |
| 281 if (cmsWait == kForever) { | 281 if (cmsWait == kForever) { |
| 282 cmsNext = cmsDelayNext; | 282 cmsNext = cmsDelayNext; |
| 283 } else { | 283 } else { |
| 284 cmsNext = std::max(0, cmsTotal - cmsElapsed); | 284 cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed); |
| 285 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) | 285 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) |
| 286 cmsNext = cmsDelayNext; | 286 cmsNext = cmsDelayNext; |
| 287 } | 287 } |
| 288 | 288 |
| 289 { | 289 { |
| 290 // Wait and multiplex in the meantime | 290 // Wait and multiplex in the meantime |
| 291 SharedScope ss(&ss_lock_); | 291 SharedScope ss(&ss_lock_); |
| 292 if (!ss_->Wait(cmsNext, process_io)) | 292 if (!ss_->Wait(static_cast<int>(cmsNext), process_io)) |
| 293 return false; | 293 return false; |
| 294 } | 294 } |
| 295 | 295 |
| 296 // If the specified timeout expired, return | 296 // If the specified timeout expired, return |
| 297 | 297 |
| 298 msCurrent = Time(); | 298 msCurrent = TimeMillis(); |
| 299 cmsElapsed = TimeDiff(msCurrent, msStart); | 299 cmsElapsed = TimeDiff(msCurrent, msStart); |
| 300 if (cmsWait != kForever) { | 300 if (cmsWait != kForever) { |
| 301 if (cmsElapsed >= cmsWait) | 301 if (cmsElapsed >= cmsWait) |
| 302 return false; | 302 return false; |
| 303 } | 303 } |
| 304 } | 304 } |
| 305 return false; | 305 return false; |
| 306 } | 306 } |
| 307 | 307 |
| 308 void MessageQueue::ReceiveSends() { | 308 void MessageQueue::ReceiveSends() { |
| (...skipping 10 matching lines...) Expand all Loading... |
| 319 // Add the message to the end of the queue | 319 // Add the message to the end of the queue |
| 320 // Signal for the multiplexer to return | 320 // Signal for the multiplexer to return |
| 321 | 321 |
| 322 { | 322 { |
| 323 CritScope cs(&crit_); | 323 CritScope cs(&crit_); |
| 324 Message msg; | 324 Message msg; |
| 325 msg.phandler = phandler; | 325 msg.phandler = phandler; |
| 326 msg.message_id = id; | 326 msg.message_id = id; |
| 327 msg.pdata = pdata; | 327 msg.pdata = pdata; |
| 328 if (time_sensitive) { | 328 if (time_sensitive) { |
| 329 msg.ts_sensitive = Time() + kMaxMsgLatency; | 329 msg.ts_sensitive = TimeMillis() + kMaxMsgLatency; |
| 330 } | 330 } |
| 331 msgq_.push_back(msg); | 331 msgq_.push_back(msg); |
| 332 } | 332 } |
| 333 WakeUpSocketServer(); | 333 WakeUpSocketServer(); |
| 334 } | 334 } |
| 335 | 335 |
| 336 void MessageQueue::PostDelayed(int cmsDelay, | 336 void MessageQueue::PostDelayed(int cmsDelay, |
| 337 MessageHandler* phandler, | 337 MessageHandler* phandler, |
| 338 uint32_t id, | 338 uint32_t id, |
| 339 MessageData* pdata) { | 339 MessageData* pdata) { |
| 340 return DoDelayPost(cmsDelay, TimeAfter(cmsDelay), phandler, id, pdata); | 340 return DoDelayPost(cmsDelay, TimeAfter(cmsDelay), phandler, id, pdata); |
| 341 } | 341 } |
| 342 | 342 |
| 343 void MessageQueue::PostAt(uint32_t tstamp, | 343 void MessageQueue::PostAt(uint32_t tstamp, |
| 344 MessageHandler* phandler, | 344 MessageHandler* phandler, |
| 345 uint32_t id, | 345 uint32_t id, |
| 346 MessageData* pdata) { | 346 MessageData* pdata) { |
| 347 // This should work even if it is used (unexpectedly). |
| 348 int delay = static_cast<uint32_t>(TimeMillis()) - tstamp; |
| 349 return DoDelayPost(delay, tstamp, phandler, id, pdata); |
| 350 } |
| 351 |
| 352 void MessageQueue::PostAt(int64_t tstamp, |
| 353 MessageHandler* phandler, |
| 354 uint32_t id, |
| 355 MessageData* pdata) { |
| 347 return DoDelayPost(TimeUntil(tstamp), tstamp, phandler, id, pdata); | 356 return DoDelayPost(TimeUntil(tstamp), tstamp, phandler, id, pdata); |
| 348 } | 357 } |
| 349 | 358 |
| 350 void MessageQueue::DoDelayPost(int cmsDelay, | 359 void MessageQueue::DoDelayPost(int cmsDelay, |
| 351 uint32_t tstamp, | 360 int64_t tstamp, |
| 352 MessageHandler* phandler, | 361 MessageHandler* phandler, |
| 353 uint32_t id, | 362 uint32_t id, |
| 354 MessageData* pdata) { | 363 MessageData* pdata) { |
| 355 if (fStop_) | 364 if (fStop_) |
| 356 return; | 365 return; |
| 357 | 366 |
| 358 // Keep thread safe | 367 // Keep thread safe |
| 359 // Add to the priority queue. Gets sorted soonest first. | 368 // Add to the priority queue. Gets sorted soonest first. |
| 360 // Signal for the multiplexer to return. | 369 // Signal for the multiplexer to return. |
| 361 | 370 |
| (...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 440 dmsgq_.container().erase(new_end, dmsgq_.container().end()); | 449 dmsgq_.container().erase(new_end, dmsgq_.container().end()); |
| 441 dmsgq_.reheap(); | 450 dmsgq_.reheap(); |
| 442 } | 451 } |
| 443 | 452 |
| 444 void MessageQueue::Dispatch(Message *pmsg) { | 453 void MessageQueue::Dispatch(Message *pmsg) { |
| 445 TRACE_EVENT0("webrtc", "MessageQueue::Dispatch"); | 454 TRACE_EVENT0("webrtc", "MessageQueue::Dispatch"); |
| 446 pmsg->phandler->OnMessage(pmsg); | 455 pmsg->phandler->OnMessage(pmsg); |
| 447 } | 456 } |
| 448 | 457 |
| 449 } // namespace rtc | 458 } // namespace rtc |
| OLD | NEW |