Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(11)

Side by Side Diff: webrtc/base/messagequeue.cc

Issue 2023193002: Protect MessageQueue stop field with a critical section to avoid data races. (Closed) Base URL: https://chromium.googlesource.com/external/webrtc.git@master
Patch Set: initialize with 0 Created 4 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « webrtc/base/messagequeue.h ('k') | webrtc/base/thread.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 /* 1 /*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 * 3 *
4 * Use of this source code is governed by a BSD-style license 4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source 5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found 6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may 7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree. 8 * be found in the AUTHORS file in the root of the source tree.
9 */ 9 */
10 #include <algorithm> 10 #include <algorithm>
(...skipping 132 matching lines...) Expand 10 before | Expand all | Expand 10 after
143 // we can't synchronously wait for queues_not_done to go to 0; we need to 143 // we can't synchronously wait for queues_not_done to go to 0; we need to
144 // process messages as well. 144 // process messages as well.
145 while (AtomicOps::AcquireLoad(&queues_not_done) > 0) { 145 while (AtomicOps::AcquireLoad(&queues_not_done) > 0) {
146 rtc::Thread::Current()->ProcessMessages(0); 146 rtc::Thread::Current()->ProcessMessages(0);
147 } 147 }
148 } 148 }
149 149
150 //------------------------------------------------------------------ 150 //------------------------------------------------------------------
151 // MessageQueue 151 // MessageQueue
152 MessageQueue::MessageQueue(SocketServer* ss, bool init_queue) 152 MessageQueue::MessageQueue(SocketServer* ss, bool init_queue)
153 : fStop_(false), fPeekKeep_(false), 153 : fPeekKeep_(false),
154 dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false), ss_(ss) { 154 dmsgq_next_num_(0),
155 fInitialized_(false),
156 fDestroyed_(false),
157 stop_(0),
158 ss_(ss) {
155 RTC_DCHECK(ss); 159 RTC_DCHECK(ss);
156 // Currently, MessageQueue holds a socket server, and is the base class for 160 // Currently, MessageQueue holds a socket server, and is the base class for
157 // Thread. It seems like it makes more sense for Thread to hold the socket 161 // Thread. It seems like it makes more sense for Thread to hold the socket
158 // server, and provide it to the MessageQueue, since the Thread controls 162 // server, and provide it to the MessageQueue, since the Thread controls
159 // the I/O model, and MQ is agnostic to those details. Anyway, this causes 163 // the I/O model, and MQ is agnostic to those details. Anyway, this causes
160 // messagequeue_unittest to depend on network libraries... yuck. 164 // messagequeue_unittest to depend on network libraries... yuck.
161 ss_->SetMessageQueue(this); 165 ss_->SetMessageQueue(this);
162 if (init_queue) { 166 if (init_queue) {
163 DoInit(); 167 DoInit();
164 } 168 }
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after
216 ss_ = ss ? ss : own_ss_.get(); 220 ss_ = ss ? ss : own_ss_.get();
217 ss_->SetMessageQueue(this); 221 ss_->SetMessageQueue(this);
218 } 222 }
219 223
220 void MessageQueue::WakeUpSocketServer() { 224 void MessageQueue::WakeUpSocketServer() {
221 SharedScope ss(&ss_lock_); 225 SharedScope ss(&ss_lock_);
222 ss_->WakeUp(); 226 ss_->WakeUp();
223 } 227 }
224 228
225 void MessageQueue::Quit() { 229 void MessageQueue::Quit() {
226 fStop_ = true; 230 AtomicOps::ReleaseStore(&stop_, 1);
227 WakeUpSocketServer(); 231 WakeUpSocketServer();
228 } 232 }
229 233
230 bool MessageQueue::IsQuitting() { 234 bool MessageQueue::IsQuitting() {
231 return fStop_; 235 return AtomicOps::AcquireLoad(&stop_) != 0;
232 } 236 }
233 237
234 void MessageQueue::Restart() { 238 void MessageQueue::Restart() {
235 fStop_ = false; 239 AtomicOps::ReleaseStore(&stop_, 0);
236 } 240 }
237 241
238 bool MessageQueue::Peek(Message *pmsg, int cmsWait) { 242 bool MessageQueue::Peek(Message *pmsg, int cmsWait) {
239 if (fPeekKeep_) { 243 if (fPeekKeep_) {
240 *pmsg = msgPeek_; 244 *pmsg = msgPeek_;
241 return true; 245 return true;
242 } 246 }
243 if (!Get(pmsg, cmsWait)) 247 if (!Get(pmsg, cmsWait))
244 return false; 248 return false;
245 msgPeek_ = *pmsg; 249 msgPeek_ = *pmsg;
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after
309 // If this was a dispose message, delete it and skip it. 313 // If this was a dispose message, delete it and skip it.
310 if (MQID_DISPOSE == pmsg->message_id) { 314 if (MQID_DISPOSE == pmsg->message_id) {
311 ASSERT(NULL == pmsg->phandler); 315 ASSERT(NULL == pmsg->phandler);
312 delete pmsg->pdata; 316 delete pmsg->pdata;
313 *pmsg = Message(); 317 *pmsg = Message();
314 continue; 318 continue;
315 } 319 }
316 return true; 320 return true;
317 } 321 }
318 322
319 if (fStop_) 323 if (IsQuitting())
320 break; 324 break;
321 325
322 // Which is shorter, the delay wait or the asked wait? 326 // Which is shorter, the delay wait or the asked wait?
323 327
324 int64_t cmsNext; 328 int64_t cmsNext;
325 if (cmsWait == kForever) { 329 if (cmsWait == kForever) {
326 cmsNext = cmsDelayNext; 330 cmsNext = cmsDelayNext;
327 } else { 331 } else {
328 cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed); 332 cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed);
329 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) 333 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
(...skipping 20 matching lines...) Expand all
350 } 354 }
351 355
352 void MessageQueue::ReceiveSends() { 356 void MessageQueue::ReceiveSends() {
353 } 357 }
354 358
355 void MessageQueue::Post(const Location& posted_from, 359 void MessageQueue::Post(const Location& posted_from,
356 MessageHandler* phandler, 360 MessageHandler* phandler,
357 uint32_t id, 361 uint32_t id,
358 MessageData* pdata, 362 MessageData* pdata,
359 bool time_sensitive) { 363 bool time_sensitive) {
360 if (fStop_) 364 if (IsQuitting())
361 return; 365 return;
362 366
363 // Keep thread safe 367 // Keep thread safe
364 // Add the message to the end of the queue 368 // Add the message to the end of the queue
365 // Signal for the multiplexer to return 369 // Signal for the multiplexer to return
366 370
367 { 371 {
368 CritScope cs(&crit_); 372 CritScope cs(&crit_);
369 Message msg; 373 Message msg;
370 msg.posted_from = posted_from; 374 msg.posted_from = posted_from;
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after
406 return DoDelayPost(posted_from, TimeUntil(tstamp), tstamp, phandler, id, 410 return DoDelayPost(posted_from, TimeUntil(tstamp), tstamp, phandler, id,
407 pdata); 411 pdata);
408 } 412 }
409 413
410 void MessageQueue::DoDelayPost(const Location& posted_from, 414 void MessageQueue::DoDelayPost(const Location& posted_from,
411 int64_t cmsDelay, 415 int64_t cmsDelay,
412 int64_t tstamp, 416 int64_t tstamp,
413 MessageHandler* phandler, 417 MessageHandler* phandler,
414 uint32_t id, 418 uint32_t id,
415 MessageData* pdata) { 419 MessageData* pdata) {
416 if (fStop_) { 420 if (IsQuitting()) {
417 return; 421 return;
418 } 422 }
419 423
420 // Keep thread safe 424 // Keep thread safe
421 // Add to the priority queue. Gets sorted soonest first. 425 // Add to the priority queue. Gets sorted soonest first.
422 // Signal for the multiplexer to return. 426 // Signal for the multiplexer to return.
423 427
424 { 428 {
425 CritScope cs(&crit_); 429 CritScope cs(&crit_);
426 Message msg; 430 Message msg;
(...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after
512 pmsg->phandler->OnMessage(pmsg); 516 pmsg->phandler->OnMessage(pmsg);
513 int64_t end_time = TimeMillis(); 517 int64_t end_time = TimeMillis();
514 int64_t diff = TimeDiff(end_time, start_time); 518 int64_t diff = TimeDiff(end_time, start_time);
515 if (diff >= kSlowDispatchLoggingThreshold) { 519 if (diff >= kSlowDispatchLoggingThreshold) {
516 LOG(LS_INFO) << "Message took " << diff << "ms to dispatch. Posted from: " 520 LOG(LS_INFO) << "Message took " << diff << "ms to dispatch. Posted from: "
517 << pmsg->posted_from.ToString(); 521 << pmsg->posted_from.ToString();
518 } 522 }
519 } 523 }
520 524
521 } // namespace rtc 525 } // namespace rtc
OLDNEW
« no previous file with comments | « webrtc/base/messagequeue.h ('k') | webrtc/base/thread.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698