OLD | NEW |
---|---|
1 /* | 1 /* |
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. | 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. |
3 * | 3 * |
4 * Use of this source code is governed by a BSD-style license | 4 * Use of this source code is governed by a BSD-style license |
5 * that can be found in the LICENSE file in the root of the source | 5 * that can be found in the LICENSE file in the root of the source |
6 * tree. An additional intellectual property rights grant can be found | 6 * tree. An additional intellectual property rights grant can be found |
7 * in the file PATENTS. All contributing project authors may | 7 * in the file PATENTS. All contributing project authors may |
8 * be found in the AUTHORS file in the root of the source tree. | 8 * be found in the AUTHORS file in the root of the source tree. |
9 */ | 9 */ |
10 #include <algorithm> | 10 #include <algorithm> |
(...skipping 115 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
126 } | 126 } |
127 } | 127 } |
128 | 128 |
129 void MessageQueueManager::OnMessage(Message* pmsg) { | 129 void MessageQueueManager::OnMessage(Message* pmsg) { |
130 RTC_DCHECK(pmsg->message_id == MSG_WAKE_MESSAGE_QUEUE); | 130 RTC_DCHECK(pmsg->message_id == MSG_WAKE_MESSAGE_QUEUE); |
131 } | 131 } |
132 | 132 |
133 //------------------------------------------------------------------ | 133 //------------------------------------------------------------------ |
134 // MessageQueue | 134 // MessageQueue |
135 MessageQueue::MessageQueue(SocketServer* ss, bool init_queue) | 135 MessageQueue::MessageQueue(SocketServer* ss, bool init_queue) |
136 : fStop_(false), fPeekKeep_(false), | 136 : fPeekKeep_(false), |
137 dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false), ss_(ss) { | 137 dmsgq_next_num_(0), |
138 fInitialized_(false), | |
139 fDestroyed_(false), | |
140 stop_(false), | |
141 ss_(ss) { | |
138 RTC_DCHECK(ss); | 142 RTC_DCHECK(ss); |
139 // Currently, MessageQueue holds a socket server, and is the base class for | 143 // Currently, MessageQueue holds a socket server, and is the base class for |
140 // Thread. It seems like it makes more sense for Thread to hold the socket | 144 // Thread. It seems like it makes more sense for Thread to hold the socket |
141 // server, and provide it to the MessageQueue, since the Thread controls | 145 // server, and provide it to the MessageQueue, since the Thread controls |
142 // the I/O model, and MQ is agnostic to those details. Anyway, this causes | 146 // the I/O model, and MQ is agnostic to those details. Anyway, this causes |
143 // messagequeue_unittest to depend on network libraries... yuck. | 147 // messagequeue_unittest to depend on network libraries... yuck. |
144 ss_->SetMessageQueue(this); | 148 ss_->SetMessageQueue(this); |
145 if (init_queue) { | 149 if (init_queue) { |
146 DoInit(); | 150 DoInit(); |
147 } | 151 } |
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
199 ss_ = ss ? ss : own_ss_.get(); | 203 ss_ = ss ? ss : own_ss_.get(); |
200 ss_->SetMessageQueue(this); | 204 ss_->SetMessageQueue(this); |
201 } | 205 } |
202 | 206 |
203 void MessageQueue::WakeUpSocketServer() { | 207 void MessageQueue::WakeUpSocketServer() { |
204 SharedScope ss(&ss_lock_); | 208 SharedScope ss(&ss_lock_); |
205 ss_->WakeUp(); | 209 ss_->WakeUp(); |
206 } | 210 } |
207 | 211 |
208 void MessageQueue::Quit() { | 212 void MessageQueue::Quit() { |
209 fStop_ = true; | 213 { |
214 CritScope cs(&stop_crit_); | |
215 stop_ = true; | |
pthatcher1
2016/06/01 15:29:14
Might as well rename "stop_" to "quitting_".
| |
216 } | |
210 WakeUpSocketServer(); | 217 WakeUpSocketServer(); |
211 } | 218 } |
212 | 219 |
213 bool MessageQueue::IsQuitting() { | 220 bool MessageQueue::IsQuitting() { |
214 return fStop_; | 221 CritScope cs(&stop_crit_); |
pthatcher1
2016/06/01 15:29:14
Should we use an std::atomic here once they are al
tommi (sloooow) - chröme
2016/06/01 15:53:29
We do have alternatives to std::atomic that we can
| |
222 return stop_; | |
215 } | 223 } |
216 | 224 |
217 void MessageQueue::Restart() { | 225 void MessageQueue::Restart() { |
218 fStop_ = false; | 226 CritScope cs(&stop_crit_); |
227 stop_ = false; | |
219 } | 228 } |
220 | 229 |
221 bool MessageQueue::Peek(Message *pmsg, int cmsWait) { | 230 bool MessageQueue::Peek(Message *pmsg, int cmsWait) { |
222 if (fPeekKeep_) { | 231 if (fPeekKeep_) { |
223 *pmsg = msgPeek_; | 232 *pmsg = msgPeek_; |
224 return true; | 233 return true; |
225 } | 234 } |
226 if (!Get(pmsg, cmsWait)) | 235 if (!Get(pmsg, cmsWait)) |
227 return false; | 236 return false; |
228 msgPeek_ = *pmsg; | 237 msgPeek_ = *pmsg; |
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
292 // If this was a dispose message, delete it and skip it. | 301 // If this was a dispose message, delete it and skip it. |
293 if (MQID_DISPOSE == pmsg->message_id) { | 302 if (MQID_DISPOSE == pmsg->message_id) { |
294 ASSERT(NULL == pmsg->phandler); | 303 ASSERT(NULL == pmsg->phandler); |
295 delete pmsg->pdata; | 304 delete pmsg->pdata; |
296 *pmsg = Message(); | 305 *pmsg = Message(); |
297 continue; | 306 continue; |
298 } | 307 } |
299 return true; | 308 return true; |
300 } | 309 } |
301 | 310 |
302 if (fStop_) | 311 if (IsQuitting()) |
303 break; | 312 break; |
304 | 313 |
305 // Which is shorter, the delay wait or the asked wait? | 314 // Which is shorter, the delay wait or the asked wait? |
306 | 315 |
307 int64_t cmsNext; | 316 int64_t cmsNext; |
308 if (cmsWait == kForever) { | 317 if (cmsWait == kForever) { |
309 cmsNext = cmsDelayNext; | 318 cmsNext = cmsDelayNext; |
310 } else { | 319 } else { |
311 cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed); | 320 cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed); |
312 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) | 321 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) |
(...skipping 19 matching lines...) Expand all Loading... | |
332 return false; | 341 return false; |
333 } | 342 } |
334 | 343 |
335 void MessageQueue::ReceiveSends() { | 344 void MessageQueue::ReceiveSends() { |
336 } | 345 } |
337 | 346 |
338 void MessageQueue::Post(MessageHandler* phandler, | 347 void MessageQueue::Post(MessageHandler* phandler, |
339 uint32_t id, | 348 uint32_t id, |
340 MessageData* pdata, | 349 MessageData* pdata, |
341 bool time_sensitive) { | 350 bool time_sensitive) { |
342 if (fStop_) | 351 if (IsQuitting()) |
343 return; | 352 return; |
344 | 353 |
345 // Keep thread safe | 354 // Keep thread safe |
346 // Add the message to the end of the queue | 355 // Add the message to the end of the queue |
347 // Signal for the multiplexer to return | 356 // Signal for the multiplexer to return |
348 | 357 |
349 { | 358 { |
350 CritScope cs(&crit_); | 359 CritScope cs(&crit_); |
351 Message msg; | 360 Message msg; |
352 msg.phandler = phandler; | 361 msg.phandler = phandler; |
(...skipping 28 matching lines...) Expand all Loading... | |
381 uint32_t id, | 390 uint32_t id, |
382 MessageData* pdata) { | 391 MessageData* pdata) { |
383 return DoDelayPost(TimeUntil(tstamp), tstamp, phandler, id, pdata); | 392 return DoDelayPost(TimeUntil(tstamp), tstamp, phandler, id, pdata); |
384 } | 393 } |
385 | 394 |
386 void MessageQueue::DoDelayPost(int64_t cmsDelay, | 395 void MessageQueue::DoDelayPost(int64_t cmsDelay, |
387 int64_t tstamp, | 396 int64_t tstamp, |
388 MessageHandler* phandler, | 397 MessageHandler* phandler, |
389 uint32_t id, | 398 uint32_t id, |
390 MessageData* pdata) { | 399 MessageData* pdata) { |
391 if (fStop_) { | 400 if (IsQuitting()) { |
392 return; | 401 return; |
393 } | 402 } |
394 | 403 |
395 // Keep thread safe | 404 // Keep thread safe |
396 // Add to the priority queue. Gets sorted soonest first. | 405 // Add to the priority queue. Gets sorted soonest first. |
397 // Signal for the multiplexer to return. | 406 // Signal for the multiplexer to return. |
398 | 407 |
399 { | 408 { |
400 CritScope cs(&crit_); | 409 CritScope cs(&crit_); |
401 Message msg; | 410 Message msg; |
(...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
477 dmsgq_.container().erase(new_end, dmsgq_.container().end()); | 486 dmsgq_.container().erase(new_end, dmsgq_.container().end()); |
478 dmsgq_.reheap(); | 487 dmsgq_.reheap(); |
479 } | 488 } |
480 | 489 |
481 void MessageQueue::Dispatch(Message *pmsg) { | 490 void MessageQueue::Dispatch(Message *pmsg) { |
482 TRACE_EVENT0("webrtc", "MessageQueue::Dispatch"); | 491 TRACE_EVENT0("webrtc", "MessageQueue::Dispatch"); |
483 pmsg->phandler->OnMessage(pmsg); | 492 pmsg->phandler->OnMessage(pmsg); |
484 } | 493 } |
485 | 494 |
486 } // namespace rtc | 495 } // namespace rtc |
OLD | NEW |