| OLD | NEW | 
|---|
| 1 /* | 1 /* | 
| 2  *  Copyright 2004 The WebRTC Project Authors. All rights reserved. | 2  *  Copyright 2004 The WebRTC Project Authors. All rights reserved. | 
| 3  * | 3  * | 
| 4  *  Use of this source code is governed by a BSD-style license | 4  *  Use of this source code is governed by a BSD-style license | 
| 5  *  that can be found in the LICENSE file in the root of the source | 5  *  that can be found in the LICENSE file in the root of the source | 
| 6  *  tree. An additional intellectual property rights grant can be found | 6  *  tree. An additional intellectual property rights grant can be found | 
| 7  *  in the file PATENTS.  All contributing project authors may | 7  *  in the file PATENTS.  All contributing project authors may | 
| 8  *  be found in the AUTHORS file in the root of the source tree. | 8  *  be found in the AUTHORS file in the root of the source tree. | 
| 9  */ | 9  */ | 
| 10 | 10 | 
| (...skipping 99 matching lines...) Expand 10 before | Expand all | Expand 10 after  Loading... | 
| 110   CritScope cs(&crit_); | 110   CritScope cs(&crit_); | 
| 111   std::vector<MessageQueue *>::iterator iter; | 111   std::vector<MessageQueue *>::iterator iter; | 
| 112   for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++) | 112   for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++) | 
| 113     (*iter)->Clear(handler); | 113     (*iter)->Clear(handler); | 
| 114 } | 114 } | 
| 115 | 115 | 
| 116 //------------------------------------------------------------------ | 116 //------------------------------------------------------------------ | 
| 117 // MessageQueue | 117 // MessageQueue | 
| 118 | 118 | 
| 119 MessageQueue::MessageQueue(SocketServer* ss, bool init_queue) | 119 MessageQueue::MessageQueue(SocketServer* ss, bool init_queue) | 
| 120     : ss_(ss), fStop_(false), fPeekKeep_(false), | 120     : fStop_(false), fPeekKeep_(false), | 
| 121       dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false) { | 121       dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false), ss_(ss) { | 
| 122   if (!ss_) { | 122   if (!ss_) { | 
| 123     // Currently, MessageQueue holds a socket server, and is the base class for | 123     // Currently, MessageQueue holds a socket server, and is the base class for | 
| 124     // Thread.  It seems like it makes more sense for Thread to hold the socket | 124     // Thread.  It seems like it makes more sense for Thread to hold the socket | 
| 125     // server, and provide it to the MessageQueue, since the Thread controls | 125     // server, and provide it to the MessageQueue, since the Thread controls | 
| 126     // the I/O model, and MQ is agnostic to those details.  Anyway, this causes | 126     // the I/O model, and MQ is agnostic to those details.  Anyway, this causes | 
| 127     // messagequeue_unittest to depend on network libraries... yuck. | 127     // messagequeue_unittest to depend on network libraries... yuck. | 
| 128     default_ss_.reset(new DefaultSocketServer()); | 128     default_ss_.reset(new DefaultSocketServer()); | 
| 129     ss_ = default_ss_.get(); | 129     ss_ = default_ss_.get(); | 
| 130   } | 130   } | 
| 131   ss_->SetMessageQueue(this); | 131   ss_->SetMessageQueue(this); | 
| (...skipping 20 matching lines...) Expand all  Loading... | 
| 152     return; | 152     return; | 
| 153   } | 153   } | 
| 154 | 154 | 
| 155   fDestroyed_ = true; | 155   fDestroyed_ = true; | 
| 156   // The signal is done from here to ensure | 156   // The signal is done from here to ensure | 
| 157   // that it always gets called when the queue | 157   // that it always gets called when the queue | 
| 158   // is going away. | 158   // is going away. | 
| 159   SignalQueueDestroyed(); | 159   SignalQueueDestroyed(); | 
| 160   MessageQueueManager::Remove(this); | 160   MessageQueueManager::Remove(this); | 
| 161   Clear(NULL); | 161   Clear(NULL); | 
|  | 162 | 
|  | 163   SharedScope ss(&ss_lock_); | 
| 162   if (ss_) { | 164   if (ss_) { | 
| 163     ss_->SetMessageQueue(NULL); | 165     ss_->SetMessageQueue(NULL); | 
| 164   } | 166   } | 
| 165 } | 167 } | 
| 166 | 168 | 
|  | 169 SocketServer* MessageQueue::socketserver() { | 
|  | 170   SharedScope ss(&ss_lock_); | 
|  | 171   return ss_; | 
|  | 172 } | 
|  | 173 | 
| 167 void MessageQueue::set_socketserver(SocketServer* ss) { | 174 void MessageQueue::set_socketserver(SocketServer* ss) { | 
|  | 175   // Need to lock exclusively here to prevent simultaneous modifications from | 
|  | 176   // other threads. Can't be a shared lock to prevent races with other reading | 
|  | 177   // threads. | 
|  | 178   // Other places that only read "ss_" can use a shared lock as simultaneous | 
|  | 179   // read access is allowed. | 
|  | 180   ExclusiveScope es(&ss_lock_); | 
| 168   ss_ = ss ? ss : default_ss_.get(); | 181   ss_ = ss ? ss : default_ss_.get(); | 
| 169   ss_->SetMessageQueue(this); | 182   ss_->SetMessageQueue(this); | 
| 170 } | 183 } | 
| 171 | 184 | 
| 172 void MessageQueue::Quit() { | 185 void MessageQueue::WakeUpSocketServer() { | 
| 173   fStop_ = true; | 186   SharedScope ss(&ss_lock_); | 
| 174   ss_->WakeUp(); | 187   ss_->WakeUp(); | 
| 175 } | 188 } | 
| 176 | 189 | 
|  | 190 void MessageQueue::Quit() { | 
|  | 191   fStop_ = true; | 
|  | 192   WakeUpSocketServer(); | 
|  | 193 } | 
|  | 194 | 
| 177 bool MessageQueue::IsQuitting() { | 195 bool MessageQueue::IsQuitting() { | 
| 178   return fStop_; | 196   return fStop_; | 
| 179 } | 197 } | 
| 180 | 198 | 
| 181 void MessageQueue::Restart() { | 199 void MessageQueue::Restart() { | 
| 182   fStop_ = false; | 200   fStop_ = false; | 
| 183 } | 201 } | 
| 184 | 202 | 
| 185 bool MessageQueue::Peek(Message *pmsg, int cmsWait) { | 203 bool MessageQueue::Peek(Message *pmsg, int cmsWait) { | 
| 186   if (fPeekKeep_) { | 204   if (fPeekKeep_) { | 
| (...skipping 83 matching lines...) Expand 10 before | Expand all | Expand 10 after  Loading... | 
| 270 | 288 | 
| 271     int cmsNext; | 289     int cmsNext; | 
| 272     if (cmsWait == kForever) { | 290     if (cmsWait == kForever) { | 
| 273       cmsNext = cmsDelayNext; | 291       cmsNext = cmsDelayNext; | 
| 274     } else { | 292     } else { | 
| 275       cmsNext = std::max(0, cmsTotal - cmsElapsed); | 293       cmsNext = std::max(0, cmsTotal - cmsElapsed); | 
| 276       if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) | 294       if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) | 
| 277         cmsNext = cmsDelayNext; | 295         cmsNext = cmsDelayNext; | 
| 278     } | 296     } | 
| 279 | 297 | 
| 280     // Wait and multiplex in the meantime | 298     { | 
| 281     if (!ss_->Wait(cmsNext, process_io)) | 299       // Wait and multiplex in the meantime | 
| 282       return false; | 300       SharedScope ss(&ss_lock_); | 
|  | 301       if (!ss_->Wait(cmsNext, process_io)) | 
|  | 302         return false; | 
|  | 303     } | 
| 283 | 304 | 
| 284     // If the specified timeout expired, return | 305     // If the specified timeout expired, return | 
| 285 | 306 | 
| 286     msCurrent = Time(); | 307     msCurrent = Time(); | 
| 287     cmsElapsed = TimeDiff(msCurrent, msStart); | 308     cmsElapsed = TimeDiff(msCurrent, msStart); | 
| 288     if (cmsWait != kForever) { | 309     if (cmsWait != kForever) { | 
| 289       if (cmsElapsed >= cmsWait) | 310       if (cmsElapsed >= cmsWait) | 
| 290         return false; | 311         return false; | 
| 291     } | 312     } | 
| 292   } | 313   } | 
| 293   return false; | 314   return false; | 
| 294 } | 315 } | 
| 295 | 316 | 
| 296 void MessageQueue::ReceiveSends() { | 317 void MessageQueue::ReceiveSends() { | 
| 297 } | 318 } | 
| 298 | 319 | 
| 299 void MessageQueue::Post(MessageHandler* phandler, | 320 void MessageQueue::Post(MessageHandler* phandler, | 
| 300                         uint32_t id, | 321                         uint32_t id, | 
| 301                         MessageData* pdata, | 322                         MessageData* pdata, | 
| 302                         bool time_sensitive) { | 323                         bool time_sensitive) { | 
| 303   if (fStop_) | 324   if (fStop_) | 
| 304     return; | 325     return; | 
| 305 | 326 | 
| 306   // Keep thread safe | 327   // Keep thread safe | 
| 307   // Add the message to the end of the queue | 328   // Add the message to the end of the queue | 
| 308   // Signal for the multiplexer to return | 329   // Signal for the multiplexer to return | 
| 309 | 330 | 
| 310   CritScope cs(&crit_); | 331   { | 
| 311   Message msg; | 332     CritScope cs(&crit_); | 
| 312   msg.phandler = phandler; | 333     Message msg; | 
| 313   msg.message_id = id; | 334     msg.phandler = phandler; | 
| 314   msg.pdata = pdata; | 335     msg.message_id = id; | 
| 315   if (time_sensitive) { | 336     msg.pdata = pdata; | 
| 316     msg.ts_sensitive = Time() + kMaxMsgLatency; | 337     if (time_sensitive) { | 
|  | 338       msg.ts_sensitive = Time() + kMaxMsgLatency; | 
|  | 339     } | 
|  | 340     msgq_.push_back(msg); | 
| 317   } | 341   } | 
| 318   msgq_.push_back(msg); | 342   WakeUpSocketServer(); | 
| 319   ss_->WakeUp(); |  | 
| 320 } | 343 } | 
| 321 | 344 | 
| 322 void MessageQueue::PostDelayed(int cmsDelay, | 345 void MessageQueue::PostDelayed(int cmsDelay, | 
| 323                                MessageHandler* phandler, | 346                                MessageHandler* phandler, | 
| 324                                uint32_t id, | 347                                uint32_t id, | 
| 325                                MessageData* pdata) { | 348                                MessageData* pdata) { | 
| 326   return DoDelayPost(cmsDelay, TimeAfter(cmsDelay), phandler, id, pdata); | 349   return DoDelayPost(cmsDelay, TimeAfter(cmsDelay), phandler, id, pdata); | 
| 327 } | 350 } | 
| 328 | 351 | 
| 329 void MessageQueue::PostAt(uint32_t tstamp, | 352 void MessageQueue::PostAt(uint32_t tstamp, | 
| 330                           MessageHandler* phandler, | 353                           MessageHandler* phandler, | 
| 331                           uint32_t id, | 354                           uint32_t id, | 
| 332                           MessageData* pdata) { | 355                           MessageData* pdata) { | 
| 333   return DoDelayPost(TimeUntil(tstamp), tstamp, phandler, id, pdata); | 356   return DoDelayPost(TimeUntil(tstamp), tstamp, phandler, id, pdata); | 
| 334 } | 357 } | 
| 335 | 358 | 
| 336 void MessageQueue::DoDelayPost(int cmsDelay, | 359 void MessageQueue::DoDelayPost(int cmsDelay, | 
| 337                                uint32_t tstamp, | 360                                uint32_t tstamp, | 
| 338                                MessageHandler* phandler, | 361                                MessageHandler* phandler, | 
| 339                                uint32_t id, | 362                                uint32_t id, | 
| 340                                MessageData* pdata) { | 363                                MessageData* pdata) { | 
| 341   if (fStop_) | 364   if (fStop_) | 
| 342     return; | 365     return; | 
| 343 | 366 | 
| 344   // Keep thread safe | 367   // Keep thread safe | 
| 345   // Add to the priority queue. Gets sorted soonest first. | 368   // Add to the priority queue. Gets sorted soonest first. | 
| 346   // Signal for the multiplexer to return. | 369   // Signal for the multiplexer to return. | 
| 347 | 370 | 
| 348   CritScope cs(&crit_); | 371   { | 
| 349   Message msg; | 372     CritScope cs(&crit_); | 
| 350   msg.phandler = phandler; | 373     Message msg; | 
| 351   msg.message_id = id; | 374     msg.phandler = phandler; | 
| 352   msg.pdata = pdata; | 375     msg.message_id = id; | 
| 353   DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg); | 376     msg.pdata = pdata; | 
| 354   dmsgq_.push(dmsg); | 377     DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg); | 
| 355   // If this message queue processes 1 message every millisecond for 50 days, | 378     dmsgq_.push(dmsg); | 
| 356   // we will wrap this number.  Even then, only messages with identical times | 379     // If this message queue processes 1 message every millisecond for 50 days, | 
| 357   // will be misordered, and then only briefly.  This is probably ok. | 380     // we will wrap this number.  Even then, only messages with identical times | 
| 358   VERIFY(0 != ++dmsgq_next_num_); | 381     // will be misordered, and then only briefly.  This is probably ok. | 
| 359   ss_->WakeUp(); | 382     VERIFY(0 != ++dmsgq_next_num_); | 
|  | 383   } | 
|  | 384   WakeUpSocketServer(); | 
| 360 } | 385 } | 
| 361 | 386 | 
| 362 int MessageQueue::GetDelay() { | 387 int MessageQueue::GetDelay() { | 
| 363   CritScope cs(&crit_); | 388   CritScope cs(&crit_); | 
| 364 | 389 | 
| 365   if (!msgq_.empty()) | 390   if (!msgq_.empty()) | 
| 366     return 0; | 391     return 0; | 
| 367 | 392 | 
| 368   if (!dmsgq_.empty()) { | 393   if (!dmsgq_.empty()) { | 
| 369     int delay = TimeUntil(dmsgq_.top().msTrigger_); | 394     int delay = TimeUntil(dmsgq_.top().msTrigger_); | 
| (...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after  Loading... | 
| 423   } | 448   } | 
| 424   dmsgq_.container().erase(new_end, dmsgq_.container().end()); | 449   dmsgq_.container().erase(new_end, dmsgq_.container().end()); | 
| 425   dmsgq_.reheap(); | 450   dmsgq_.reheap(); | 
| 426 } | 451 } | 
| 427 | 452 | 
| 428 void MessageQueue::Dispatch(Message *pmsg) { | 453 void MessageQueue::Dispatch(Message *pmsg) { | 
| 429   pmsg->phandler->OnMessage(pmsg); | 454   pmsg->phandler->OnMessage(pmsg); | 
| 430 } | 455 } | 
| 431 | 456 | 
| 432 }  // namespace rtc | 457 }  // namespace rtc | 
| OLD | NEW | 
|---|