Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(71)

Side by Side Diff: webrtc/base/messagequeue.cc

Issue 1675923002: Prevent data race in MessageQueue. (Closed) Base URL: https://chromium.googlesource.com/external/webrtc.git@master
Patch Set: Fixed initialization order. Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « webrtc/base/messagequeue.h ('k') | webrtc/base/thread.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 /* 1 /*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 * 3 *
4 * Use of this source code is governed by a BSD-style license 4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source 5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found 6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may 7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree. 8 * be found in the AUTHORS file in the root of the source tree.
9 */ 9 */
10 10
(...skipping 99 matching lines...) Expand 10 before | Expand all | Expand 10 after
110 CritScope cs(&crit_); 110 CritScope cs(&crit_);
111 std::vector<MessageQueue *>::iterator iter; 111 std::vector<MessageQueue *>::iterator iter;
112 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++) 112 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++)
113 (*iter)->Clear(handler); 113 (*iter)->Clear(handler);
114 } 114 }
115 115
116 //------------------------------------------------------------------ 116 //------------------------------------------------------------------
117 // MessageQueue 117 // MessageQueue
118 118
119 MessageQueue::MessageQueue(SocketServer* ss, bool init_queue) 119 MessageQueue::MessageQueue(SocketServer* ss, bool init_queue)
120 : ss_(ss), fStop_(false), fPeekKeep_(false), 120 : fStop_(false), fPeekKeep_(false),
121 dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false) { 121 dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false), ss_(ss) {
122 if (!ss_) { 122 if (!ss_) {
123 // Currently, MessageQueue holds a socket server, and is the base class for 123 // Currently, MessageQueue holds a socket server, and is the base class for
124 // Thread. It seems like it makes more sense for Thread to hold the socket 124 // Thread. It seems like it makes more sense for Thread to hold the socket
125 // server, and provide it to the MessageQueue, since the Thread controls 125 // server, and provide it to the MessageQueue, since the Thread controls
126 // the I/O model, and MQ is agnostic to those details. Anyway, this causes 126 // the I/O model, and MQ is agnostic to those details. Anyway, this causes
127 // messagequeue_unittest to depend on network libraries... yuck. 127 // messagequeue_unittest to depend on network libraries... yuck.
128 default_ss_.reset(new DefaultSocketServer()); 128 default_ss_.reset(new DefaultSocketServer());
129 ss_ = default_ss_.get(); 129 ss_ = default_ss_.get();
130 } 130 }
131 ss_->SetMessageQueue(this); 131 ss_->SetMessageQueue(this);
(...skipping 20 matching lines...) Expand all
152 return; 152 return;
153 } 153 }
154 154
155 fDestroyed_ = true; 155 fDestroyed_ = true;
156 // The signal is done from here to ensure 156 // The signal is done from here to ensure
157 // that it always gets called when the queue 157 // that it always gets called when the queue
158 // is going away. 158 // is going away.
159 SignalQueueDestroyed(); 159 SignalQueueDestroyed();
160 MessageQueueManager::Remove(this); 160 MessageQueueManager::Remove(this);
161 Clear(NULL); 161 Clear(NULL);
162
163 SharedScope ss(&ss_lock_);
162 if (ss_) { 164 if (ss_) {
163 ss_->SetMessageQueue(NULL); 165 ss_->SetMessageQueue(NULL);
164 } 166 }
165 } 167 }
166 168
169 SocketServer* MessageQueue::socketserver() {
170 SharedScope ss(&ss_lock_);
171 return ss_;
172 }
173
167 void MessageQueue::set_socketserver(SocketServer* ss) { 174 void MessageQueue::set_socketserver(SocketServer* ss) {
175 // Need to lock exclusively here to prevent simultaneous modifications from
176 // other threads. Can't be a shared lock to prevent races with other reading
177 // threads.
178 // Other places that only read "ss_" can use a shared lock as simultaneous
179 // read access is allowed.
180 ExclusiveScope es(&ss_lock_);
168 ss_ = ss ? ss : default_ss_.get(); 181 ss_ = ss ? ss : default_ss_.get();
169 ss_->SetMessageQueue(this); 182 ss_->SetMessageQueue(this);
170 } 183 }
171 184
172 void MessageQueue::Quit() { 185 void MessageQueue::WakeUpSocketServer() {
173 fStop_ = true; 186 SharedScope ss(&ss_lock_);
174 ss_->WakeUp(); 187 ss_->WakeUp();
175 } 188 }
176 189
190 void MessageQueue::Quit() {
191 fStop_ = true;
192 WakeUpSocketServer();
193 }
194
177 bool MessageQueue::IsQuitting() { 195 bool MessageQueue::IsQuitting() {
178 return fStop_; 196 return fStop_;
179 } 197 }
180 198
181 void MessageQueue::Restart() { 199 void MessageQueue::Restart() {
182 fStop_ = false; 200 fStop_ = false;
183 } 201 }
184 202
185 bool MessageQueue::Peek(Message *pmsg, int cmsWait) { 203 bool MessageQueue::Peek(Message *pmsg, int cmsWait) {
186 if (fPeekKeep_) { 204 if (fPeekKeep_) {
(...skipping 83 matching lines...) Expand 10 before | Expand all | Expand 10 after
270 288
271 int cmsNext; 289 int cmsNext;
272 if (cmsWait == kForever) { 290 if (cmsWait == kForever) {
273 cmsNext = cmsDelayNext; 291 cmsNext = cmsDelayNext;
274 } else { 292 } else {
275 cmsNext = std::max(0, cmsTotal - cmsElapsed); 293 cmsNext = std::max(0, cmsTotal - cmsElapsed);
276 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) 294 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
277 cmsNext = cmsDelayNext; 295 cmsNext = cmsDelayNext;
278 } 296 }
279 297
280 // Wait and multiplex in the meantime 298 {
281 if (!ss_->Wait(cmsNext, process_io)) 299 // Wait and multiplex in the meantime
282 return false; 300 SharedScope ss(&ss_lock_);
301 if (!ss_->Wait(cmsNext, process_io))
302 return false;
303 }
283 304
284 // If the specified timeout expired, return 305 // If the specified timeout expired, return
285 306
286 msCurrent = Time(); 307 msCurrent = Time();
287 cmsElapsed = TimeDiff(msCurrent, msStart); 308 cmsElapsed = TimeDiff(msCurrent, msStart);
288 if (cmsWait != kForever) { 309 if (cmsWait != kForever) {
289 if (cmsElapsed >= cmsWait) 310 if (cmsElapsed >= cmsWait)
290 return false; 311 return false;
291 } 312 }
292 } 313 }
293 return false; 314 return false;
294 } 315 }
295 316
296 void MessageQueue::ReceiveSends() { 317 void MessageQueue::ReceiveSends() {
297 } 318 }
298 319
299 void MessageQueue::Post(MessageHandler* phandler, 320 void MessageQueue::Post(MessageHandler* phandler,
300 uint32_t id, 321 uint32_t id,
301 MessageData* pdata, 322 MessageData* pdata,
302 bool time_sensitive) { 323 bool time_sensitive) {
303 if (fStop_) 324 if (fStop_)
304 return; 325 return;
305 326
306 // Keep thread safe 327 // Keep thread safe
307 // Add the message to the end of the queue 328 // Add the message to the end of the queue
308 // Signal for the multiplexer to return 329 // Signal for the multiplexer to return
309 330
310 CritScope cs(&crit_); 331 {
311 Message msg; 332 CritScope cs(&crit_);
312 msg.phandler = phandler; 333 Message msg;
313 msg.message_id = id; 334 msg.phandler = phandler;
314 msg.pdata = pdata; 335 msg.message_id = id;
315 if (time_sensitive) { 336 msg.pdata = pdata;
316 msg.ts_sensitive = Time() + kMaxMsgLatency; 337 if (time_sensitive) {
338 msg.ts_sensitive = Time() + kMaxMsgLatency;
339 }
340 msgq_.push_back(msg);
317 } 341 }
318 msgq_.push_back(msg); 342 WakeUpSocketServer();
319 ss_->WakeUp();
320 } 343 }
321 344
322 void MessageQueue::PostDelayed(int cmsDelay, 345 void MessageQueue::PostDelayed(int cmsDelay,
323 MessageHandler* phandler, 346 MessageHandler* phandler,
324 uint32_t id, 347 uint32_t id,
325 MessageData* pdata) { 348 MessageData* pdata) {
326 return DoDelayPost(cmsDelay, TimeAfter(cmsDelay), phandler, id, pdata); 349 return DoDelayPost(cmsDelay, TimeAfter(cmsDelay), phandler, id, pdata);
327 } 350 }
328 351
329 void MessageQueue::PostAt(uint32_t tstamp, 352 void MessageQueue::PostAt(uint32_t tstamp,
330 MessageHandler* phandler, 353 MessageHandler* phandler,
331 uint32_t id, 354 uint32_t id,
332 MessageData* pdata) { 355 MessageData* pdata) {
333 return DoDelayPost(TimeUntil(tstamp), tstamp, phandler, id, pdata); 356 return DoDelayPost(TimeUntil(tstamp), tstamp, phandler, id, pdata);
334 } 357 }
335 358
336 void MessageQueue::DoDelayPost(int cmsDelay, 359 void MessageQueue::DoDelayPost(int cmsDelay,
337 uint32_t tstamp, 360 uint32_t tstamp,
338 MessageHandler* phandler, 361 MessageHandler* phandler,
339 uint32_t id, 362 uint32_t id,
340 MessageData* pdata) { 363 MessageData* pdata) {
341 if (fStop_) 364 if (fStop_)
342 return; 365 return;
343 366
344 // Keep thread safe 367 // Keep thread safe
345 // Add to the priority queue. Gets sorted soonest first. 368 // Add to the priority queue. Gets sorted soonest first.
346 // Signal for the multiplexer to return. 369 // Signal for the multiplexer to return.
347 370
348 CritScope cs(&crit_); 371 {
349 Message msg; 372 CritScope cs(&crit_);
350 msg.phandler = phandler; 373 Message msg;
351 msg.message_id = id; 374 msg.phandler = phandler;
352 msg.pdata = pdata; 375 msg.message_id = id;
353 DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg); 376 msg.pdata = pdata;
354 dmsgq_.push(dmsg); 377 DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
355 // If this message queue processes 1 message every millisecond for 50 days, 378 dmsgq_.push(dmsg);
356 // we will wrap this number. Even then, only messages with identical times 379 // If this message queue processes 1 message every millisecond for 50 days,
357 // will be misordered, and then only briefly. This is probably ok. 380 // we will wrap this number. Even then, only messages with identical times
358 VERIFY(0 != ++dmsgq_next_num_); 381 // will be misordered, and then only briefly. This is probably ok.
359 ss_->WakeUp(); 382 VERIFY(0 != ++dmsgq_next_num_);
383 }
384 WakeUpSocketServer();
360 } 385 }
361 386
362 int MessageQueue::GetDelay() { 387 int MessageQueue::GetDelay() {
363 CritScope cs(&crit_); 388 CritScope cs(&crit_);
364 389
365 if (!msgq_.empty()) 390 if (!msgq_.empty())
366 return 0; 391 return 0;
367 392
368 if (!dmsgq_.empty()) { 393 if (!dmsgq_.empty()) {
369 int delay = TimeUntil(dmsgq_.top().msTrigger_); 394 int delay = TimeUntil(dmsgq_.top().msTrigger_);
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after
423 } 448 }
424 dmsgq_.container().erase(new_end, dmsgq_.container().end()); 449 dmsgq_.container().erase(new_end, dmsgq_.container().end());
425 dmsgq_.reheap(); 450 dmsgq_.reheap();
426 } 451 }
427 452
428 void MessageQueue::Dispatch(Message *pmsg) { 453 void MessageQueue::Dispatch(Message *pmsg) {
429 pmsg->phandler->OnMessage(pmsg); 454 pmsg->phandler->OnMessage(pmsg);
430 } 455 }
431 456
432 } // namespace rtc 457 } // namespace rtc
OLDNEW
« no previous file with comments | « webrtc/base/messagequeue.h ('k') | webrtc/base/thread.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698