Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(10)

Side by Side Diff: webrtc/base/messagequeue.cc

Issue 2023193002: Protect MessageQueue stop field with a critical section to avoid data races. (Closed) Base URL: https://chromium.googlesource.com/external/webrtc.git@master
Patch Set: Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « webrtc/base/messagequeue.h ('k') | webrtc/base/thread.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 /* 1 /*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 * 3 *
4 * Use of this source code is governed by a BSD-style license 4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source 5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found 6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may 7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree. 8 * be found in the AUTHORS file in the root of the source tree.
9 */ 9 */
10 #include <algorithm> 10 #include <algorithm>
(...skipping 115 matching lines...) Expand 10 before | Expand all | Expand 10 after
126 } 126 }
127 } 127 }
128 128
129 void MessageQueueManager::OnMessage(Message* pmsg) { 129 void MessageQueueManager::OnMessage(Message* pmsg) {
130 RTC_DCHECK(pmsg->message_id == MSG_WAKE_MESSAGE_QUEUE); 130 RTC_DCHECK(pmsg->message_id == MSG_WAKE_MESSAGE_QUEUE);
131 } 131 }
132 132
133 //------------------------------------------------------------------ 133 //------------------------------------------------------------------
134 // MessageQueue 134 // MessageQueue
135 MessageQueue::MessageQueue(SocketServer* ss, bool init_queue) 135 MessageQueue::MessageQueue(SocketServer* ss, bool init_queue)
136 : fStop_(false), fPeekKeep_(false), 136 : fPeekKeep_(false),
137 dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false), ss_(ss) { 137 dmsgq_next_num_(0),
138 fInitialized_(false),
139 fDestroyed_(false),
140 stop_(false),
141 ss_(ss) {
138 RTC_DCHECK(ss); 142 RTC_DCHECK(ss);
139 // Currently, MessageQueue holds a socket server, and is the base class for 143 // Currently, MessageQueue holds a socket server, and is the base class for
140 // Thread. It seems like it makes more sense for Thread to hold the socket 144 // Thread. It seems like it makes more sense for Thread to hold the socket
141 // server, and provide it to the MessageQueue, since the Thread controls 145 // server, and provide it to the MessageQueue, since the Thread controls
142 // the I/O model, and MQ is agnostic to those details. Anyway, this causes 146 // the I/O model, and MQ is agnostic to those details. Anyway, this causes
143 // messagequeue_unittest to depend on network libraries... yuck. 147 // messagequeue_unittest to depend on network libraries... yuck.
144 ss_->SetMessageQueue(this); 148 ss_->SetMessageQueue(this);
145 if (init_queue) { 149 if (init_queue) {
146 DoInit(); 150 DoInit();
147 } 151 }
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after
199 ss_ = ss ? ss : own_ss_.get(); 203 ss_ = ss ? ss : own_ss_.get();
200 ss_->SetMessageQueue(this); 204 ss_->SetMessageQueue(this);
201 } 205 }
202 206
203 void MessageQueue::WakeUpSocketServer() { 207 void MessageQueue::WakeUpSocketServer() {
204 SharedScope ss(&ss_lock_); 208 SharedScope ss(&ss_lock_);
205 ss_->WakeUp(); 209 ss_->WakeUp();
206 } 210 }
207 211
208 void MessageQueue::Quit() { 212 void MessageQueue::Quit() {
209 fStop_ = true; 213 {
214 CritScope cs(&stop_crit_);
215 stop_ = true;
pthatcher1 2016/06/01 15:29:14 Might as well rename "stop_" to "quitting_".
216 }
210 WakeUpSocketServer(); 217 WakeUpSocketServer();
211 } 218 }
212 219
213 bool MessageQueue::IsQuitting() { 220 bool MessageQueue::IsQuitting() {
214 return fStop_; 221 CritScope cs(&stop_crit_);
pthatcher1 2016/06/01 15:29:14 Should we use an std::atomic here once they are al
tommi (sloooow) - chröme 2016/06/01 15:53:29 We do have alternatives to std::atomic that we can
222 return stop_;
215 } 223 }
216 224
217 void MessageQueue::Restart() { 225 void MessageQueue::Restart() {
218 fStop_ = false; 226 CritScope cs(&stop_crit_);
227 stop_ = false;
219 } 228 }
220 229
221 bool MessageQueue::Peek(Message *pmsg, int cmsWait) { 230 bool MessageQueue::Peek(Message *pmsg, int cmsWait) {
222 if (fPeekKeep_) { 231 if (fPeekKeep_) {
223 *pmsg = msgPeek_; 232 *pmsg = msgPeek_;
224 return true; 233 return true;
225 } 234 }
226 if (!Get(pmsg, cmsWait)) 235 if (!Get(pmsg, cmsWait))
227 return false; 236 return false;
228 msgPeek_ = *pmsg; 237 msgPeek_ = *pmsg;
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after
292 // If this was a dispose message, delete it and skip it. 301 // If this was a dispose message, delete it and skip it.
293 if (MQID_DISPOSE == pmsg->message_id) { 302 if (MQID_DISPOSE == pmsg->message_id) {
294 ASSERT(NULL == pmsg->phandler); 303 ASSERT(NULL == pmsg->phandler);
295 delete pmsg->pdata; 304 delete pmsg->pdata;
296 *pmsg = Message(); 305 *pmsg = Message();
297 continue; 306 continue;
298 } 307 }
299 return true; 308 return true;
300 } 309 }
301 310
302 if (fStop_) 311 if (IsQuitting())
303 break; 312 break;
304 313
305 // Which is shorter, the delay wait or the asked wait? 314 // Which is shorter, the delay wait or the asked wait?
306 315
307 int64_t cmsNext; 316 int64_t cmsNext;
308 if (cmsWait == kForever) { 317 if (cmsWait == kForever) {
309 cmsNext = cmsDelayNext; 318 cmsNext = cmsDelayNext;
310 } else { 319 } else {
311 cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed); 320 cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed);
312 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) 321 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
(...skipping 19 matching lines...) Expand all
332 return false; 341 return false;
333 } 342 }
334 343
335 void MessageQueue::ReceiveSends() { 344 void MessageQueue::ReceiveSends() {
336 } 345 }
337 346
338 void MessageQueue::Post(MessageHandler* phandler, 347 void MessageQueue::Post(MessageHandler* phandler,
339 uint32_t id, 348 uint32_t id,
340 MessageData* pdata, 349 MessageData* pdata,
341 bool time_sensitive) { 350 bool time_sensitive) {
342 if (fStop_) 351 if (IsQuitting())
343 return; 352 return;
344 353
345 // Keep thread safe 354 // Keep thread safe
346 // Add the message to the end of the queue 355 // Add the message to the end of the queue
347 // Signal for the multiplexer to return 356 // Signal for the multiplexer to return
348 357
349 { 358 {
350 CritScope cs(&crit_); 359 CritScope cs(&crit_);
351 Message msg; 360 Message msg;
352 msg.phandler = phandler; 361 msg.phandler = phandler;
(...skipping 28 matching lines...) Expand all
381 uint32_t id, 390 uint32_t id,
382 MessageData* pdata) { 391 MessageData* pdata) {
383 return DoDelayPost(TimeUntil(tstamp), tstamp, phandler, id, pdata); 392 return DoDelayPost(TimeUntil(tstamp), tstamp, phandler, id, pdata);
384 } 393 }
385 394
386 void MessageQueue::DoDelayPost(int64_t cmsDelay, 395 void MessageQueue::DoDelayPost(int64_t cmsDelay,
387 int64_t tstamp, 396 int64_t tstamp,
388 MessageHandler* phandler, 397 MessageHandler* phandler,
389 uint32_t id, 398 uint32_t id,
390 MessageData* pdata) { 399 MessageData* pdata) {
391 if (fStop_) { 400 if (IsQuitting()) {
392 return; 401 return;
393 } 402 }
394 403
395 // Keep thread safe 404 // Keep thread safe
396 // Add to the priority queue. Gets sorted soonest first. 405 // Add to the priority queue. Gets sorted soonest first.
397 // Signal for the multiplexer to return. 406 // Signal for the multiplexer to return.
398 407
399 { 408 {
400 CritScope cs(&crit_); 409 CritScope cs(&crit_);
401 Message msg; 410 Message msg;
(...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after
477 dmsgq_.container().erase(new_end, dmsgq_.container().end()); 486 dmsgq_.container().erase(new_end, dmsgq_.container().end());
478 dmsgq_.reheap(); 487 dmsgq_.reheap();
479 } 488 }
480 489
481 void MessageQueue::Dispatch(Message *pmsg) { 490 void MessageQueue::Dispatch(Message *pmsg) {
482 TRACE_EVENT0("webrtc", "MessageQueue::Dispatch"); 491 TRACE_EVENT0("webrtc", "MessageQueue::Dispatch");
483 pmsg->phandler->OnMessage(pmsg); 492 pmsg->phandler->OnMessage(pmsg);
484 } 493 }
485 494
486 } // namespace rtc 495 } // namespace rtc
OLDNEW
« no previous file with comments | « webrtc/base/messagequeue.h ('k') | webrtc/base/thread.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698