OLD | NEW |
1 /* | 1 /* |
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. | 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. |
3 * | 3 * |
4 * Use of this source code is governed by a BSD-style license | 4 * Use of this source code is governed by a BSD-style license |
5 * that can be found in the LICENSE file in the root of the source | 5 * that can be found in the LICENSE file in the root of the source |
6 * tree. An additional intellectual property rights grant can be found | 6 * tree. An additional intellectual property rights grant can be found |
7 * in the file PATENTS. All contributing project authors may | 7 * in the file PATENTS. All contributing project authors may |
8 * be found in the AUTHORS file in the root of the source tree. | 8 * be found in the AUTHORS file in the root of the source tree. |
9 */ | 9 */ |
10 #include <algorithm> | 10 #include <algorithm> |
(...skipping 132 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
143 // we can't synchronously wait for queues_not_done to go to 0; we need to | 143 // we can't synchronously wait for queues_not_done to go to 0; we need to |
144 // process messages as well. | 144 // process messages as well. |
145 while (AtomicOps::AcquireLoad(&queues_not_done) > 0) { | 145 while (AtomicOps::AcquireLoad(&queues_not_done) > 0) { |
146 rtc::Thread::Current()->ProcessMessages(0); | 146 rtc::Thread::Current()->ProcessMessages(0); |
147 } | 147 } |
148 } | 148 } |
149 | 149 |
150 //------------------------------------------------------------------ | 150 //------------------------------------------------------------------ |
151 // MessageQueue | 151 // MessageQueue |
152 MessageQueue::MessageQueue(SocketServer* ss, bool init_queue) | 152 MessageQueue::MessageQueue(SocketServer* ss, bool init_queue) |
153 : fStop_(false), fPeekKeep_(false), | 153 : fPeekKeep_(false), |
154 dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false), ss_(ss) { | 154 dmsgq_next_num_(0), |
| 155 fInitialized_(false), |
| 156 fDestroyed_(false), |
| 157 stop_(0), |
| 158 ss_(ss) { |
155 RTC_DCHECK(ss); | 159 RTC_DCHECK(ss); |
156 // Currently, MessageQueue holds a socket server, and is the base class for | 160 // Currently, MessageQueue holds a socket server, and is the base class for |
157 // Thread. It seems like it makes more sense for Thread to hold the socket | 161 // Thread. It seems like it makes more sense for Thread to hold the socket |
158 // server, and provide it to the MessageQueue, since the Thread controls | 162 // server, and provide it to the MessageQueue, since the Thread controls |
159 // the I/O model, and MQ is agnostic to those details. Anyway, this causes | 163 // the I/O model, and MQ is agnostic to those details. Anyway, this causes |
160 // messagequeue_unittest to depend on network libraries... yuck. | 164 // messagequeue_unittest to depend on network libraries... yuck. |
161 ss_->SetMessageQueue(this); | 165 ss_->SetMessageQueue(this); |
162 if (init_queue) { | 166 if (init_queue) { |
163 DoInit(); | 167 DoInit(); |
164 } | 168 } |
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
216 ss_ = ss ? ss : own_ss_.get(); | 220 ss_ = ss ? ss : own_ss_.get(); |
217 ss_->SetMessageQueue(this); | 221 ss_->SetMessageQueue(this); |
218 } | 222 } |
219 | 223 |
220 void MessageQueue::WakeUpSocketServer() { | 224 void MessageQueue::WakeUpSocketServer() { |
221 SharedScope ss(&ss_lock_); | 225 SharedScope ss(&ss_lock_); |
222 ss_->WakeUp(); | 226 ss_->WakeUp(); |
223 } | 227 } |
224 | 228 |
225 void MessageQueue::Quit() { | 229 void MessageQueue::Quit() { |
226 fStop_ = true; | 230 AtomicOps::ReleaseStore(&stop_, 1); |
227 WakeUpSocketServer(); | 231 WakeUpSocketServer(); |
228 } | 232 } |
229 | 233 |
230 bool MessageQueue::IsQuitting() { | 234 bool MessageQueue::IsQuitting() { |
231 return fStop_; | 235 return AtomicOps::AcquireLoad(&stop_) != 0; |
232 } | 236 } |
233 | 237 |
234 void MessageQueue::Restart() { | 238 void MessageQueue::Restart() { |
235 fStop_ = false; | 239 AtomicOps::ReleaseStore(&stop_, 0); |
236 } | 240 } |
237 | 241 |
238 bool MessageQueue::Peek(Message *pmsg, int cmsWait) { | 242 bool MessageQueue::Peek(Message *pmsg, int cmsWait) { |
239 if (fPeekKeep_) { | 243 if (fPeekKeep_) { |
240 *pmsg = msgPeek_; | 244 *pmsg = msgPeek_; |
241 return true; | 245 return true; |
242 } | 246 } |
243 if (!Get(pmsg, cmsWait)) | 247 if (!Get(pmsg, cmsWait)) |
244 return false; | 248 return false; |
245 msgPeek_ = *pmsg; | 249 msgPeek_ = *pmsg; |
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
309 // If this was a dispose message, delete it and skip it. | 313 // If this was a dispose message, delete it and skip it. |
310 if (MQID_DISPOSE == pmsg->message_id) { | 314 if (MQID_DISPOSE == pmsg->message_id) { |
311 ASSERT(NULL == pmsg->phandler); | 315 ASSERT(NULL == pmsg->phandler); |
312 delete pmsg->pdata; | 316 delete pmsg->pdata; |
313 *pmsg = Message(); | 317 *pmsg = Message(); |
314 continue; | 318 continue; |
315 } | 319 } |
316 return true; | 320 return true; |
317 } | 321 } |
318 | 322 |
319 if (fStop_) | 323 if (IsQuitting()) |
320 break; | 324 break; |
321 | 325 |
322 // Which is shorter, the delay wait or the asked wait? | 326 // Which is shorter, the delay wait or the asked wait? |
323 | 327 |
324 int64_t cmsNext; | 328 int64_t cmsNext; |
325 if (cmsWait == kForever) { | 329 if (cmsWait == kForever) { |
326 cmsNext = cmsDelayNext; | 330 cmsNext = cmsDelayNext; |
327 } else { | 331 } else { |
328 cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed); | 332 cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed); |
329 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) | 333 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) |
(...skipping 20 matching lines...) Expand all Loading... |
350 } | 354 } |
351 | 355 |
352 void MessageQueue::ReceiveSends() { | 356 void MessageQueue::ReceiveSends() { |
353 } | 357 } |
354 | 358 |
355 void MessageQueue::Post(const Location& posted_from, | 359 void MessageQueue::Post(const Location& posted_from, |
356 MessageHandler* phandler, | 360 MessageHandler* phandler, |
357 uint32_t id, | 361 uint32_t id, |
358 MessageData* pdata, | 362 MessageData* pdata, |
359 bool time_sensitive) { | 363 bool time_sensitive) { |
360 if (fStop_) | 364 if (IsQuitting()) |
361 return; | 365 return; |
362 | 366 |
363 // Keep thread safe | 367 // Keep thread safe |
364 // Add the message to the end of the queue | 368 // Add the message to the end of the queue |
365 // Signal for the multiplexer to return | 369 // Signal for the multiplexer to return |
366 | 370 |
367 { | 371 { |
368 CritScope cs(&crit_); | 372 CritScope cs(&crit_); |
369 Message msg; | 373 Message msg; |
370 msg.posted_from = posted_from; | 374 msg.posted_from = posted_from; |
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
406 return DoDelayPost(posted_from, TimeUntil(tstamp), tstamp, phandler, id, | 410 return DoDelayPost(posted_from, TimeUntil(tstamp), tstamp, phandler, id, |
407 pdata); | 411 pdata); |
408 } | 412 } |
409 | 413 |
410 void MessageQueue::DoDelayPost(const Location& posted_from, | 414 void MessageQueue::DoDelayPost(const Location& posted_from, |
411 int64_t cmsDelay, | 415 int64_t cmsDelay, |
412 int64_t tstamp, | 416 int64_t tstamp, |
413 MessageHandler* phandler, | 417 MessageHandler* phandler, |
414 uint32_t id, | 418 uint32_t id, |
415 MessageData* pdata) { | 419 MessageData* pdata) { |
416 if (fStop_) { | 420 if (IsQuitting()) { |
417 return; | 421 return; |
418 } | 422 } |
419 | 423 |
420 // Keep thread safe | 424 // Keep thread safe |
421 // Add to the priority queue. Gets sorted soonest first. | 425 // Add to the priority queue. Gets sorted soonest first. |
422 // Signal for the multiplexer to return. | 426 // Signal for the multiplexer to return. |
423 | 427 |
424 { | 428 { |
425 CritScope cs(&crit_); | 429 CritScope cs(&crit_); |
426 Message msg; | 430 Message msg; |
(...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
512 pmsg->phandler->OnMessage(pmsg); | 516 pmsg->phandler->OnMessage(pmsg); |
513 int64_t end_time = TimeMillis(); | 517 int64_t end_time = TimeMillis(); |
514 int64_t diff = TimeDiff(end_time, start_time); | 518 int64_t diff = TimeDiff(end_time, start_time); |
515 if (diff >= kSlowDispatchLoggingThreshold) { | 519 if (diff >= kSlowDispatchLoggingThreshold) { |
516 LOG(LS_INFO) << "Message took " << diff << "ms to dispatch. Posted from: " | 520 LOG(LS_INFO) << "Message took " << diff << "ms to dispatch. Posted from: " |
517 << pmsg->posted_from.ToString(); | 521 << pmsg->posted_from.ToString(); |
518 } | 522 } |
519 } | 523 } |
520 | 524 |
521 } // namespace rtc | 525 } // namespace rtc |
OLD | NEW |