OLD | NEW |
---|---|
1 /* | 1 /* |
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. | 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. |
3 * | 3 * |
4 * Use of this source code is governed by a BSD-style license | 4 * Use of this source code is governed by a BSD-style license |
5 * that can be found in the LICENSE file in the root of the source | 5 * that can be found in the LICENSE file in the root of the source |
6 * tree. An additional intellectual property rights grant can be found | 6 * tree. An additional intellectual property rights grant can be found |
7 * in the file PATENTS. All contributing project authors may | 7 * in the file PATENTS. All contributing project authors may |
8 * be found in the AUTHORS file in the root of the source tree. | 8 * be found in the AUTHORS file in the root of the source tree. |
9 */ | 9 */ |
10 #include <algorithm> | 10 #include <algorithm> |
(...skipping 123 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
134 // we can't synchronously wait for queues_not_done to go to 0; we need to | 134 // we can't synchronously wait for queues_not_done to go to 0; we need to |
135 // process messages as well. | 135 // process messages as well. |
136 while (AtomicOps::AcquireLoad(&queues_not_done) > 0) { | 136 while (AtomicOps::AcquireLoad(&queues_not_done) > 0) { |
137 rtc::Thread::Current()->ProcessMessages(0); | 137 rtc::Thread::Current()->ProcessMessages(0); |
138 } | 138 } |
139 } | 139 } |
140 | 140 |
141 //------------------------------------------------------------------ | 141 //------------------------------------------------------------------ |
142 // MessageQueue | 142 // MessageQueue |
143 MessageQueue::MessageQueue(SocketServer* ss, bool init_queue) | 143 MessageQueue::MessageQueue(SocketServer* ss, bool init_queue) |
144 : fStop_(false), fPeekKeep_(false), | 144 : fPeekKeep_(false), |
145 dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false), ss_(ss) { | 145 dmsgq_next_num_(0), |
146 fInitialized_(false), | |
147 fDestroyed_(false), | |
148 stop_(false), | |
Taylor Brandstetter
2016/07/07 22:01:47
Since it's an int now, should probably initialize
andresp
2016/07/08 10:05:37
Done.
| |
149 ss_(ss) { | |
146 RTC_DCHECK(ss); | 150 RTC_DCHECK(ss); |
147 // Currently, MessageQueue holds a socket server, and is the base class for | 151 // Currently, MessageQueue holds a socket server, and is the base class for |
148 // Thread. It seems like it makes more sense for Thread to hold the socket | 152 // Thread. It seems like it makes more sense for Thread to hold the socket |
149 // server, and provide it to the MessageQueue, since the Thread controls | 153 // server, and provide it to the MessageQueue, since the Thread controls |
150 // the I/O model, and MQ is agnostic to those details. Anyway, this causes | 154 // the I/O model, and MQ is agnostic to those details. Anyway, this causes |
151 // messagequeue_unittest to depend on network libraries... yuck. | 155 // messagequeue_unittest to depend on network libraries... yuck. |
152 ss_->SetMessageQueue(this); | 156 ss_->SetMessageQueue(this); |
153 if (init_queue) { | 157 if (init_queue) { |
154 DoInit(); | 158 DoInit(); |
155 } | 159 } |
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
207 ss_ = ss ? ss : own_ss_.get(); | 211 ss_ = ss ? ss : own_ss_.get(); |
208 ss_->SetMessageQueue(this); | 212 ss_->SetMessageQueue(this); |
209 } | 213 } |
210 | 214 |
211 void MessageQueue::WakeUpSocketServer() { | 215 void MessageQueue::WakeUpSocketServer() { |
212 SharedScope ss(&ss_lock_); | 216 SharedScope ss(&ss_lock_); |
213 ss_->WakeUp(); | 217 ss_->WakeUp(); |
214 } | 218 } |
215 | 219 |
216 void MessageQueue::Quit() { | 220 void MessageQueue::Quit() { |
217 fStop_ = true; | 221 AtomicOps::ReleaseStore(&stop_, 1); |
218 WakeUpSocketServer(); | 222 WakeUpSocketServer(); |
219 } | 223 } |
220 | 224 |
221 bool MessageQueue::IsQuitting() { | 225 bool MessageQueue::IsQuitting() { |
222 return fStop_; | 226 return AtomicOps::AcquireLoad(&stop_) != 0; |
223 } | 227 } |
224 | 228 |
225 void MessageQueue::Restart() { | 229 void MessageQueue::Restart() { |
226 fStop_ = false; | 230 AtomicOps::ReleaseStore(&stop_, 0); |
227 } | 231 } |
228 | 232 |
229 bool MessageQueue::Peek(Message *pmsg, int cmsWait) { | 233 bool MessageQueue::Peek(Message *pmsg, int cmsWait) { |
230 if (fPeekKeep_) { | 234 if (fPeekKeep_) { |
231 *pmsg = msgPeek_; | 235 *pmsg = msgPeek_; |
232 return true; | 236 return true; |
233 } | 237 } |
234 if (!Get(pmsg, cmsWait)) | 238 if (!Get(pmsg, cmsWait)) |
235 return false; | 239 return false; |
236 msgPeek_ = *pmsg; | 240 msgPeek_ = *pmsg; |
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
300 // If this was a dispose message, delete it and skip it. | 304 // If this was a dispose message, delete it and skip it. |
301 if (MQID_DISPOSE == pmsg->message_id) { | 305 if (MQID_DISPOSE == pmsg->message_id) { |
302 ASSERT(NULL == pmsg->phandler); | 306 ASSERT(NULL == pmsg->phandler); |
303 delete pmsg->pdata; | 307 delete pmsg->pdata; |
304 *pmsg = Message(); | 308 *pmsg = Message(); |
305 continue; | 309 continue; |
306 } | 310 } |
307 return true; | 311 return true; |
308 } | 312 } |
309 | 313 |
310 if (fStop_) | 314 if (IsQuitting()) |
311 break; | 315 break; |
312 | 316 |
313 // Which is shorter, the delay wait or the asked wait? | 317 // Which is shorter, the delay wait or the asked wait? |
314 | 318 |
315 int64_t cmsNext; | 319 int64_t cmsNext; |
316 if (cmsWait == kForever) { | 320 if (cmsWait == kForever) { |
317 cmsNext = cmsDelayNext; | 321 cmsNext = cmsDelayNext; |
318 } else { | 322 } else { |
319 cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed); | 323 cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed); |
320 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) | 324 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) |
(...skipping 20 matching lines...) Expand all Loading... | |
341 } | 345 } |
342 | 346 |
343 void MessageQueue::ReceiveSends() { | 347 void MessageQueue::ReceiveSends() { |
344 } | 348 } |
345 | 349 |
346 void MessageQueue::Post(const Location& posted_from, | 350 void MessageQueue::Post(const Location& posted_from, |
347 MessageHandler* phandler, | 351 MessageHandler* phandler, |
348 uint32_t id, | 352 uint32_t id, |
349 MessageData* pdata, | 353 MessageData* pdata, |
350 bool time_sensitive) { | 354 bool time_sensitive) { |
351 if (fStop_) | 355 if (IsQuitting()) |
352 return; | 356 return; |
353 | 357 |
354 // Keep thread safe | 358 // Keep thread safe |
355 // Add the message to the end of the queue | 359 // Add the message to the end of the queue |
356 // Signal for the multiplexer to return | 360 // Signal for the multiplexer to return |
357 | 361 |
358 { | 362 { |
359 CritScope cs(&crit_); | 363 CritScope cs(&crit_); |
360 Message msg; | 364 Message msg; |
361 msg.posted_from = posted_from; | 365 msg.posted_from = posted_from; |
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
397 return DoDelayPost(posted_from, TimeUntil(tstamp), tstamp, phandler, id, | 401 return DoDelayPost(posted_from, TimeUntil(tstamp), tstamp, phandler, id, |
398 pdata); | 402 pdata); |
399 } | 403 } |
400 | 404 |
401 void MessageQueue::DoDelayPost(const Location& posted_from, | 405 void MessageQueue::DoDelayPost(const Location& posted_from, |
402 int64_t cmsDelay, | 406 int64_t cmsDelay, |
403 int64_t tstamp, | 407 int64_t tstamp, |
404 MessageHandler* phandler, | 408 MessageHandler* phandler, |
405 uint32_t id, | 409 uint32_t id, |
406 MessageData* pdata) { | 410 MessageData* pdata) { |
407 if (fStop_) { | 411 if (IsQuitting()) { |
408 return; | 412 return; |
409 } | 413 } |
410 | 414 |
411 // Keep thread safe | 415 // Keep thread safe |
412 // Add to the priority queue. Gets sorted soonest first. | 416 // Add to the priority queue. Gets sorted soonest first. |
413 // Signal for the multiplexer to return. | 417 // Signal for the multiplexer to return. |
414 | 418 |
415 { | 419 { |
416 CritScope cs(&crit_); | 420 CritScope cs(&crit_); |
417 Message msg; | 421 Message msg; |
(...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
503 pmsg->phandler->OnMessage(pmsg); | 507 pmsg->phandler->OnMessage(pmsg); |
504 int64_t end_time = TimeMillis(); | 508 int64_t end_time = TimeMillis(); |
505 int64_t diff = TimeDiff(end_time, start_time); | 509 int64_t diff = TimeDiff(end_time, start_time); |
506 if (diff >= kSlowDispatchLoggingThreshold) { | 510 if (diff >= kSlowDispatchLoggingThreshold) { |
507 LOG(LS_INFO) << "Message took " << diff << "ms to dispatch. Posted from: " | 511 LOG(LS_INFO) << "Message took " << diff << "ms to dispatch. Posted from: " |
508 << pmsg->posted_from.ToString(); | 512 << pmsg->posted_from.ToString(); |
509 } | 513 } |
510 } | 514 } |
511 | 515 |
512 } // namespace rtc | 516 } // namespace rtc |
OLD | NEW |