Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(101)

Side by Side Diff: webrtc/base/messagequeue.cc

Issue 2023193002: Protect MessageQueue stop field with a critical section to avoid data races. (Closed) Base URL: https://chromium.googlesource.com/external/webrtc.git@master
Patch Set: Clear suppresion is needed for vtable/destructor race Created 4 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « webrtc/base/messagequeue.h ('k') | webrtc/base/thread.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 /* 1 /*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 * 3 *
4 * Use of this source code is governed by a BSD-style license 4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source 5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found 6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may 7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree. 8 * be found in the AUTHORS file in the root of the source tree.
9 */ 9 */
10 #include <algorithm> 10 #include <algorithm>
(...skipping 123 matching lines...) Expand 10 before | Expand all | Expand 10 after
134 // we can't synchronously wait for queues_not_done to go to 0; we need to 134 // we can't synchronously wait for queues_not_done to go to 0; we need to
135 // process messages as well. 135 // process messages as well.
136 while (AtomicOps::AcquireLoad(&queues_not_done) > 0) { 136 while (AtomicOps::AcquireLoad(&queues_not_done) > 0) {
137 rtc::Thread::Current()->ProcessMessages(0); 137 rtc::Thread::Current()->ProcessMessages(0);
138 } 138 }
139 } 139 }
140 140
141 //------------------------------------------------------------------ 141 //------------------------------------------------------------------
142 // MessageQueue 142 // MessageQueue
143 MessageQueue::MessageQueue(SocketServer* ss, bool init_queue) 143 MessageQueue::MessageQueue(SocketServer* ss, bool init_queue)
144 : fStop_(false), fPeekKeep_(false), 144 : fPeekKeep_(false),
145 dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false), ss_(ss) { 145 dmsgq_next_num_(0),
146 fInitialized_(false),
147 fDestroyed_(false),
148 stop_(false),
Taylor Brandstetter 2016/07/07 22:01:47 Since it's an int now, should probably initialize
andresp 2016/07/08 10:05:37 Done.
149 ss_(ss) {
146 RTC_DCHECK(ss); 150 RTC_DCHECK(ss);
147 // Currently, MessageQueue holds a socket server, and is the base class for 151 // Currently, MessageQueue holds a socket server, and is the base class for
148 // Thread. It seems like it makes more sense for Thread to hold the socket 152 // Thread. It seems like it makes more sense for Thread to hold the socket
149 // server, and provide it to the MessageQueue, since the Thread controls 153 // server, and provide it to the MessageQueue, since the Thread controls
150 // the I/O model, and MQ is agnostic to those details. Anyway, this causes 154 // the I/O model, and MQ is agnostic to those details. Anyway, this causes
151 // messagequeue_unittest to depend on network libraries... yuck. 155 // messagequeue_unittest to depend on network libraries... yuck.
152 ss_->SetMessageQueue(this); 156 ss_->SetMessageQueue(this);
153 if (init_queue) { 157 if (init_queue) {
154 DoInit(); 158 DoInit();
155 } 159 }
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after
207 ss_ = ss ? ss : own_ss_.get(); 211 ss_ = ss ? ss : own_ss_.get();
208 ss_->SetMessageQueue(this); 212 ss_->SetMessageQueue(this);
209 } 213 }
210 214
211 void MessageQueue::WakeUpSocketServer() { 215 void MessageQueue::WakeUpSocketServer() {
212 SharedScope ss(&ss_lock_); 216 SharedScope ss(&ss_lock_);
213 ss_->WakeUp(); 217 ss_->WakeUp();
214 } 218 }
215 219
216 void MessageQueue::Quit() { 220 void MessageQueue::Quit() {
217 fStop_ = true; 221 AtomicOps::ReleaseStore(&stop_, 1);
218 WakeUpSocketServer(); 222 WakeUpSocketServer();
219 } 223 }
220 224
221 bool MessageQueue::IsQuitting() { 225 bool MessageQueue::IsQuitting() {
222 return fStop_; 226 return AtomicOps::AcquireLoad(&stop_) != 0;
223 } 227 }
224 228
225 void MessageQueue::Restart() { 229 void MessageQueue::Restart() {
226 fStop_ = false; 230 AtomicOps::ReleaseStore(&stop_, 0);
227 } 231 }
228 232
229 bool MessageQueue::Peek(Message *pmsg, int cmsWait) { 233 bool MessageQueue::Peek(Message *pmsg, int cmsWait) {
230 if (fPeekKeep_) { 234 if (fPeekKeep_) {
231 *pmsg = msgPeek_; 235 *pmsg = msgPeek_;
232 return true; 236 return true;
233 } 237 }
234 if (!Get(pmsg, cmsWait)) 238 if (!Get(pmsg, cmsWait))
235 return false; 239 return false;
236 msgPeek_ = *pmsg; 240 msgPeek_ = *pmsg;
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after
300 // If this was a dispose message, delete it and skip it. 304 // If this was a dispose message, delete it and skip it.
301 if (MQID_DISPOSE == pmsg->message_id) { 305 if (MQID_DISPOSE == pmsg->message_id) {
302 ASSERT(NULL == pmsg->phandler); 306 ASSERT(NULL == pmsg->phandler);
303 delete pmsg->pdata; 307 delete pmsg->pdata;
304 *pmsg = Message(); 308 *pmsg = Message();
305 continue; 309 continue;
306 } 310 }
307 return true; 311 return true;
308 } 312 }
309 313
310 if (fStop_) 314 if (IsQuitting())
311 break; 315 break;
312 316
313 // Which is shorter, the delay wait or the asked wait? 317 // Which is shorter, the delay wait or the asked wait?
314 318
315 int64_t cmsNext; 319 int64_t cmsNext;
316 if (cmsWait == kForever) { 320 if (cmsWait == kForever) {
317 cmsNext = cmsDelayNext; 321 cmsNext = cmsDelayNext;
318 } else { 322 } else {
319 cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed); 323 cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed);
320 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) 324 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
(...skipping 20 matching lines...) Expand all
341 } 345 }
342 346
343 void MessageQueue::ReceiveSends() { 347 void MessageQueue::ReceiveSends() {
344 } 348 }
345 349
346 void MessageQueue::Post(const Location& posted_from, 350 void MessageQueue::Post(const Location& posted_from,
347 MessageHandler* phandler, 351 MessageHandler* phandler,
348 uint32_t id, 352 uint32_t id,
349 MessageData* pdata, 353 MessageData* pdata,
350 bool time_sensitive) { 354 bool time_sensitive) {
351 if (fStop_) 355 if (IsQuitting())
352 return; 356 return;
353 357
354 // Keep thread safe 358 // Keep thread safe
355 // Add the message to the end of the queue 359 // Add the message to the end of the queue
356 // Signal for the multiplexer to return 360 // Signal for the multiplexer to return
357 361
358 { 362 {
359 CritScope cs(&crit_); 363 CritScope cs(&crit_);
360 Message msg; 364 Message msg;
361 msg.posted_from = posted_from; 365 msg.posted_from = posted_from;
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after
397 return DoDelayPost(posted_from, TimeUntil(tstamp), tstamp, phandler, id, 401 return DoDelayPost(posted_from, TimeUntil(tstamp), tstamp, phandler, id,
398 pdata); 402 pdata);
399 } 403 }
400 404
401 void MessageQueue::DoDelayPost(const Location& posted_from, 405 void MessageQueue::DoDelayPost(const Location& posted_from,
402 int64_t cmsDelay, 406 int64_t cmsDelay,
403 int64_t tstamp, 407 int64_t tstamp,
404 MessageHandler* phandler, 408 MessageHandler* phandler,
405 uint32_t id, 409 uint32_t id,
406 MessageData* pdata) { 410 MessageData* pdata) {
407 if (fStop_) { 411 if (IsQuitting()) {
408 return; 412 return;
409 } 413 }
410 414
411 // Keep thread safe 415 // Keep thread safe
412 // Add to the priority queue. Gets sorted soonest first. 416 // Add to the priority queue. Gets sorted soonest first.
413 // Signal for the multiplexer to return. 417 // Signal for the multiplexer to return.
414 418
415 { 419 {
416 CritScope cs(&crit_); 420 CritScope cs(&crit_);
417 Message msg; 421 Message msg;
(...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after
503 pmsg->phandler->OnMessage(pmsg); 507 pmsg->phandler->OnMessage(pmsg);
504 int64_t end_time = TimeMillis(); 508 int64_t end_time = TimeMillis();
505 int64_t diff = TimeDiff(end_time, start_time); 509 int64_t diff = TimeDiff(end_time, start_time);
506 if (diff >= kSlowDispatchLoggingThreshold) { 510 if (diff >= kSlowDispatchLoggingThreshold) {
507 LOG(LS_INFO) << "Message took " << diff << "ms to dispatch. Posted from: " 511 LOG(LS_INFO) << "Message took " << diff << "ms to dispatch. Posted from: "
508 << pmsg->posted_from.ToString(); 512 << pmsg->posted_from.ToString();
509 } 513 }
510 } 514 }
511 515
512 } // namespace rtc 516 } // namespace rtc
OLDNEW
« no previous file with comments | « webrtc/base/messagequeue.h ('k') | webrtc/base/thread.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698