| OLD | NEW |
| 1 /* | 1 /* |
| 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. | 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. |
| 3 * | 3 * |
| 4 * Use of this source code is governed by a BSD-style license | 4 * Use of this source code is governed by a BSD-style license |
| 5 * that can be found in the LICENSE file in the root of the source | 5 * that can be found in the LICENSE file in the root of the source |
| 6 * tree. An additional intellectual property rights grant can be found | 6 * tree. An additional intellectual property rights grant can be found |
| 7 * in the file PATENTS. All contributing project authors may | 7 * in the file PATENTS. All contributing project authors may |
| 8 * be found in the AUTHORS file in the root of the source tree. | 8 * be found in the AUTHORS file in the root of the source tree. |
| 9 */ | 9 */ |
| 10 | 10 |
| (...skipping 99 matching lines...) |
| 110 CritScope cs(&crit_); | 110 CritScope cs(&crit_); |
| 111 std::vector<MessageQueue *>::iterator iter; | 111 std::vector<MessageQueue *>::iterator iter; |
| 112 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++) | 112 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++) |
| 113 (*iter)->Clear(handler); | 113 (*iter)->Clear(handler); |
| 114 } | 114 } |
| 115 | 115 |
| 116 //------------------------------------------------------------------ | 116 //------------------------------------------------------------------ |
| 117 // MessageQueue | 117 // MessageQueue |
| 118 | 118 |
| 119 MessageQueue::MessageQueue(SocketServer* ss, bool init_queue) | 119 MessageQueue::MessageQueue(SocketServer* ss, bool init_queue) |
| 120 : fStop_(false), fPeekKeep_(false), | 120 : ss_(ss), fStop_(false), fPeekKeep_(false), |
| 121 dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false), ss_(ss) { | 121 dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false) { |
| 122 if (!ss_) { | 122 if (!ss_) { |
| 123 // Currently, MessageQueue holds a socket server, and is the base class for | 123 // Currently, MessageQueue holds a socket server, and is the base class for |
| 124 // Thread. It seems like it makes more sense for Thread to hold the socket | 124 // Thread. It seems like it makes more sense for Thread to hold the socket |
| 125 // server, and provide it to the MessageQueue, since the Thread controls | 125 // server, and provide it to the MessageQueue, since the Thread controls |
| 126 // the I/O model, and MQ is agnostic to those details. Anyway, this causes | 126 // the I/O model, and MQ is agnostic to those details. Anyway, this causes |
| 127 // messagequeue_unittest to depend on network libraries... yuck. | 127 // messagequeue_unittest to depend on network libraries... yuck. |
| 128 default_ss_.reset(new DefaultSocketServer()); | 128 default_ss_.reset(new DefaultSocketServer()); |
| 129 ss_ = default_ss_.get(); | 129 ss_ = default_ss_.get(); |
| 130 } | 130 } |
| 131 ss_->SetMessageQueue(this); | 131 ss_->SetMessageQueue(this); |
| (...skipping 20 matching lines...) |
| 152 return; | 152 return; |
| 153 } | 153 } |
| 154 | 154 |
| 155 fDestroyed_ = true; | 155 fDestroyed_ = true; |
| 156 // The signal is done from here to ensure | 156 // The signal is done from here to ensure |
| 157 // that it always gets called when the queue | 157 // that it always gets called when the queue |
| 158 // is going away. | 158 // is going away. |
| 159 SignalQueueDestroyed(); | 159 SignalQueueDestroyed(); |
| 160 MessageQueueManager::Remove(this); | 160 MessageQueueManager::Remove(this); |
| 161 Clear(NULL); | 161 Clear(NULL); |
| 162 | |
| 163 SharedScope ss(&ss_lock_); | |
| 164 if (ss_) { | 162 if (ss_) { |
| 165 ss_->SetMessageQueue(NULL); | 163 ss_->SetMessageQueue(NULL); |
| 166 } | 164 } |
| 167 } | 165 } |
| 168 | 166 |
| 169 SocketServer* MessageQueue::socketserver() { | |
| 170 SharedScope ss(&ss_lock_); | |
| 171 return ss_; | |
| 172 } | |
| 173 | |
| 174 void MessageQueue::set_socketserver(SocketServer* ss) { | 167 void MessageQueue::set_socketserver(SocketServer* ss) { |
| 175 // Need to lock exclusively here to prevent simultaneous modifications from | |
| 176 // other threads. Can't be a shared lock to prevent races with other reading | |
| 177 // threads. | |
| 178 // Other places that only read "ss_" can use a shared lock as simultaneous | |
| 179 // read access is allowed. | |
| 180 ExclusiveScope es(&ss_lock_); | |
| 181 ss_ = ss ? ss : default_ss_.get(); | 168 ss_ = ss ? ss : default_ss_.get(); |
| 182 ss_->SetMessageQueue(this); | 169 ss_->SetMessageQueue(this); |
| 183 } | 170 } |
| 184 | 171 |
| 185 void MessageQueue::WakeUpSocketServer() { | 172 void MessageQueue::Quit() { |
| 186 SharedScope ss(&ss_lock_); | 173 fStop_ = true; |
| 187 ss_->WakeUp(); | 174 ss_->WakeUp(); |
| 188 } | 175 } |
| 189 | 176 |
| 190 void MessageQueue::Quit() { | |
| 191 fStop_ = true; | |
| 192 WakeUpSocketServer(); | |
| 193 } | |
| 194 | |
| 195 bool MessageQueue::IsQuitting() { | 177 bool MessageQueue::IsQuitting() { |
| 196 return fStop_; | 178 return fStop_; |
| 197 } | 179 } |
| 198 | 180 |
| 199 void MessageQueue::Restart() { | 181 void MessageQueue::Restart() { |
| 200 fStop_ = false; | 182 fStop_ = false; |
| 201 } | 183 } |
| 202 | 184 |
| 203 bool MessageQueue::Peek(Message *pmsg, int cmsWait) { | 185 bool MessageQueue::Peek(Message *pmsg, int cmsWait) { |
| 204 if (fPeekKeep_) { | 186 if (fPeekKeep_) { |
| (...skipping 83 matching lines...) |
| 288 | 270 |
| 289 int cmsNext; | 271 int cmsNext; |
| 290 if (cmsWait == kForever) { | 272 if (cmsWait == kForever) { |
| 291 cmsNext = cmsDelayNext; | 273 cmsNext = cmsDelayNext; |
| 292 } else { | 274 } else { |
| 293 cmsNext = std::max(0, cmsTotal - cmsElapsed); | 275 cmsNext = std::max(0, cmsTotal - cmsElapsed); |
| 294 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) | 276 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) |
| 295 cmsNext = cmsDelayNext; | 277 cmsNext = cmsDelayNext; |
| 296 } | 278 } |
| 297 | 279 |
| 298 { | 280 // Wait and multiplex in the meantime |
| 299 // Wait and multiplex in the meantime | 281 if (!ss_->Wait(cmsNext, process_io)) |
| 300 SharedScope ss(&ss_lock_); | 282 return false; |
| 301 if (!ss_->Wait(cmsNext, process_io)) | |
| 302 return false; | |
| 303 } | |
| 304 | 283 |
| 305 // If the specified timeout expired, return | 284 // If the specified timeout expired, return |
| 306 | 285 |
| 307 msCurrent = Time(); | 286 msCurrent = Time(); |
| 308 cmsElapsed = TimeDiff(msCurrent, msStart); | 287 cmsElapsed = TimeDiff(msCurrent, msStart); |
| 309 if (cmsWait != kForever) { | 288 if (cmsWait != kForever) { |
| 310 if (cmsElapsed >= cmsWait) | 289 if (cmsElapsed >= cmsWait) |
| 311 return false; | 290 return false; |
| 312 } | 291 } |
| 313 } | 292 } |
| 314 return false; | 293 return false; |
| 315 } | 294 } |
| 316 | 295 |
| 317 void MessageQueue::ReceiveSends() { | 296 void MessageQueue::ReceiveSends() { |
| 318 } | 297 } |
| 319 | 298 |
| 320 void MessageQueue::Post(MessageHandler* phandler, | 299 void MessageQueue::Post(MessageHandler* phandler, |
| 321 uint32_t id, | 300 uint32_t id, |
| 322 MessageData* pdata, | 301 MessageData* pdata, |
| 323 bool time_sensitive) { | 302 bool time_sensitive) { |
| 324 if (fStop_) | 303 if (fStop_) |
| 325 return; | 304 return; |
| 326 | 305 |
| 327 // Keep thread safe | 306 // Keep thread safe |
| 328 // Add the message to the end of the queue | 307 // Add the message to the end of the queue |
| 329 // Signal for the multiplexer to return | 308 // Signal for the multiplexer to return |
| 330 | 309 |
| 331 { | 310 CritScope cs(&crit_); |
| 332 CritScope cs(&crit_); | 311 Message msg; |
| 333 Message msg; | 312 msg.phandler = phandler; |
| 334 msg.phandler = phandler; | 313 msg.message_id = id; |
| 335 msg.message_id = id; | 314 msg.pdata = pdata; |
| 336 msg.pdata = pdata; | 315 if (time_sensitive) { |
| 337 if (time_sensitive) { | 316 msg.ts_sensitive = Time() + kMaxMsgLatency; |
| 338 msg.ts_sensitive = Time() + kMaxMsgLatency; | |
| 339 } | |
| 340 msgq_.push_back(msg); | |
| 341 } | 317 } |
| 342 WakeUpSocketServer(); | 318 msgq_.push_back(msg); |
| | 319 ss_->WakeUp(); |
| 343 } | 320 } |
| 344 | 321 |
| 345 void MessageQueue::PostDelayed(int cmsDelay, | 322 void MessageQueue::PostDelayed(int cmsDelay, |
| 346 MessageHandler* phandler, | 323 MessageHandler* phandler, |
| 347 uint32_t id, | 324 uint32_t id, |
| 348 MessageData* pdata) { | 325 MessageData* pdata) { |
| 349 return DoDelayPost(cmsDelay, TimeAfter(cmsDelay), phandler, id, pdata); | 326 return DoDelayPost(cmsDelay, TimeAfter(cmsDelay), phandler, id, pdata); |
| 350 } | 327 } |
| 351 | 328 |
| 352 void MessageQueue::PostAt(uint32_t tstamp, | 329 void MessageQueue::PostAt(uint32_t tstamp, |
| 353 MessageHandler* phandler, | 330 MessageHandler* phandler, |
| 354 uint32_t id, | 331 uint32_t id, |
| 355 MessageData* pdata) { | 332 MessageData* pdata) { |
| 356 return DoDelayPost(TimeUntil(tstamp), tstamp, phandler, id, pdata); | 333 return DoDelayPost(TimeUntil(tstamp), tstamp, phandler, id, pdata); |
| 357 } | 334 } |
| 358 | 335 |
| 359 void MessageQueue::DoDelayPost(int cmsDelay, | 336 void MessageQueue::DoDelayPost(int cmsDelay, |
| 360 uint32_t tstamp, | 337 uint32_t tstamp, |
| 361 MessageHandler* phandler, | 338 MessageHandler* phandler, |
| 362 uint32_t id, | 339 uint32_t id, |
| 363 MessageData* pdata) { | 340 MessageData* pdata) { |
| 364 if (fStop_) | 341 if (fStop_) |
| 365 return; | 342 return; |
| 366 | 343 |
| 367 // Keep thread safe | 344 // Keep thread safe |
| 368 // Add to the priority queue. Gets sorted soonest first. | 345 // Add to the priority queue. Gets sorted soonest first. |
| 369 // Signal for the multiplexer to return. | 346 // Signal for the multiplexer to return. |
| 370 | 347 |
| 371 { | 348 CritScope cs(&crit_); |
| 372 CritScope cs(&crit_); | 349 Message msg; |
| 373 Message msg; | 350 msg.phandler = phandler; |
| 374 msg.phandler = phandler; | 351 msg.message_id = id; |
| 375 msg.message_id = id; | 352 msg.pdata = pdata; |
| 376 msg.pdata = pdata; | 353 DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg); |
| 377 DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg); | 354 dmsgq_.push(dmsg); |
| 378 dmsgq_.push(dmsg); | 355 // If this message queue processes 1 message every millisecond for 50 days, |
| 379 // If this message queue processes 1 message every millisecond for 50 days, | 356 // we will wrap this number. Even then, only messages with identical times |
| 380 // we will wrap this number. Even then, only messages with identical times | 357 // will be misordered, and then only briefly. This is probably ok. |
| 381 // will be misordered, and then only briefly. This is probably ok. | 358 VERIFY(0 != ++dmsgq_next_num_); |
| 382 VERIFY(0 != ++dmsgq_next_num_); | 359 ss_->WakeUp(); |
| 383 } | |
| 384 WakeUpSocketServer(); | |
| 385 } | 360 } |
| 386 | 361 |
| 387 int MessageQueue::GetDelay() { | 362 int MessageQueue::GetDelay() { |
| 388 CritScope cs(&crit_); | 363 CritScope cs(&crit_); |
| 389 | 364 |
| 390 if (!msgq_.empty()) | 365 if (!msgq_.empty()) |
| 391 return 0; | 366 return 0; |
| 392 | 367 |
| 393 if (!dmsgq_.empty()) { | 368 if (!dmsgq_.empty()) { |
| 394 int delay = TimeUntil(dmsgq_.top().msTrigger_); | 369 int delay = TimeUntil(dmsgq_.top().msTrigger_); |
| (...skipping 53 matching lines...) |
| 448 } | 423 } |
| 449 dmsgq_.container().erase(new_end, dmsgq_.container().end()); | 424 dmsgq_.container().erase(new_end, dmsgq_.container().end()); |
| 450 dmsgq_.reheap(); | 425 dmsgq_.reheap(); |
| 451 } | 426 } |
| 452 | 427 |
| 453 void MessageQueue::Dispatch(Message *pmsg) { | 428 void MessageQueue::Dispatch(Message *pmsg) { |
| 454 pmsg->phandler->OnMessage(pmsg); | 429 pmsg->phandler->OnMessage(pmsg); |
| 455 } | 430 } |
| 456 | 431 |
| 457 } // namespace rtc | 432 } // namespace rtc |
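
For context on the locking the NEW side removes: the OLD code guarded `ss_` with a reader/writer lock (`ss_lock_`), taking a shared scope on the read paths (`socketserver()`, `WakeUpSocketServer()`, the `Wait()` call in the message pump) and an exclusive scope in `set_socketserver()`. Below is a minimal standalone sketch of that pattern, using C++17 `std::shared_mutex` in place of the rtc lock and scope types; the `GuardedServer` name and `int*` payload are illustrative assumptions, not WebRTC code.

```cpp
#include <mutex>
#include <shared_mutex>

// Illustrative analog of the removed ss_lock_: many readers may hold the
// lock concurrently (like SharedScope), while replacing the pointer
// requires exclusive access (like ExclusiveScope).
class GuardedServer {
 public:
  // Mirrors set_socketserver(): exclusive lock, since the pointer changes.
  void Set(int* server) {
    std::unique_lock<std::shared_mutex> lock(mutex_);
    server_ = server;
  }

  // Mirrors socketserver() / WakeUpSocketServer(): shared lock, read-only.
  int* Get() const {
    std::shared_lock<std::shared_mutex> lock(mutex_);
    return server_;
  }

 private:
  mutable std::shared_mutex mutex_;
  int* server_ = nullptr;
};
```

The NEW side drops this guard entirely, so after this change `set_socketserver()` is only safe when callers guarantee it does not race with the queue's own use of `ss_`.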
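
The wraparound comment kept on both sides (old lines 379-381 / new lines 355-357) refers to the tie-breaking role of `dmsgq_next_num_` in the delayed-message priority queue: messages fire earliest-trigger-first, with the sequence number breaking ties so equal-time messages keep FIFO order. A sketch of that ordering under assumed names (`Delayed` and `LaterFirst` are not the actual WebRTC types):

```cpp
#include <cstdint>
#include <queue>
#include <vector>

// Illustrative stand-in for DelayedMessage: a trigger time plus a
// monotonically increasing sequence number assigned at post time.
struct Delayed {
  uint32_t trigger_ms;  // when the message becomes ready
  uint32_t seq;         // dmsgq_next_num_ at post time
};

// Comparator for std::priority_queue: returns true when a should fire
// AFTER b, so the earliest trigger (and, on ties, the earliest post)
// ends up on top of the heap.
struct LaterFirst {
  bool operator()(const Delayed& a, const Delayed& b) const {
    if (a.trigger_ms != b.trigger_ms)
      return a.trigger_ms > b.trigger_ms;
    return a.seq > b.seq;  // equal times: preserve post (FIFO) order
  }
};

using DelayedQueue =
    std::priority_queue<Delayed, std::vector<Delayed>, LaterFirst>;
```

Because the counter is 32-bit and incremented once per post, it wraps after 2^32 posts (roughly 50 days at one per millisecond), and a wrapped counter only misorders messages whose trigger times are identical, which is the risk the comment judges acceptable.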