| OLD | NEW |
| 1 /* | 1 /* |
| 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. | 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. |
| 3 * | 3 * |
| 4 * Use of this source code is governed by a BSD-style license | 4 * Use of this source code is governed by a BSD-style license |
| 5 * that can be found in the LICENSE file in the root of the source | 5 * that can be found in the LICENSE file in the root of the source |
| 6 * tree. An additional intellectual property rights grant can be found | 6 * tree. An additional intellectual property rights grant can be found |
| 7 * in the file PATENTS. All contributing project authors may | 7 * in the file PATENTS. All contributing project authors may |
| 8 * be found in the AUTHORS file in the root of the source tree. | 8 * be found in the AUTHORS file in the root of the source tree. |
| 9 */ | 9 */ |
| 10 | 10 |
| (...skipping 99 matching lines...) |
| 110 CritScope cs(&crit_); | 110 CritScope cs(&crit_); |
| 111 std::vector<MessageQueue *>::iterator iter; | 111 std::vector<MessageQueue *>::iterator iter; |
| 112 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++) | 112 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++) |
| 113 (*iter)->Clear(handler); | 113 (*iter)->Clear(handler); |
| 114 } | 114 } |
| 115 | 115 |
| 116 //------------------------------------------------------------------ | 116 //------------------------------------------------------------------ |
| 117 // MessageQueue | 117 // MessageQueue |
| 118 | 118 |
| 119 MessageQueue::MessageQueue(SocketServer* ss, bool init_queue) | 119 MessageQueue::MessageQueue(SocketServer* ss, bool init_queue) |
| 120 : ss_(ss), fStop_(false), fPeekKeep_(false), | 120 : fStop_(false), fPeekKeep_(false), |
| 121 dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false) { | 121 dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false), ss_(ss) { |
| 122 if (!ss_) { | 122 if (!ss_) { |
| 123 // Currently, MessageQueue holds a socket server, and is the base class for | 123 // Currently, MessageQueue holds a socket server, and is the base class for |
| 124 // Thread. It seems like it makes more sense for Thread to hold the socket | 124 // Thread. It seems like it makes more sense for Thread to hold the socket |
| 125 // server, and provide it to the MessageQueue, since the Thread controls | 125 // server, and provide it to the MessageQueue, since the Thread controls |
| 126 // the I/O model, and MQ is agnostic to those details. Anyway, this causes | 126 // the I/O model, and MQ is agnostic to those details. Anyway, this causes |
| 127 // messagequeue_unittest to depend on network libraries... yuck. | 127 // messagequeue_unittest to depend on network libraries... yuck. |
| 128 default_ss_.reset(new DefaultSocketServer()); | 128 default_ss_.reset(new DefaultSocketServer()); |
| 129 ss_ = default_ss_.get(); | 129 ss_ = default_ss_.get(); |
| 130 } | 130 } |
| 131 ss_->SetMessageQueue(this); | 131 ss_->SetMessageQueue(this); |
| (...skipping 20 matching lines...) |
| 152 return; | 152 return; |
| 153 } | 153 } |
| 154 | 154 |
| 155 fDestroyed_ = true; | 155 fDestroyed_ = true; |
| 156 // The signal is done from here to ensure | 156 // The signal is done from here to ensure |
| 157 // that it always gets called when the queue | 157 // that it always gets called when the queue |
| 158 // is going away. | 158 // is going away. |
| 159 SignalQueueDestroyed(); | 159 SignalQueueDestroyed(); |
| 160 MessageQueueManager::Remove(this); | 160 MessageQueueManager::Remove(this); |
| 161 Clear(NULL); | 161 Clear(NULL); |
| 162 |
| 163 SharedScope ss(&ss_lock_); |
| 162 if (ss_) { | 164 if (ss_) { |
| 163 ss_->SetMessageQueue(NULL); | 165 ss_->SetMessageQueue(NULL); |
| 164 } | 166 } |
| 165 } | 167 } |
| 166 | 168 |
| 169 SocketServer* MessageQueue::socketserver() { |
| 170 SharedScope ss(&ss_lock_); |
| 171 return ss_; |
| 172 } |
| 173 |
| 167 void MessageQueue::set_socketserver(SocketServer* ss) { | 174 void MessageQueue::set_socketserver(SocketServer* ss) { |
| 175 // Need to lock exclusively here to prevent simultaneous modifications from |
| 176 // other threads. This can't be a shared lock, because that would still |
| 177 // allow races with threads that concurrently read "ss_". |
| 178 // Other places that only read "ss_" can use a shared lock, since |
| 179 // simultaneous read access is allowed. |
| 180 ExclusiveScope es(&ss_lock_); |
| 168 ss_ = ss ? ss : default_ss_.get(); | 181 ss_ = ss ? ss : default_ss_.get(); |
| 169 ss_->SetMessageQueue(this); | 182 ss_->SetMessageQueue(this); |
| 170 } | 183 } |
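The new `ss_lock_` gives `ss_` a reader/writer discipline: accessors such as `socketserver()` and `WakeUpSocketServer()` only read the pointer under a shared scope, while `set_socketserver()` swaps it under an exclusive scope. A minimal sketch of that discipline, using `std::shared_mutex` and a hypothetical `SocketServerStub` standing in for `rtc::SharedExclusiveLock` and the real `SocketServer`:

```cpp
// Sketch only: std::shared_mutex stands in for rtc::SharedExclusiveLock,
// SocketServerStub for the real SocketServer interface.
#include <memory>
#include <mutex>
#include <shared_mutex>

struct SocketServerStub {  // Hypothetical stand-in for rtc::SocketServer.
  void WakeUp() {}
};

class MessageQueueSketch {
 public:
  MessageQueueSketch()
      : default_ss_(new SocketServerStub()), ss_(default_ss_.get()) {}

  // Readers take a shared lock: many threads may look at ss_ at once.
  SocketServerStub* socketserver() {
    std::shared_lock<std::shared_mutex> read_lock(ss_lock_);
    return ss_;
  }

  // Waking the socket server only reads ss_, so a shared lock is enough.
  void WakeUpSocketServer() {
    std::shared_lock<std::shared_mutex> read_lock(ss_lock_);
    ss_->WakeUp();
  }

  // The writer takes an exclusive lock so no reader can observe ss_
  // while it is being swapped out.
  void set_socketserver(SocketServerStub* ss) {
    std::unique_lock<std::shared_mutex> write_lock(ss_lock_);
    ss_ = ss ? ss : default_ss_.get();
  }

 private:
  std::shared_mutex ss_lock_;
  std::unique_ptr<SocketServerStub> default_ss_;
  SocketServerStub* ss_;
};
```

The shared scope on the read paths presumably keeps concurrent wake-ups and waits from serializing against each other, while a `set_socketserver()` call still cannot interleave with any of them.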
| 171 | 184 |
| 172 void MessageQueue::Quit() { | 185 void MessageQueue::WakeUpSocketServer() { |
| 173 fStop_ = true; | 186 SharedScope ss(&ss_lock_); |
| 174 ss_->WakeUp(); | 187 ss_->WakeUp(); |
| 175 } | 188 } |
| 176 | 189 |
| 190 void MessageQueue::Quit() { |
| 191 fStop_ = true; |
| 192 WakeUpSocketServer(); |
| 193 } |
| 194 |
| 177 bool MessageQueue::IsQuitting() { | 195 bool MessageQueue::IsQuitting() { |
| 178 return fStop_; | 196 return fStop_; |
| 179 } | 197 } |
| 180 | 198 |
| 181 void MessageQueue::Restart() { | 199 void MessageQueue::Restart() { |
| 182 fStop_ = false; | 200 fStop_ = false; |
| 183 } | 201 } |
| 184 | 202 |
| 185 bool MessageQueue::Peek(Message *pmsg, int cmsWait) { | 203 bool MessageQueue::Peek(Message *pmsg, int cmsWait) { |
| 186 if (fPeekKeep_) { | 204 if (fPeekKeep_) { |
| (...skipping 83 matching lines...) |
| 270 | 288 |
| 271 int cmsNext; | 289 int cmsNext; |
| 272 if (cmsWait == kForever) { | 290 if (cmsWait == kForever) { |
| 273 cmsNext = cmsDelayNext; | 291 cmsNext = cmsDelayNext; |
| 274 } else { | 292 } else { |
| 275 cmsNext = std::max(0, cmsTotal - cmsElapsed); | 293 cmsNext = std::max(0, cmsTotal - cmsElapsed); |
| 276 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) | 294 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) |
| 277 cmsNext = cmsDelayNext; | 295 cmsNext = cmsDelayNext; |
| 278 } | 296 } |
| 279 | 297 |
| 280 // Wait and multiplex in the meantime | 298 { |
| 281 if (!ss_->Wait(cmsNext, process_io)) | 299 // Wait and multiplex in the meantime |
| 282 return false; | 300 SharedScope ss(&ss_lock_); |
| 301 if (!ss_->Wait(cmsNext, process_io)) |
| 302 return false; |
| 303 } |
| 283 | 304 |
| 284 // If the specified timeout expired, return | 305 // If the specified timeout expired, return |
| 285 | 306 |
| 286 msCurrent = Time(); | 307 msCurrent = Time(); |
| 287 cmsElapsed = TimeDiff(msCurrent, msStart); | 308 cmsElapsed = TimeDiff(msCurrent, msStart); |
| 288 if (cmsWait != kForever) { | 309 if (cmsWait != kForever) { |
| 289 if (cmsElapsed >= cmsWait) | 310 if (cmsElapsed >= cmsWait) |
| 290 return false; | 311 return false; |
| 291 } | 312 } |
| 292 } | 313 } |
| 293 return false; | 314 return false; |
| 294 } | 315 } |
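The loop above bounds each `ss_->Wait()` by whichever expires first: the caller's remaining timeout or the next delayed message. A stand-alone sketch of that computation (the helper name `NextWaitMs` is invented here; `kForever` is assumed to be the usual `-1` sentinel):

```cpp
// Sketch of the wait-bound computation in MessageQueue::Get() above.
// kForever, the parameter names, and NextWaitMs are illustrative only.
#include <algorithm>
#include <cassert>

const int kForever = -1;

// cmsTotal: caller's overall timeout, cmsElapsed: time already spent waiting,
// cmsDelayNext: time until the earliest delayed message (kForever if none).
int NextWaitMs(int cmsTotal, int cmsElapsed, int cmsDelayNext) {
  if (cmsTotal == kForever)
    return cmsDelayNext;  // Only delayed messages bound the wait.
  int cmsNext = std::max(0, cmsTotal - cmsElapsed);
  if (cmsDelayNext != kForever && cmsDelayNext < cmsNext)
    cmsNext = cmsDelayNext;  // A sooner delayed message shortens the wait.
  return cmsNext;
}

int main() {
  assert(NextWaitMs(kForever, 0, 250) == 250);     // No timeout: wait for the delayed msg.
  assert(NextWaitMs(1000, 400, kForever) == 600);  // 600 ms of the timeout remain.
  assert(NextWaitMs(1000, 400, 100) == 100);       // Delayed message is due sooner.
  assert(NextWaitMs(1000, 1200, kForever) == 0);   // Timeout already exceeded.
  return 0;
}
```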
| 295 | 316 |
| 296 void MessageQueue::ReceiveSends() { | 317 void MessageQueue::ReceiveSends() { |
| 297 } | 318 } |
| 298 | 319 |
| 299 void MessageQueue::Post(MessageHandler* phandler, | 320 void MessageQueue::Post(MessageHandler* phandler, |
| 300 uint32_t id, | 321 uint32_t id, |
| 301 MessageData* pdata, | 322 MessageData* pdata, |
| 302 bool time_sensitive) { | 323 bool time_sensitive) { |
| 303 if (fStop_) | 324 if (fStop_) |
| 304 return; | 325 return; |
| 305 | 326 |
| 306 // Keep thread safe | 327 // Keep thread safe |
| 307 // Add the message to the end of the queue | 328 // Add the message to the end of the queue |
| 308 // Signal for the multiplexer to return | 329 // Signal for the multiplexer to return |
| 309 | 330 |
| 310 CritScope cs(&crit_); | 331 { |
| 311 Message msg; | 332 CritScope cs(&crit_); |
| 312 msg.phandler = phandler; | 333 Message msg; |
| 313 msg.message_id = id; | 334 msg.phandler = phandler; |
| 314 msg.pdata = pdata; | 335 msg.message_id = id; |
| 315 if (time_sensitive) { | 336 msg.pdata = pdata; |
| 316 msg.ts_sensitive = Time() + kMaxMsgLatency; | 337 if (time_sensitive) { |
| 338 msg.ts_sensitive = Time() + kMaxMsgLatency; |
| 339 } |
| 340 msgq_.push_back(msg); |
| 317 } | 341 } |
| 318 msgq_.push_back(msg); | 342 WakeUpSocketServer(); |
| 319 ss_->WakeUp(); | |
| 320 } | 343 } |
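`Post()` (and `DoDelayPost()` below) now scope the `CritScope` to the enqueue itself, so the wake-up runs after `crit_` has been released and the queue lock is never held while `ss_lock_` is taken. A rough sketch of that shape, with standard-library types standing in for the rtc primitives:

```cpp
// Sketch only: std::mutex/std::shared_mutex stand in for rtc::CriticalSection
// and rtc::SharedExclusiveLock; Message and the wake-up are simplified.
#include <mutex>
#include <queue>
#include <shared_mutex>

struct Message { int id = 0; };

class PostSketch {
 public:
  void Post(int id) {
    {
      // Hold the queue lock only while touching the queue.
      std::lock_guard<std::mutex> lock(crit_);
      Message msg;
      msg.id = id;
      msgq_.push(msg);
    }
    // The wake-up takes the socket-server lock *after* crit_ is released,
    // so the two locks are never held at the same time on this path.
    WakeUpSocketServer();
  }

 private:
  void WakeUpSocketServer() {
    std::shared_lock<std::shared_mutex> read_lock(ss_lock_);
    // ss_->WakeUp() would go here in the real code.
  }

  std::mutex crit_;
  std::shared_mutex ss_lock_;
  std::queue<Message> msgq_;
};
```

Keeping the two locks disjoint on the post path avoids nesting `crit_` inside `ss_lock_`, which is presumably why the braces were introduced rather than calling `WakeUpSocketServer()` while still under `crit_`.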
| 321 | 344 |
| 322 void MessageQueue::PostDelayed(int cmsDelay, | 345 void MessageQueue::PostDelayed(int cmsDelay, |
| 323 MessageHandler* phandler, | 346 MessageHandler* phandler, |
| 324 uint32_t id, | 347 uint32_t id, |
| 325 MessageData* pdata) { | 348 MessageData* pdata) { |
| 326 return DoDelayPost(cmsDelay, TimeAfter(cmsDelay), phandler, id, pdata); | 349 return DoDelayPost(cmsDelay, TimeAfter(cmsDelay), phandler, id, pdata); |
| 327 } | 350 } |
| 328 | 351 |
| 329 void MessageQueue::PostAt(uint32_t tstamp, | 352 void MessageQueue::PostAt(uint32_t tstamp, |
| 330 MessageHandler* phandler, | 353 MessageHandler* phandler, |
| 331 uint32_t id, | 354 uint32_t id, |
| 332 MessageData* pdata) { | 355 MessageData* pdata) { |
| 333 return DoDelayPost(TimeUntil(tstamp), tstamp, phandler, id, pdata); | 356 return DoDelayPost(TimeUntil(tstamp), tstamp, phandler, id, pdata); |
| 334 } | 357 } |
| 335 | 358 |
| 336 void MessageQueue::DoDelayPost(int cmsDelay, | 359 void MessageQueue::DoDelayPost(int cmsDelay, |
| 337 uint32_t tstamp, | 360 uint32_t tstamp, |
| 338 MessageHandler* phandler, | 361 MessageHandler* phandler, |
| 339 uint32_t id, | 362 uint32_t id, |
| 340 MessageData* pdata) { | 363 MessageData* pdata) { |
| 341 if (fStop_) | 364 if (fStop_) |
| 342 return; | 365 return; |
| 343 | 366 |
| 344 // Keep thread safe | 367 // Keep thread safe |
| 345 // Add to the priority queue. Gets sorted soonest first. | 368 // Add to the priority queue. Gets sorted soonest first. |
| 346 // Signal for the multiplexer to return. | 369 // Signal for the multiplexer to return. |
| 347 | 370 |
| 348 CritScope cs(&crit_); | 371 { |
| 349 Message msg; | 372 CritScope cs(&crit_); |
| 350 msg.phandler = phandler; | 373 Message msg; |
| 351 msg.message_id = id; | 374 msg.phandler = phandler; |
| 352 msg.pdata = pdata; | 375 msg.message_id = id; |
| 353 DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg); | 376 msg.pdata = pdata; |
| 354 dmsgq_.push(dmsg); | 377 DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg); |
| 355 // If this message queue processes 1 message every millisecond for 50 days, | 378 dmsgq_.push(dmsg); |
| 356 // we will wrap this number. Even then, only messages with identical times | 379 // If this message queue processes 1 message every millisecond for 50 days, |
| 357 // will be misordered, and then only briefly. This is probably ok. | 380 // we will wrap this number. Even then, only messages with identical times |
| 358 VERIFY(0 != ++dmsgq_next_num_); | 381 // will be misordered, and then only briefly. This is probably ok. |
| 359 ss_->WakeUp(); | 382 VERIFY(0 != ++dmsgq_next_num_); |
| 383 } |
| 384 WakeUpSocketServer(); |
| 360 } | 385 } |
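The 50-day figure in the comment checks out: at one post per millisecond, 50 days is 50 × 24 × 3600 × 1000 = 4,320,000,000 increments, just past the 2^32 values a 32-bit counter can represent (assuming `dmsgq_next_num_` is a 32-bit unsigned integer, as the comment implies). A compile-time check of that arithmetic:

```cpp
// Quick check of the wrap-around estimate in the comment above
// (assumes dmsgq_next_num_ is a 32-bit unsigned counter).
#include <cstdint>

static_assert(50ULL * 24 * 3600 * 1000 > UINT32_MAX,
              "50 days at 1 msg/ms overflows a 32-bit counter");
static_assert(49ULL * 24 * 3600 * 1000 < UINT32_MAX,
              "...while 49 days does not, so ~50 days is the right ballpark");

int main() { return 0; }
```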
| 361 | 386 |
| 362 int MessageQueue::GetDelay() { | 387 int MessageQueue::GetDelay() { |
| 363 CritScope cs(&crit_); | 388 CritScope cs(&crit_); |
| 364 | 389 |
| 365 if (!msgq_.empty()) | 390 if (!msgq_.empty()) |
| 366 return 0; | 391 return 0; |
| 367 | 392 |
| 368 if (!dmsgq_.empty()) { | 393 if (!dmsgq_.empty()) { |
| 369 int delay = TimeUntil(dmsgq_.top().msTrigger_); | 394 int delay = TimeUntil(dmsgq_.top().msTrigger_); |
| (...skipping 53 matching lines...) |
| 423 } | 448 } |
| 424 dmsgq_.container().erase(new_end, dmsgq_.container().end()); | 449 dmsgq_.container().erase(new_end, dmsgq_.container().end()); |
| 425 dmsgq_.reheap(); | 450 dmsgq_.reheap(); |
| 426 } | 451 } |
| 427 | 452 |
| 428 void MessageQueue::Dispatch(Message *pmsg) { | 453 void MessageQueue::Dispatch(Message *pmsg) { |
| 429 pmsg->phandler->OnMessage(pmsg); | 454 pmsg->phandler->OnMessage(pmsg); |
| 430 } | 455 } |
| 431 | 456 |
| 432 } // namespace rtc | 457 } // namespace rtc |