Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 /* | 1 /* |
| 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. | 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. |
| 3 * | 3 * |
| 4 * Use of this source code is governed by a BSD-style license | 4 * Use of this source code is governed by a BSD-style license |
| 5 * that can be found in the LICENSE file in the root of the source | 5 * that can be found in the LICENSE file in the root of the source |
| 6 * tree. An additional intellectual property rights grant can be found | 6 * tree. An additional intellectual property rights grant can be found |
| 7 * in the file PATENTS. All contributing project authors may | 7 * in the file PATENTS. All contributing project authors may |
| 8 * be found in the AUTHORS file in the root of the source tree. | 8 * be found in the AUTHORS file in the root of the source tree. |
| 9 */ | 9 */ |
| 10 #include <algorithm> | 10 #include <algorithm> |
| (...skipping 115 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 126 } | 126 } |
| 127 } | 127 } |
| 128 | 128 |
| 129 void MessageQueueManager::OnMessage(Message* pmsg) { | 129 void MessageQueueManager::OnMessage(Message* pmsg) { |
| 130 RTC_DCHECK(pmsg->message_id == MSG_WAKE_MESSAGE_QUEUE); | 130 RTC_DCHECK(pmsg->message_id == MSG_WAKE_MESSAGE_QUEUE); |
| 131 } | 131 } |
| 132 | 132 |
| 133 //------------------------------------------------------------------ | 133 //------------------------------------------------------------------ |
| 134 // MessageQueue | 134 // MessageQueue |
| 135 MessageQueue::MessageQueue(SocketServer* ss, bool init_queue) | 135 MessageQueue::MessageQueue(SocketServer* ss, bool init_queue) |
| 136 : fStop_(false), fPeekKeep_(false), | 136 : fPeekKeep_(false), |
| 137 dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false), ss_(ss) { | 137 dmsgq_next_num_(0), |
| 138 fInitialized_(false), | |
| 139 fDestroyed_(false), | |
| 140 stop_(false), | |
| 141 ss_(ss) { | |
| 138 RTC_DCHECK(ss); | 142 RTC_DCHECK(ss); |
| 139 // Currently, MessageQueue holds a socket server, and is the base class for | 143 // Currently, MessageQueue holds a socket server, and is the base class for |
| 140 // Thread. It seems like it makes more sense for Thread to hold the socket | 144 // Thread. It seems like it makes more sense for Thread to hold the socket |
| 141 // server, and provide it to the MessageQueue, since the Thread controls | 145 // server, and provide it to the MessageQueue, since the Thread controls |
| 142 // the I/O model, and MQ is agnostic to those details. Anyway, this causes | 146 // the I/O model, and MQ is agnostic to those details. Anyway, this causes |
| 143 // messagequeue_unittest to depend on network libraries... yuck. | 147 // messagequeue_unittest to depend on network libraries... yuck. |
| 144 ss_->SetMessageQueue(this); | 148 ss_->SetMessageQueue(this); |
| 145 if (init_queue) { | 149 if (init_queue) { |
| 146 DoInit(); | 150 DoInit(); |
| 147 } | 151 } |
| (...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 199 ss_ = ss ? ss : own_ss_.get(); | 203 ss_ = ss ? ss : own_ss_.get(); |
| 200 ss_->SetMessageQueue(this); | 204 ss_->SetMessageQueue(this); |
| 201 } | 205 } |
| 202 | 206 |
| 203 void MessageQueue::WakeUpSocketServer() { | 207 void MessageQueue::WakeUpSocketServer() { |
| 204 SharedScope ss(&ss_lock_); | 208 SharedScope ss(&ss_lock_); |
| 205 ss_->WakeUp(); | 209 ss_->WakeUp(); |
| 206 } | 210 } |
| 207 | 211 |
| 208 void MessageQueue::Quit() { | 212 void MessageQueue::Quit() { |
| 209 fStop_ = true; | 213 { |
| 214 CritScope cs(&stop_crit_); | |
| 215 stop_ = true; | |
|
pthatcher1
2016/06/01 15:29:14
Might as well rename "stop_" to "quitting_".
| |
| 216 } | |
| 210 WakeUpSocketServer(); | 217 WakeUpSocketServer(); |
| 211 } | 218 } |
| 212 | 219 |
| 213 bool MessageQueue::IsQuitting() { | 220 bool MessageQueue::IsQuitting() { |
| 214 return fStop_; | 221 CritScope cs(&stop_crit_); |
|
pthatcher1
2016/06/01 15:29:14
Should we use an std::atomic here once they are al
tommi (sloooow) - chröme
2016/06/01 15:53:29
We do have alternatives to std::atomic that we can
| |
| 222 return stop_; | |
| 215 } | 223 } |
| 216 | 224 |
| 217 void MessageQueue::Restart() { | 225 void MessageQueue::Restart() { |
| 218 fStop_ = false; | 226 CritScope cs(&stop_crit_); |
| 227 stop_ = false; | |
| 219 } | 228 } |
| 220 | 229 |
| 221 bool MessageQueue::Peek(Message *pmsg, int cmsWait) { | 230 bool MessageQueue::Peek(Message *pmsg, int cmsWait) { |
| 222 if (fPeekKeep_) { | 231 if (fPeekKeep_) { |
| 223 *pmsg = msgPeek_; | 232 *pmsg = msgPeek_; |
| 224 return true; | 233 return true; |
| 225 } | 234 } |
| 226 if (!Get(pmsg, cmsWait)) | 235 if (!Get(pmsg, cmsWait)) |
| 227 return false; | 236 return false; |
| 228 msgPeek_ = *pmsg; | 237 msgPeek_ = *pmsg; |
| (...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 292 // If this was a dispose message, delete it and skip it. | 301 // If this was a dispose message, delete it and skip it. |
| 293 if (MQID_DISPOSE == pmsg->message_id) { | 302 if (MQID_DISPOSE == pmsg->message_id) { |
| 294 ASSERT(NULL == pmsg->phandler); | 303 ASSERT(NULL == pmsg->phandler); |
| 295 delete pmsg->pdata; | 304 delete pmsg->pdata; |
| 296 *pmsg = Message(); | 305 *pmsg = Message(); |
| 297 continue; | 306 continue; |
| 298 } | 307 } |
| 299 return true; | 308 return true; |
| 300 } | 309 } |
| 301 | 310 |
| 302 if (fStop_) | 311 if (IsQuitting()) |
| 303 break; | 312 break; |
| 304 | 313 |
| 305 // Which is shorter, the delay wait or the asked wait? | 314 // Which is shorter, the delay wait or the asked wait? |
| 306 | 315 |
| 307 int64_t cmsNext; | 316 int64_t cmsNext; |
| 308 if (cmsWait == kForever) { | 317 if (cmsWait == kForever) { |
| 309 cmsNext = cmsDelayNext; | 318 cmsNext = cmsDelayNext; |
| 310 } else { | 319 } else { |
| 311 cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed); | 320 cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed); |
| 312 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) | 321 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) |
| (...skipping 19 matching lines...) Expand all Loading... | |
| 332 return false; | 341 return false; |
| 333 } | 342 } |
| 334 | 343 |
| 335 void MessageQueue::ReceiveSends() { | 344 void MessageQueue::ReceiveSends() { |
| 336 } | 345 } |
| 337 | 346 |
| 338 void MessageQueue::Post(MessageHandler* phandler, | 347 void MessageQueue::Post(MessageHandler* phandler, |
| 339 uint32_t id, | 348 uint32_t id, |
| 340 MessageData* pdata, | 349 MessageData* pdata, |
| 341 bool time_sensitive) { | 350 bool time_sensitive) { |
| 342 if (fStop_) | 351 if (IsQuitting()) |
| 343 return; | 352 return; |
| 344 | 353 |
| 345 // Keep thread safe | 354 // Keep thread safe |
| 346 // Add the message to the end of the queue | 355 // Add the message to the end of the queue |
| 347 // Signal for the multiplexer to return | 356 // Signal for the multiplexer to return |
| 348 | 357 |
| 349 { | 358 { |
| 350 CritScope cs(&crit_); | 359 CritScope cs(&crit_); |
| 351 Message msg; | 360 Message msg; |
| 352 msg.phandler = phandler; | 361 msg.phandler = phandler; |
| (...skipping 28 matching lines...) Expand all Loading... | |
| 381 uint32_t id, | 390 uint32_t id, |
| 382 MessageData* pdata) { | 391 MessageData* pdata) { |
| 383 return DoDelayPost(TimeUntil(tstamp), tstamp, phandler, id, pdata); | 392 return DoDelayPost(TimeUntil(tstamp), tstamp, phandler, id, pdata); |
| 384 } | 393 } |
| 385 | 394 |
| 386 void MessageQueue::DoDelayPost(int64_t cmsDelay, | 395 void MessageQueue::DoDelayPost(int64_t cmsDelay, |
| 387 int64_t tstamp, | 396 int64_t tstamp, |
| 388 MessageHandler* phandler, | 397 MessageHandler* phandler, |
| 389 uint32_t id, | 398 uint32_t id, |
| 390 MessageData* pdata) { | 399 MessageData* pdata) { |
| 391 if (fStop_) { | 400 if (IsQuitting()) { |
| 392 return; | 401 return; |
| 393 } | 402 } |
| 394 | 403 |
| 395 // Keep thread safe | 404 // Keep thread safe |
| 396 // Add to the priority queue. Gets sorted soonest first. | 405 // Add to the priority queue. Gets sorted soonest first. |
| 397 // Signal for the multiplexer to return. | 406 // Signal for the multiplexer to return. |
| 398 | 407 |
| 399 { | 408 { |
| 400 CritScope cs(&crit_); | 409 CritScope cs(&crit_); |
| 401 Message msg; | 410 Message msg; |
| (...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 477 dmsgq_.container().erase(new_end, dmsgq_.container().end()); | 486 dmsgq_.container().erase(new_end, dmsgq_.container().end()); |
| 478 dmsgq_.reheap(); | 487 dmsgq_.reheap(); |
| 479 } | 488 } |
| 480 | 489 |
| 481 void MessageQueue::Dispatch(Message *pmsg) { | 490 void MessageQueue::Dispatch(Message *pmsg) { |
| 482 TRACE_EVENT0("webrtc", "MessageQueue::Dispatch"); | 491 TRACE_EVENT0("webrtc", "MessageQueue::Dispatch"); |
| 483 pmsg->phandler->OnMessage(pmsg); | 492 pmsg->phandler->OnMessage(pmsg); |
| 484 } | 493 } |
| 485 | 494 |
| 486 } // namespace rtc | 495 } // namespace rtc |
| OLD | NEW |