Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(376)

Side by Side Diff: webrtc/base/messagequeue.cc

Issue 2135173002: Revert of Protect MessageQueue stop field with a critical section to avoid data races. (Closed) Base URL: https://chromium.googlesource.com/external/webrtc.git@master
Patch Set: Created 4 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « webrtc/base/messagequeue.h ('k') | webrtc/base/thread.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 /* 1 /*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 * 3 *
4 * Use of this source code is governed by a BSD-style license 4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source 5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found 6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may 7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree. 8 * be found in the AUTHORS file in the root of the source tree.
9 */ 9 */
10 #include <algorithm> 10 #include <algorithm>
(...skipping 132 matching lines...) Expand 10 before | Expand all | Expand 10 after
143 // we can't synchronously wait for queues_not_done to go to 0; we need to 143 // we can't synchronously wait for queues_not_done to go to 0; we need to
144 // process messages as well. 144 // process messages as well.
145 while (AtomicOps::AcquireLoad(&queues_not_done) > 0) { 145 while (AtomicOps::AcquireLoad(&queues_not_done) > 0) {
146 rtc::Thread::Current()->ProcessMessages(0); 146 rtc::Thread::Current()->ProcessMessages(0);
147 } 147 }
148 } 148 }
149 149
150 //------------------------------------------------------------------ 150 //------------------------------------------------------------------
151 // MessageQueue 151 // MessageQueue
152 MessageQueue::MessageQueue(SocketServer* ss, bool init_queue) 152 MessageQueue::MessageQueue(SocketServer* ss, bool init_queue)
153 : fPeekKeep_(false), 153 : fStop_(false), fPeekKeep_(false),
154 dmsgq_next_num_(0), 154 dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false), ss_(ss) {
155 fInitialized_(false),
156 fDestroyed_(false),
157 stop_(0),
158 ss_(ss) {
159 RTC_DCHECK(ss); 155 RTC_DCHECK(ss);
160 // Currently, MessageQueue holds a socket server, and is the base class for 156 // Currently, MessageQueue holds a socket server, and is the base class for
161 // Thread. It seems like it makes more sense for Thread to hold the socket 157 // Thread. It seems like it makes more sense for Thread to hold the socket
162 // server, and provide it to the MessageQueue, since the Thread controls 158 // server, and provide it to the MessageQueue, since the Thread controls
163 // the I/O model, and MQ is agnostic to those details. Anyway, this causes 159 // the I/O model, and MQ is agnostic to those details. Anyway, this causes
164 // messagequeue_unittest to depend on network libraries... yuck. 160 // messagequeue_unittest to depend on network libraries... yuck.
165 ss_->SetMessageQueue(this); 161 ss_->SetMessageQueue(this);
166 if (init_queue) { 162 if (init_queue) {
167 DoInit(); 163 DoInit();
168 } 164 }
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after
220 ss_ = ss ? ss : own_ss_.get(); 216 ss_ = ss ? ss : own_ss_.get();
221 ss_->SetMessageQueue(this); 217 ss_->SetMessageQueue(this);
222 } 218 }
223 219
224 void MessageQueue::WakeUpSocketServer() { 220 void MessageQueue::WakeUpSocketServer() {
225 SharedScope ss(&ss_lock_); 221 SharedScope ss(&ss_lock_);
226 ss_->WakeUp(); 222 ss_->WakeUp();
227 } 223 }
228 224
229 void MessageQueue::Quit() { 225 void MessageQueue::Quit() {
230 AtomicOps::ReleaseStore(&stop_, 1); 226 fStop_ = true;
231 WakeUpSocketServer(); 227 WakeUpSocketServer();
232 } 228 }
233 229
234 bool MessageQueue::IsQuitting() { 230 bool MessageQueue::IsQuitting() {
235 return AtomicOps::AcquireLoad(&stop_) != 0; 231 return fStop_;
236 } 232 }
237 233
238 void MessageQueue::Restart() { 234 void MessageQueue::Restart() {
239 AtomicOps::ReleaseStore(&stop_, 0); 235 fStop_ = false;
240 } 236 }
241 237
242 bool MessageQueue::Peek(Message *pmsg, int cmsWait) { 238 bool MessageQueue::Peek(Message *pmsg, int cmsWait) {
243 if (fPeekKeep_) { 239 if (fPeekKeep_) {
244 *pmsg = msgPeek_; 240 *pmsg = msgPeek_;
245 return true; 241 return true;
246 } 242 }
247 if (!Get(pmsg, cmsWait)) 243 if (!Get(pmsg, cmsWait))
248 return false; 244 return false;
249 msgPeek_ = *pmsg; 245 msgPeek_ = *pmsg;
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after
313 // If this was a dispose message, delete it and skip it. 309 // If this was a dispose message, delete it and skip it.
314 if (MQID_DISPOSE == pmsg->message_id) { 310 if (MQID_DISPOSE == pmsg->message_id) {
315 ASSERT(NULL == pmsg->phandler); 311 ASSERT(NULL == pmsg->phandler);
316 delete pmsg->pdata; 312 delete pmsg->pdata;
317 *pmsg = Message(); 313 *pmsg = Message();
318 continue; 314 continue;
319 } 315 }
320 return true; 316 return true;
321 } 317 }
322 318
323 if (IsQuitting()) 319 if (fStop_)
324 break; 320 break;
325 321
326 // Which is shorter, the delay wait or the asked wait? 322 // Which is shorter, the delay wait or the asked wait?
327 323
328 int64_t cmsNext; 324 int64_t cmsNext;
329 if (cmsWait == kForever) { 325 if (cmsWait == kForever) {
330 cmsNext = cmsDelayNext; 326 cmsNext = cmsDelayNext;
331 } else { 327 } else {
332 cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed); 328 cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed);
333 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) 329 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
(...skipping 20 matching lines...) Expand all
354 } 350 }
355 351
356 void MessageQueue::ReceiveSends() { 352 void MessageQueue::ReceiveSends() {
357 } 353 }
358 354
359 void MessageQueue::Post(const Location& posted_from, 355 void MessageQueue::Post(const Location& posted_from,
360 MessageHandler* phandler, 356 MessageHandler* phandler,
361 uint32_t id, 357 uint32_t id,
362 MessageData* pdata, 358 MessageData* pdata,
363 bool time_sensitive) { 359 bool time_sensitive) {
364 if (IsQuitting()) 360 if (fStop_)
365 return; 361 return;
366 362
367 // Keep thread safe 363 // Keep thread safe
368 // Add the message to the end of the queue 364 // Add the message to the end of the queue
369 // Signal for the multiplexer to return 365 // Signal for the multiplexer to return
370 366
371 { 367 {
372 CritScope cs(&crit_); 368 CritScope cs(&crit_);
373 Message msg; 369 Message msg;
374 msg.posted_from = posted_from; 370 msg.posted_from = posted_from;
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after
410 return DoDelayPost(posted_from, TimeUntil(tstamp), tstamp, phandler, id, 406 return DoDelayPost(posted_from, TimeUntil(tstamp), tstamp, phandler, id,
411 pdata); 407 pdata);
412 } 408 }
413 409
414 void MessageQueue::DoDelayPost(const Location& posted_from, 410 void MessageQueue::DoDelayPost(const Location& posted_from,
415 int64_t cmsDelay, 411 int64_t cmsDelay,
416 int64_t tstamp, 412 int64_t tstamp,
417 MessageHandler* phandler, 413 MessageHandler* phandler,
418 uint32_t id, 414 uint32_t id,
419 MessageData* pdata) { 415 MessageData* pdata) {
420 if (IsQuitting()) { 416 if (fStop_) {
421 return; 417 return;
422 } 418 }
423 419
424 // Keep thread safe 420 // Keep thread safe
425 // Add to the priority queue. Gets sorted soonest first. 421 // Add to the priority queue. Gets sorted soonest first.
426 // Signal for the multiplexer to return. 422 // Signal for the multiplexer to return.
427 423
428 { 424 {
429 CritScope cs(&crit_); 425 CritScope cs(&crit_);
430 Message msg; 426 Message msg;
(...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after
516 pmsg->phandler->OnMessage(pmsg); 512 pmsg->phandler->OnMessage(pmsg);
517 int64_t end_time = TimeMillis(); 513 int64_t end_time = TimeMillis();
518 int64_t diff = TimeDiff(end_time, start_time); 514 int64_t diff = TimeDiff(end_time, start_time);
519 if (diff >= kSlowDispatchLoggingThreshold) { 515 if (diff >= kSlowDispatchLoggingThreshold) {
520 LOG(LS_INFO) << "Message took " << diff << "ms to dispatch. Posted from: " 516 LOG(LS_INFO) << "Message took " << diff << "ms to dispatch. Posted from: "
521 << pmsg->posted_from.ToString(); 517 << pmsg->posted_from.ToString();
522 } 518 }
523 } 519 }
524 520
525 } // namespace rtc 521 } // namespace rtc
OLDNEW
« no previous file with comments | « webrtc/base/messagequeue.h ('k') | webrtc/base/thread.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698