OLD | NEW |
1 /* | 1 /* |
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. | 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. |
3 * | 3 * |
4 * Use of this source code is governed by a BSD-style license | 4 * Use of this source code is governed by a BSD-style license |
5 * that can be found in the LICENSE file in the root of the source | 5 * that can be found in the LICENSE file in the root of the source |
6 * tree. An additional intellectual property rights grant can be found | 6 * tree. An additional intellectual property rights grant can be found |
7 * in the file PATENTS. All contributing project authors may | 7 * in the file PATENTS. All contributing project authors may |
8 * be found in the AUTHORS file in the root of the source tree. | 8 * be found in the AUTHORS file in the root of the source tree. |
9 */ | 9 */ |
10 #include <algorithm> | 10 #include <algorithm> |
(...skipping 132 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
143 // we can't synchronously wait for queues_not_done to go to 0; we need to | 143 // we can't synchronously wait for queues_not_done to go to 0; we need to |
144 // process messages as well. | 144 // process messages as well. |
145 while (AtomicOps::AcquireLoad(&queues_not_done) > 0) { | 145 while (AtomicOps::AcquireLoad(&queues_not_done) > 0) { |
146 rtc::Thread::Current()->ProcessMessages(0); | 146 rtc::Thread::Current()->ProcessMessages(0); |
147 } | 147 } |
148 } | 148 } |
149 | 149 |
150 //------------------------------------------------------------------ | 150 //------------------------------------------------------------------ |
151 // MessageQueue | 151 // MessageQueue |
152 MessageQueue::MessageQueue(SocketServer* ss, bool init_queue) | 152 MessageQueue::MessageQueue(SocketServer* ss, bool init_queue) |
153 : fPeekKeep_(false), | 153 : fStop_(false), fPeekKeep_(false), |
154 dmsgq_next_num_(0), | 154 dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false), ss_(ss) { |
155 fInitialized_(false), | |
156 fDestroyed_(false), | |
157 stop_(0), | |
158 ss_(ss) { | |
159 RTC_DCHECK(ss); | 155 RTC_DCHECK(ss); |
160 // Currently, MessageQueue holds a socket server, and is the base class for | 156 // Currently, MessageQueue holds a socket server, and is the base class for |
161 // Thread. It seems like it makes more sense for Thread to hold the socket | 157 // Thread. It seems like it makes more sense for Thread to hold the socket |
162 // server, and provide it to the MessageQueue, since the Thread controls | 158 // server, and provide it to the MessageQueue, since the Thread controls |
163 // the I/O model, and MQ is agnostic to those details. Anyway, this causes | 159 // the I/O model, and MQ is agnostic to those details. Anyway, this causes |
164 // messagequeue_unittest to depend on network libraries... yuck. | 160 // messagequeue_unittest to depend on network libraries... yuck. |
165 ss_->SetMessageQueue(this); | 161 ss_->SetMessageQueue(this); |
166 if (init_queue) { | 162 if (init_queue) { |
167 DoInit(); | 163 DoInit(); |
168 } | 164 } |
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
220 ss_ = ss ? ss : own_ss_.get(); | 216 ss_ = ss ? ss : own_ss_.get(); |
221 ss_->SetMessageQueue(this); | 217 ss_->SetMessageQueue(this); |
222 } | 218 } |
223 | 219 |
224 void MessageQueue::WakeUpSocketServer() { | 220 void MessageQueue::WakeUpSocketServer() { |
225 SharedScope ss(&ss_lock_); | 221 SharedScope ss(&ss_lock_); |
226 ss_->WakeUp(); | 222 ss_->WakeUp(); |
227 } | 223 } |
228 | 224 |
229 void MessageQueue::Quit() { | 225 void MessageQueue::Quit() { |
230 AtomicOps::ReleaseStore(&stop_, 1); | 226 fStop_ = true; |
231 WakeUpSocketServer(); | 227 WakeUpSocketServer(); |
232 } | 228 } |
233 | 229 |
234 bool MessageQueue::IsQuitting() { | 230 bool MessageQueue::IsQuitting() { |
235 return AtomicOps::AcquireLoad(&stop_) != 0; | 231 return fStop_; |
236 } | 232 } |
237 | 233 |
238 void MessageQueue::Restart() { | 234 void MessageQueue::Restart() { |
239 AtomicOps::ReleaseStore(&stop_, 0); | 235 fStop_ = false; |
240 } | 236 } |
241 | 237 |
242 bool MessageQueue::Peek(Message *pmsg, int cmsWait) { | 238 bool MessageQueue::Peek(Message *pmsg, int cmsWait) { |
243 if (fPeekKeep_) { | 239 if (fPeekKeep_) { |
244 *pmsg = msgPeek_; | 240 *pmsg = msgPeek_; |
245 return true; | 241 return true; |
246 } | 242 } |
247 if (!Get(pmsg, cmsWait)) | 243 if (!Get(pmsg, cmsWait)) |
248 return false; | 244 return false; |
249 msgPeek_ = *pmsg; | 245 msgPeek_ = *pmsg; |
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
313 // If this was a dispose message, delete it and skip it. | 309 // If this was a dispose message, delete it and skip it. |
314 if (MQID_DISPOSE == pmsg->message_id) { | 310 if (MQID_DISPOSE == pmsg->message_id) { |
315 ASSERT(NULL == pmsg->phandler); | 311 ASSERT(NULL == pmsg->phandler); |
316 delete pmsg->pdata; | 312 delete pmsg->pdata; |
317 *pmsg = Message(); | 313 *pmsg = Message(); |
318 continue; | 314 continue; |
319 } | 315 } |
320 return true; | 316 return true; |
321 } | 317 } |
322 | 318 |
323 if (IsQuitting()) | 319 if (fStop_) |
324 break; | 320 break; |
325 | 321 |
326 // Which is shorter, the delay wait or the asked wait? | 322 // Which is shorter, the delay wait or the asked wait? |
327 | 323 |
328 int64_t cmsNext; | 324 int64_t cmsNext; |
329 if (cmsWait == kForever) { | 325 if (cmsWait == kForever) { |
330 cmsNext = cmsDelayNext; | 326 cmsNext = cmsDelayNext; |
331 } else { | 327 } else { |
332 cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed); | 328 cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed); |
333 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) | 329 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) |
(...skipping 20 matching lines...) Expand all Loading... |
354 } | 350 } |
355 | 351 |
356 void MessageQueue::ReceiveSends() { | 352 void MessageQueue::ReceiveSends() { |
357 } | 353 } |
358 | 354 |
359 void MessageQueue::Post(const Location& posted_from, | 355 void MessageQueue::Post(const Location& posted_from, |
360 MessageHandler* phandler, | 356 MessageHandler* phandler, |
361 uint32_t id, | 357 uint32_t id, |
362 MessageData* pdata, | 358 MessageData* pdata, |
363 bool time_sensitive) { | 359 bool time_sensitive) { |
364 if (IsQuitting()) | 360 if (fStop_) |
365 return; | 361 return; |
366 | 362 |
367 // Keep thread safe | 363 // Keep thread safe |
368 // Add the message to the end of the queue | 364 // Add the message to the end of the queue |
369 // Signal for the multiplexer to return | 365 // Signal for the multiplexer to return |
370 | 366 |
371 { | 367 { |
372 CritScope cs(&crit_); | 368 CritScope cs(&crit_); |
373 Message msg; | 369 Message msg; |
374 msg.posted_from = posted_from; | 370 msg.posted_from = posted_from; |
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
410 return DoDelayPost(posted_from, TimeUntil(tstamp), tstamp, phandler, id, | 406 return DoDelayPost(posted_from, TimeUntil(tstamp), tstamp, phandler, id, |
411 pdata); | 407 pdata); |
412 } | 408 } |
413 | 409 |
414 void MessageQueue::DoDelayPost(const Location& posted_from, | 410 void MessageQueue::DoDelayPost(const Location& posted_from, |
415 int64_t cmsDelay, | 411 int64_t cmsDelay, |
416 int64_t tstamp, | 412 int64_t tstamp, |
417 MessageHandler* phandler, | 413 MessageHandler* phandler, |
418 uint32_t id, | 414 uint32_t id, |
419 MessageData* pdata) { | 415 MessageData* pdata) { |
420 if (IsQuitting()) { | 416 if (fStop_) { |
421 return; | 417 return; |
422 } | 418 } |
423 | 419 |
424 // Keep thread safe | 420 // Keep thread safe |
425 // Add to the priority queue. Gets sorted soonest first. | 421 // Add to the priority queue. Gets sorted soonest first. |
426 // Signal for the multiplexer to return. | 422 // Signal for the multiplexer to return. |
427 | 423 |
428 { | 424 { |
429 CritScope cs(&crit_); | 425 CritScope cs(&crit_); |
430 Message msg; | 426 Message msg; |
(...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
516 pmsg->phandler->OnMessage(pmsg); | 512 pmsg->phandler->OnMessage(pmsg); |
517 int64_t end_time = TimeMillis(); | 513 int64_t end_time = TimeMillis(); |
518 int64_t diff = TimeDiff(end_time, start_time); | 514 int64_t diff = TimeDiff(end_time, start_time); |
519 if (diff >= kSlowDispatchLoggingThreshold) { | 515 if (diff >= kSlowDispatchLoggingThreshold) { |
520 LOG(LS_INFO) << "Message took " << diff << "ms to dispatch. Posted from: " | 516 LOG(LS_INFO) << "Message took " << diff << "ms to dispatch. Posted from: " |
521 << pmsg->posted_from.ToString(); | 517 << pmsg->posted_from.ToString(); |
522 } | 518 } |
523 } | 519 } |
524 | 520 |
525 } // namespace rtc | 521 } // namespace rtc |
OLD | NEW |