| OLD | NEW |
| 1 /* | 1 /* |
| 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. | 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. |
| 3 * | 3 * |
| 4 * Use of this source code is governed by a BSD-style license | 4 * Use of this source code is governed by a BSD-style license |
| 5 * that can be found in the LICENSE file in the root of the source | 5 * that can be found in the LICENSE file in the root of the source |
| 6 * tree. An additional intellectual property rights grant can be found | 6 * tree. An additional intellectual property rights grant can be found |
| 7 * in the file PATENTS. All contributing project authors may | 7 * in the file PATENTS. All contributing project authors may |
| 8 * be found in the AUTHORS file in the root of the source tree. | 8 * be found in the AUTHORS file in the root of the source tree. |
| 9 */ | 9 */ |
| 10 #include <algorithm> | 10 #include <algorithm> |
| (...skipping 132 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 143 // we can't synchronously wait for queues_not_done to go to 0; we need to | 143 // we can't synchronously wait for queues_not_done to go to 0; we need to |
| 144 // process messages as well. | 144 // process messages as well. |
| 145 while (AtomicOps::AcquireLoad(&queues_not_done) > 0) { | 145 while (AtomicOps::AcquireLoad(&queues_not_done) > 0) { |
| 146 rtc::Thread::Current()->ProcessMessages(0); | 146 rtc::Thread::Current()->ProcessMessages(0); |
| 147 } | 147 } |
| 148 } | 148 } |
| 149 | 149 |
| 150 //------------------------------------------------------------------ | 150 //------------------------------------------------------------------ |
| 151 // MessageQueue | 151 // MessageQueue |
| 152 MessageQueue::MessageQueue(SocketServer* ss, bool init_queue) | 152 MessageQueue::MessageQueue(SocketServer* ss, bool init_queue) |
| 153 : fStop_(false), fPeekKeep_(false), | 153 : fPeekKeep_(false), |
| 154 dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false), ss_(ss) { | 154 dmsgq_next_num_(0), |
| 155 fInitialized_(false), |
| 156 fDestroyed_(false), |
| 157 stop_(0), |
| 158 ss_(ss) { |
| 155 RTC_DCHECK(ss); | 159 RTC_DCHECK(ss); |
| 156 // Currently, MessageQueue holds a socket server, and is the base class for | 160 // Currently, MessageQueue holds a socket server, and is the base class for |
| 157 // Thread. It seems like it makes more sense for Thread to hold the socket | 161 // Thread. It seems like it makes more sense for Thread to hold the socket |
| 158 // server, and provide it to the MessageQueue, since the Thread controls | 162 // server, and provide it to the MessageQueue, since the Thread controls |
| 159 // the I/O model, and MQ is agnostic to those details. Anyway, this causes | 163 // the I/O model, and MQ is agnostic to those details. Anyway, this causes |
| 160 // messagequeue_unittest to depend on network libraries... yuck. | 164 // messagequeue_unittest to depend on network libraries... yuck. |
| 161 ss_->SetMessageQueue(this); | 165 ss_->SetMessageQueue(this); |
| 162 if (init_queue) { | 166 if (init_queue) { |
| 163 DoInit(); | 167 DoInit(); |
| 164 } | 168 } |
| (...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 216 ss_ = ss ? ss : own_ss_.get(); | 220 ss_ = ss ? ss : own_ss_.get(); |
| 217 ss_->SetMessageQueue(this); | 221 ss_->SetMessageQueue(this); |
| 218 } | 222 } |
| 219 | 223 |
| 220 void MessageQueue::WakeUpSocketServer() { | 224 void MessageQueue::WakeUpSocketServer() { |
| 221 SharedScope ss(&ss_lock_); | 225 SharedScope ss(&ss_lock_); |
| 222 ss_->WakeUp(); | 226 ss_->WakeUp(); |
| 223 } | 227 } |
| 224 | 228 |
| 225 void MessageQueue::Quit() { | 229 void MessageQueue::Quit() { |
| 226 fStop_ = true; | 230 AtomicOps::ReleaseStore(&stop_, 1); |
| 227 WakeUpSocketServer(); | 231 WakeUpSocketServer(); |
| 228 } | 232 } |
| 229 | 233 |
| 230 bool MessageQueue::IsQuitting() { | 234 bool MessageQueue::IsQuitting() { |
| 231 return fStop_; | 235 return AtomicOps::AcquireLoad(&stop_) != 0; |
| 232 } | 236 } |
| 233 | 237 |
| 234 void MessageQueue::Restart() { | 238 void MessageQueue::Restart() { |
| 235 fStop_ = false; | 239 AtomicOps::ReleaseStore(&stop_, 0); |
| 236 } | 240 } |
| 237 | 241 |
| 238 bool MessageQueue::Peek(Message *pmsg, int cmsWait) { | 242 bool MessageQueue::Peek(Message *pmsg, int cmsWait) { |
| 239 if (fPeekKeep_) { | 243 if (fPeekKeep_) { |
| 240 *pmsg = msgPeek_; | 244 *pmsg = msgPeek_; |
| 241 return true; | 245 return true; |
| 242 } | 246 } |
| 243 if (!Get(pmsg, cmsWait)) | 247 if (!Get(pmsg, cmsWait)) |
| 244 return false; | 248 return false; |
| 245 msgPeek_ = *pmsg; | 249 msgPeek_ = *pmsg; |
| (...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 309 // If this was a dispose message, delete it and skip it. | 313 // If this was a dispose message, delete it and skip it. |
| 310 if (MQID_DISPOSE == pmsg->message_id) { | 314 if (MQID_DISPOSE == pmsg->message_id) { |
| 311 ASSERT(NULL == pmsg->phandler); | 315 ASSERT(NULL == pmsg->phandler); |
| 312 delete pmsg->pdata; | 316 delete pmsg->pdata; |
| 313 *pmsg = Message(); | 317 *pmsg = Message(); |
| 314 continue; | 318 continue; |
| 315 } | 319 } |
| 316 return true; | 320 return true; |
| 317 } | 321 } |
| 318 | 322 |
| 319 if (fStop_) | 323 if (IsQuitting()) |
| 320 break; | 324 break; |
| 321 | 325 |
| 322 // Which is shorter, the delay wait or the asked wait? | 326 // Which is shorter, the delay wait or the asked wait? |
| 323 | 327 |
| 324 int64_t cmsNext; | 328 int64_t cmsNext; |
| 325 if (cmsWait == kForever) { | 329 if (cmsWait == kForever) { |
| 326 cmsNext = cmsDelayNext; | 330 cmsNext = cmsDelayNext; |
| 327 } else { | 331 } else { |
| 328 cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed); | 332 cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed); |
| 329 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) | 333 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) |
| (...skipping 20 matching lines...) Expand all Loading... |
| 350 } | 354 } |
| 351 | 355 |
| 352 void MessageQueue::ReceiveSends() { | 356 void MessageQueue::ReceiveSends() { |
| 353 } | 357 } |
| 354 | 358 |
| 355 void MessageQueue::Post(const Location& posted_from, | 359 void MessageQueue::Post(const Location& posted_from, |
| 356 MessageHandler* phandler, | 360 MessageHandler* phandler, |
| 357 uint32_t id, | 361 uint32_t id, |
| 358 MessageData* pdata, | 362 MessageData* pdata, |
| 359 bool time_sensitive) { | 363 bool time_sensitive) { |
| 360 if (fStop_) | 364 if (IsQuitting()) |
| 361 return; | 365 return; |
| 362 | 366 |
| 363 // Keep thread safe | 367 // Keep thread safe |
| 364 // Add the message to the end of the queue | 368 // Add the message to the end of the queue |
| 365 // Signal for the multiplexer to return | 369 // Signal for the multiplexer to return |
| 366 | 370 |
| 367 { | 371 { |
| 368 CritScope cs(&crit_); | 372 CritScope cs(&crit_); |
| 369 Message msg; | 373 Message msg; |
| 370 msg.posted_from = posted_from; | 374 msg.posted_from = posted_from; |
| (...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 406 return DoDelayPost(posted_from, TimeUntil(tstamp), tstamp, phandler, id, | 410 return DoDelayPost(posted_from, TimeUntil(tstamp), tstamp, phandler, id, |
| 407 pdata); | 411 pdata); |
| 408 } | 412 } |
| 409 | 413 |
| 410 void MessageQueue::DoDelayPost(const Location& posted_from, | 414 void MessageQueue::DoDelayPost(const Location& posted_from, |
| 411 int64_t cmsDelay, | 415 int64_t cmsDelay, |
| 412 int64_t tstamp, | 416 int64_t tstamp, |
| 413 MessageHandler* phandler, | 417 MessageHandler* phandler, |
| 414 uint32_t id, | 418 uint32_t id, |
| 415 MessageData* pdata) { | 419 MessageData* pdata) { |
| 416 if (fStop_) { | 420 if (IsQuitting()) { |
| 417 return; | 421 return; |
| 418 } | 422 } |
| 419 | 423 |
| 420 // Keep thread safe | 424 // Keep thread safe |
| 421 // Add to the priority queue. Gets sorted soonest first. | 425 // Add to the priority queue. Gets sorted soonest first. |
| 422 // Signal for the multiplexer to return. | 426 // Signal for the multiplexer to return. |
| 423 | 427 |
| 424 { | 428 { |
| 425 CritScope cs(&crit_); | 429 CritScope cs(&crit_); |
| 426 Message msg; | 430 Message msg; |
| (...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 512 pmsg->phandler->OnMessage(pmsg); | 516 pmsg->phandler->OnMessage(pmsg); |
| 513 int64_t end_time = TimeMillis(); | 517 int64_t end_time = TimeMillis(); |
| 514 int64_t diff = TimeDiff(end_time, start_time); | 518 int64_t diff = TimeDiff(end_time, start_time); |
| 515 if (diff >= kSlowDispatchLoggingThreshold) { | 519 if (diff >= kSlowDispatchLoggingThreshold) { |
| 516 LOG(LS_INFO) << "Message took " << diff << "ms to dispatch. Posted from: " | 520 LOG(LS_INFO) << "Message took " << diff << "ms to dispatch. Posted from: " |
| 517 << pmsg->posted_from.ToString(); | 521 << pmsg->posted_from.ToString(); |
| 518 } | 522 } |
| 519 } | 523 } |
| 520 | 524 |
| 521 } // namespace rtc | 525 } // namespace rtc |
| OLD | NEW |