| OLD | NEW | 
|---|
| 1 /* | 1 /* | 
| 2  *  Copyright 2004 The WebRTC Project Authors. All rights reserved. | 2  *  Copyright 2004 The WebRTC Project Authors. All rights reserved. | 
| 3  * | 3  * | 
| 4  *  Use of this source code is governed by a BSD-style license | 4  *  Use of this source code is governed by a BSD-style license | 
| 5  *  that can be found in the LICENSE file in the root of the source | 5  *  that can be found in the LICENSE file in the root of the source | 
| 6  *  tree. An additional intellectual property rights grant can be found | 6  *  tree. An additional intellectual property rights grant can be found | 
| 7  *  in the file PATENTS.  All contributing project authors may | 7  *  in the file PATENTS.  All contributing project authors may | 
| 8  *  be found in the AUTHORS file in the root of the source tree. | 8  *  be found in the AUTHORS file in the root of the source tree. | 
| 9  */ | 9  */ | 
| 10 #include <algorithm> | 10 #include <algorithm> | 
| (...skipping 132 matching lines...) Expand 10 before | Expand all | Expand 10 after  Loading... | 
| 143   // we can't synchronously wait for queues_not_done to go to 0; we need to | 143   // we can't synchronously wait for queues_not_done to go to 0; we need to | 
| 144   // process messages as well. | 144   // process messages as well. | 
| 145   while (AtomicOps::AcquireLoad(&queues_not_done) > 0) { | 145   while (AtomicOps::AcquireLoad(&queues_not_done) > 0) { | 
| 146     rtc::Thread::Current()->ProcessMessages(0); | 146     rtc::Thread::Current()->ProcessMessages(0); | 
| 147   } | 147   } | 
| 148 } | 148 } | 
| 149 | 149 | 
| 150 //------------------------------------------------------------------ | 150 //------------------------------------------------------------------ | 
| 151 // MessageQueue | 151 // MessageQueue | 
| 152 MessageQueue::MessageQueue(SocketServer* ss, bool init_queue) | 152 MessageQueue::MessageQueue(SocketServer* ss, bool init_queue) | 
| 153     : fPeekKeep_(false), | 153     : fStop_(false), fPeekKeep_(false), | 
| 154       dmsgq_next_num_(0), | 154       dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false), ss_(ss) { | 
| 155       fInitialized_(false), |  | 
| 156       fDestroyed_(false), |  | 
| 157       stop_(0), |  | 
| 158       ss_(ss) { |  | 
| 159   RTC_DCHECK(ss); | 155   RTC_DCHECK(ss); | 
| 160   // Currently, MessageQueue holds a socket server, and is the base class for | 156   // Currently, MessageQueue holds a socket server, and is the base class for | 
| 161   // Thread.  It seems like it makes more sense for Thread to hold the socket | 157   // Thread.  It seems like it makes more sense for Thread to hold the socket | 
| 162   // server, and provide it to the MessageQueue, since the Thread controls | 158   // server, and provide it to the MessageQueue, since the Thread controls | 
| 163   // the I/O model, and MQ is agnostic to those details.  Anyway, this causes | 159   // the I/O model, and MQ is agnostic to those details.  Anyway, this causes | 
| 164   // messagequeue_unittest to depend on network libraries... yuck. | 160   // messagequeue_unittest to depend on network libraries... yuck. | 
| 165   ss_->SetMessageQueue(this); | 161   ss_->SetMessageQueue(this); | 
| 166   if (init_queue) { | 162   if (init_queue) { | 
| 167     DoInit(); | 163     DoInit(); | 
| 168   } | 164   } | 
| (...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after  Loading... | 
| 220   ss_ = ss ? ss : own_ss_.get(); | 216   ss_ = ss ? ss : own_ss_.get(); | 
| 221   ss_->SetMessageQueue(this); | 217   ss_->SetMessageQueue(this); | 
| 222 } | 218 } | 
| 223 | 219 | 
| 224 void MessageQueue::WakeUpSocketServer() { | 220 void MessageQueue::WakeUpSocketServer() { | 
| 225   SharedScope ss(&ss_lock_); | 221   SharedScope ss(&ss_lock_); | 
| 226   ss_->WakeUp(); | 222   ss_->WakeUp(); | 
| 227 } | 223 } | 
| 228 | 224 | 
| 229 void MessageQueue::Quit() { | 225 void MessageQueue::Quit() { | 
| 230   AtomicOps::ReleaseStore(&stop_, 1); | 226   fStop_ = true; | 
| 231   WakeUpSocketServer(); | 227   WakeUpSocketServer(); | 
| 232 } | 228 } | 
| 233 | 229 | 
| 234 bool MessageQueue::IsQuitting() { | 230 bool MessageQueue::IsQuitting() { | 
| 235   return AtomicOps::AcquireLoad(&stop_) != 0; | 231   return fStop_; | 
| 236 } | 232 } | 
| 237 | 233 | 
| 238 void MessageQueue::Restart() { | 234 void MessageQueue::Restart() { | 
| 239   AtomicOps::ReleaseStore(&stop_, 0); | 235   fStop_ = false; | 
| 240 } | 236 } | 
| 241 | 237 | 
| 242 bool MessageQueue::Peek(Message *pmsg, int cmsWait) { | 238 bool MessageQueue::Peek(Message *pmsg, int cmsWait) { | 
| 243   if (fPeekKeep_) { | 239   if (fPeekKeep_) { | 
| 244     *pmsg = msgPeek_; | 240     *pmsg = msgPeek_; | 
| 245     return true; | 241     return true; | 
| 246   } | 242   } | 
| 247   if (!Get(pmsg, cmsWait)) | 243   if (!Get(pmsg, cmsWait)) | 
| 248     return false; | 244     return false; | 
| 249   msgPeek_ = *pmsg; | 245   msgPeek_ = *pmsg; | 
| (...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after  Loading... | 
| 313       // If this was a dispose message, delete it and skip it. | 309       // If this was a dispose message, delete it and skip it. | 
| 314       if (MQID_DISPOSE == pmsg->message_id) { | 310       if (MQID_DISPOSE == pmsg->message_id) { | 
| 315         ASSERT(NULL == pmsg->phandler); | 311         ASSERT(NULL == pmsg->phandler); | 
| 316         delete pmsg->pdata; | 312         delete pmsg->pdata; | 
| 317         *pmsg = Message(); | 313         *pmsg = Message(); | 
| 318         continue; | 314         continue; | 
| 319       } | 315       } | 
| 320       return true; | 316       return true; | 
| 321     } | 317     } | 
| 322 | 318 | 
| 323     if (IsQuitting()) | 319     if (fStop_) | 
| 324       break; | 320       break; | 
| 325 | 321 | 
| 326     // Which is shorter, the delay wait or the asked wait? | 322     // Which is shorter, the delay wait or the asked wait? | 
| 327 | 323 | 
| 328     int64_t cmsNext; | 324     int64_t cmsNext; | 
| 329     if (cmsWait == kForever) { | 325     if (cmsWait == kForever) { | 
| 330       cmsNext = cmsDelayNext; | 326       cmsNext = cmsDelayNext; | 
| 331     } else { | 327     } else { | 
| 332       cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed); | 328       cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed); | 
| 333       if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) | 329       if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) | 
| (...skipping 20 matching lines...) Expand all  Loading... | 
| 354 } | 350 } | 
| 355 | 351 | 
| 356 void MessageQueue::ReceiveSends() { | 352 void MessageQueue::ReceiveSends() { | 
| 357 } | 353 } | 
| 358 | 354 | 
| 359 void MessageQueue::Post(const Location& posted_from, | 355 void MessageQueue::Post(const Location& posted_from, | 
| 360                         MessageHandler* phandler, | 356                         MessageHandler* phandler, | 
| 361                         uint32_t id, | 357                         uint32_t id, | 
| 362                         MessageData* pdata, | 358                         MessageData* pdata, | 
| 363                         bool time_sensitive) { | 359                         bool time_sensitive) { | 
| 364   if (IsQuitting()) | 360   if (fStop_) | 
| 365     return; | 361     return; | 
| 366 | 362 | 
| 367   // Keep thread safe | 363   // Keep thread safe | 
| 368   // Add the message to the end of the queue | 364   // Add the message to the end of the queue | 
| 369   // Signal for the multiplexer to return | 365   // Signal for the multiplexer to return | 
| 370 | 366 | 
| 371   { | 367   { | 
| 372     CritScope cs(&crit_); | 368     CritScope cs(&crit_); | 
| 373     Message msg; | 369     Message msg; | 
| 374     msg.posted_from = posted_from; | 370     msg.posted_from = posted_from; | 
| (...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after  Loading... | 
| 410   return DoDelayPost(posted_from, TimeUntil(tstamp), tstamp, phandler, id, | 406   return DoDelayPost(posted_from, TimeUntil(tstamp), tstamp, phandler, id, | 
| 411                      pdata); | 407                      pdata); | 
| 412 } | 408 } | 
| 413 | 409 | 
| 414 void MessageQueue::DoDelayPost(const Location& posted_from, | 410 void MessageQueue::DoDelayPost(const Location& posted_from, | 
| 415                                int64_t cmsDelay, | 411                                int64_t cmsDelay, | 
| 416                                int64_t tstamp, | 412                                int64_t tstamp, | 
| 417                                MessageHandler* phandler, | 413                                MessageHandler* phandler, | 
| 418                                uint32_t id, | 414                                uint32_t id, | 
| 419                                MessageData* pdata) { | 415                                MessageData* pdata) { | 
| 420   if (IsQuitting()) { | 416   if (fStop_) { | 
| 421     return; | 417     return; | 
| 422   } | 418   } | 
| 423 | 419 | 
| 424   // Keep thread safe | 420   // Keep thread safe | 
| 425   // Add to the priority queue. Gets sorted soonest first. | 421   // Add to the priority queue. Gets sorted soonest first. | 
| 426   // Signal for the multiplexer to return. | 422   // Signal for the multiplexer to return. | 
| 427 | 423 | 
| 428   { | 424   { | 
| 429     CritScope cs(&crit_); | 425     CritScope cs(&crit_); | 
| 430     Message msg; | 426     Message msg; | 
| (...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after  Loading... | 
| 516   pmsg->phandler->OnMessage(pmsg); | 512   pmsg->phandler->OnMessage(pmsg); | 
| 517   int64_t end_time = TimeMillis(); | 513   int64_t end_time = TimeMillis(); | 
| 518   int64_t diff = TimeDiff(end_time, start_time); | 514   int64_t diff = TimeDiff(end_time, start_time); | 
| 519   if (diff >= kSlowDispatchLoggingThreshold) { | 515   if (diff >= kSlowDispatchLoggingThreshold) { | 
| 520     LOG(LS_INFO) << "Message took " << diff << "ms to dispatch. Posted from: " | 516     LOG(LS_INFO) << "Message took " << diff << "ms to dispatch. Posted from: " | 
| 521                  << pmsg->posted_from.ToString(); | 517                  << pmsg->posted_from.ToString(); | 
| 522   } | 518   } | 
| 523 } | 519 } | 
| 524 | 520 | 
| 525 }  // namespace rtc | 521 }  // namespace rtc | 
| OLD | NEW | 
|---|