Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 /* | 1 /* |
| 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. | 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. |
| 3 * | 3 * |
| 4 * Use of this source code is governed by a BSD-style license | 4 * Use of this source code is governed by a BSD-style license |
| 5 * that can be found in the LICENSE file in the root of the source | 5 * that can be found in the LICENSE file in the root of the source |
| 6 * tree. An additional intellectual property rights grant can be found | 6 * tree. An additional intellectual property rights grant can be found |
| 7 * in the file PATENTS. All contributing project authors may | 7 * in the file PATENTS. All contributing project authors may |
| 8 * be found in the AUTHORS file in the root of the source tree. | 8 * be found in the AUTHORS file in the root of the source tree. |
| 9 */ | 9 */ |
| 10 #include <algorithm> | 10 #include <algorithm> |
| (...skipping 123 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 134 // we can't synchronously wait for queues_not_done to go to 0; we need to | 134 // we can't synchronously wait for queues_not_done to go to 0; we need to |
| 135 // process messages as well. | 135 // process messages as well. |
| 136 while (AtomicOps::AcquireLoad(&queues_not_done) > 0) { | 136 while (AtomicOps::AcquireLoad(&queues_not_done) > 0) { |
| 137 rtc::Thread::Current()->ProcessMessages(0); | 137 rtc::Thread::Current()->ProcessMessages(0); |
| 138 } | 138 } |
| 139 } | 139 } |
| 140 | 140 |
| 141 //------------------------------------------------------------------ | 141 //------------------------------------------------------------------ |
| 142 // MessageQueue | 142 // MessageQueue |
| 143 MessageQueue::MessageQueue(SocketServer* ss, bool init_queue) | 143 MessageQueue::MessageQueue(SocketServer* ss, bool init_queue) |
| 144 : fStop_(false), fPeekKeep_(false), | 144 : fPeekKeep_(false), |
| 145 dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false), ss_(ss) { | 145 dmsgq_next_num_(0), |
| 146 fInitialized_(false), | |
| 147 fDestroyed_(false), | |
| 148 stop_(false), | |
|
Taylor Brandstetter
2016/07/07 22:01:47
Since it's an int now, should probably initialize
andresp
2016/07/08 10:05:37
Done.
| |
| 149 ss_(ss) { | |
| 146 RTC_DCHECK(ss); | 150 RTC_DCHECK(ss); |
| 147 // Currently, MessageQueue holds a socket server, and is the base class for | 151 // Currently, MessageQueue holds a socket server, and is the base class for |
| 148 // Thread. It seems like it makes more sense for Thread to hold the socket | 152 // Thread. It seems like it makes more sense for Thread to hold the socket |
| 149 // server, and provide it to the MessageQueue, since the Thread controls | 153 // server, and provide it to the MessageQueue, since the Thread controls |
| 150 // the I/O model, and MQ is agnostic to those details. Anyway, this causes | 154 // the I/O model, and MQ is agnostic to those details. Anyway, this causes |
| 151 // messagequeue_unittest to depend on network libraries... yuck. | 155 // messagequeue_unittest to depend on network libraries... yuck. |
| 152 ss_->SetMessageQueue(this); | 156 ss_->SetMessageQueue(this); |
| 153 if (init_queue) { | 157 if (init_queue) { |
| 154 DoInit(); | 158 DoInit(); |
| 155 } | 159 } |
| (...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 207 ss_ = ss ? ss : own_ss_.get(); | 211 ss_ = ss ? ss : own_ss_.get(); |
| 208 ss_->SetMessageQueue(this); | 212 ss_->SetMessageQueue(this); |
| 209 } | 213 } |
| 210 | 214 |
| 211 void MessageQueue::WakeUpSocketServer() { | 215 void MessageQueue::WakeUpSocketServer() { |
| 212 SharedScope ss(&ss_lock_); | 216 SharedScope ss(&ss_lock_); |
| 213 ss_->WakeUp(); | 217 ss_->WakeUp(); |
| 214 } | 218 } |
| 215 | 219 |
| 216 void MessageQueue::Quit() { | 220 void MessageQueue::Quit() { |
| 217 fStop_ = true; | 221 AtomicOps::ReleaseStore(&stop_, 1); |
| 218 WakeUpSocketServer(); | 222 WakeUpSocketServer(); |
| 219 } | 223 } |
| 220 | 224 |
| 221 bool MessageQueue::IsQuitting() { | 225 bool MessageQueue::IsQuitting() { |
| 222 return fStop_; | 226 return AtomicOps::AcquireLoad(&stop_) != 0; |
| 223 } | 227 } |
| 224 | 228 |
| 225 void MessageQueue::Restart() { | 229 void MessageQueue::Restart() { |
| 226 fStop_ = false; | 230 AtomicOps::ReleaseStore(&stop_, 0); |
| 227 } | 231 } |
| 228 | 232 |
| 229 bool MessageQueue::Peek(Message *pmsg, int cmsWait) { | 233 bool MessageQueue::Peek(Message *pmsg, int cmsWait) { |
| 230 if (fPeekKeep_) { | 234 if (fPeekKeep_) { |
| 231 *pmsg = msgPeek_; | 235 *pmsg = msgPeek_; |
| 232 return true; | 236 return true; |
| 233 } | 237 } |
| 234 if (!Get(pmsg, cmsWait)) | 238 if (!Get(pmsg, cmsWait)) |
| 235 return false; | 239 return false; |
| 236 msgPeek_ = *pmsg; | 240 msgPeek_ = *pmsg; |
| (...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 300 // If this was a dispose message, delete it and skip it. | 304 // If this was a dispose message, delete it and skip it. |
| 301 if (MQID_DISPOSE == pmsg->message_id) { | 305 if (MQID_DISPOSE == pmsg->message_id) { |
| 302 ASSERT(NULL == pmsg->phandler); | 306 ASSERT(NULL == pmsg->phandler); |
| 303 delete pmsg->pdata; | 307 delete pmsg->pdata; |
| 304 *pmsg = Message(); | 308 *pmsg = Message(); |
| 305 continue; | 309 continue; |
| 306 } | 310 } |
| 307 return true; | 311 return true; |
| 308 } | 312 } |
| 309 | 313 |
| 310 if (fStop_) | 314 if (IsQuitting()) |
| 311 break; | 315 break; |
| 312 | 316 |
| 313 // Which is shorter, the delay wait or the asked wait? | 317 // Which is shorter, the delay wait or the asked wait? |
| 314 | 318 |
| 315 int64_t cmsNext; | 319 int64_t cmsNext; |
| 316 if (cmsWait == kForever) { | 320 if (cmsWait == kForever) { |
| 317 cmsNext = cmsDelayNext; | 321 cmsNext = cmsDelayNext; |
| 318 } else { | 322 } else { |
| 319 cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed); | 323 cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed); |
| 320 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) | 324 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) |
| (...skipping 20 matching lines...) Expand all Loading... | |
| 341 } | 345 } |
| 342 | 346 |
| 343 void MessageQueue::ReceiveSends() { | 347 void MessageQueue::ReceiveSends() { |
| 344 } | 348 } |
| 345 | 349 |
| 346 void MessageQueue::Post(const Location& posted_from, | 350 void MessageQueue::Post(const Location& posted_from, |
| 347 MessageHandler* phandler, | 351 MessageHandler* phandler, |
| 348 uint32_t id, | 352 uint32_t id, |
| 349 MessageData* pdata, | 353 MessageData* pdata, |
| 350 bool time_sensitive) { | 354 bool time_sensitive) { |
| 351 if (fStop_) | 355 if (IsQuitting()) |
| 352 return; | 356 return; |
| 353 | 357 |
| 354 // Keep thread safe | 358 // Keep thread safe |
| 355 // Add the message to the end of the queue | 359 // Add the message to the end of the queue |
| 356 // Signal for the multiplexer to return | 360 // Signal for the multiplexer to return |
| 357 | 361 |
| 358 { | 362 { |
| 359 CritScope cs(&crit_); | 363 CritScope cs(&crit_); |
| 360 Message msg; | 364 Message msg; |
| 361 msg.posted_from = posted_from; | 365 msg.posted_from = posted_from; |
| (...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 397 return DoDelayPost(posted_from, TimeUntil(tstamp), tstamp, phandler, id, | 401 return DoDelayPost(posted_from, TimeUntil(tstamp), tstamp, phandler, id, |
| 398 pdata); | 402 pdata); |
| 399 } | 403 } |
| 400 | 404 |
| 401 void MessageQueue::DoDelayPost(const Location& posted_from, | 405 void MessageQueue::DoDelayPost(const Location& posted_from, |
| 402 int64_t cmsDelay, | 406 int64_t cmsDelay, |
| 403 int64_t tstamp, | 407 int64_t tstamp, |
| 404 MessageHandler* phandler, | 408 MessageHandler* phandler, |
| 405 uint32_t id, | 409 uint32_t id, |
| 406 MessageData* pdata) { | 410 MessageData* pdata) { |
| 407 if (fStop_) { | 411 if (IsQuitting()) { |
| 408 return; | 412 return; |
| 409 } | 413 } |
| 410 | 414 |
| 411 // Keep thread safe | 415 // Keep thread safe |
| 412 // Add to the priority queue. Gets sorted soonest first. | 416 // Add to the priority queue. Gets sorted soonest first. |
| 413 // Signal for the multiplexer to return. | 417 // Signal for the multiplexer to return. |
| 414 | 418 |
| 415 { | 419 { |
| 416 CritScope cs(&crit_); | 420 CritScope cs(&crit_); |
| 417 Message msg; | 421 Message msg; |
| (...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 503 pmsg->phandler->OnMessage(pmsg); | 507 pmsg->phandler->OnMessage(pmsg); |
| 504 int64_t end_time = TimeMillis(); | 508 int64_t end_time = TimeMillis(); |
| 505 int64_t diff = TimeDiff(end_time, start_time); | 509 int64_t diff = TimeDiff(end_time, start_time); |
| 506 if (diff >= kSlowDispatchLoggingThreshold) { | 510 if (diff >= kSlowDispatchLoggingThreshold) { |
| 507 LOG(LS_INFO) << "Message took " << diff << "ms to dispatch. Posted from: " | 511 LOG(LS_INFO) << "Message took " << diff << "ms to dispatch. Posted from: " |
| 508 << pmsg->posted_from.ToString(); | 512 << pmsg->posted_from.ToString(); |
| 509 } | 513 } |
| 510 } | 514 } |
| 511 | 515 |
| 512 } // namespace rtc | 516 } // namespace rtc |
| OLD | NEW |