Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(194)

Side by Side Diff: webrtc/base/messagequeue.cc

Issue 1714463003: Revert of Prevent data race in MessageQueue. (Closed) Base URL: https://chromium.googlesource.com/external/webrtc.git@master
Patch Set: Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « webrtc/base/messagequeue.h ('k') | webrtc/base/thread.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 /* 1 /*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 * 3 *
4 * Use of this source code is governed by a BSD-style license 4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source 5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found 6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may 7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree. 8 * be found in the AUTHORS file in the root of the source tree.
9 */ 9 */
10 10
(...skipping 99 matching lines...) Expand 10 before | Expand all | Expand 10 after
110 CritScope cs(&crit_); 110 CritScope cs(&crit_);
111 std::vector<MessageQueue *>::iterator iter; 111 std::vector<MessageQueue *>::iterator iter;
112 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++) 112 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++)
113 (*iter)->Clear(handler); 113 (*iter)->Clear(handler);
114 } 114 }
115 115
116 //------------------------------------------------------------------ 116 //------------------------------------------------------------------
117 // MessageQueue 117 // MessageQueue
118 118
119 MessageQueue::MessageQueue(SocketServer* ss, bool init_queue) 119 MessageQueue::MessageQueue(SocketServer* ss, bool init_queue)
120 : fStop_(false), fPeekKeep_(false), 120 : ss_(ss), fStop_(false), fPeekKeep_(false),
121 dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false), ss_(ss) { 121 dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false) {
122 if (!ss_) { 122 if (!ss_) {
123 // Currently, MessageQueue holds a socket server, and is the base class for 123 // Currently, MessageQueue holds a socket server, and is the base class for
124 // Thread. It seems like it makes more sense for Thread to hold the socket 124 // Thread. It seems like it makes more sense for Thread to hold the socket
125 // server, and provide it to the MessageQueue, since the Thread controls 125 // server, and provide it to the MessageQueue, since the Thread controls
126 // the I/O model, and MQ is agnostic to those details. Anyway, this causes 126 // the I/O model, and MQ is agnostic to those details. Anyway, this causes
127 // messagequeue_unittest to depend on network libraries... yuck. 127 // messagequeue_unittest to depend on network libraries... yuck.
128 default_ss_.reset(new DefaultSocketServer()); 128 default_ss_.reset(new DefaultSocketServer());
129 ss_ = default_ss_.get(); 129 ss_ = default_ss_.get();
130 } 130 }
131 ss_->SetMessageQueue(this); 131 ss_->SetMessageQueue(this);
(...skipping 20 matching lines...) Expand all
152 return; 152 return;
153 } 153 }
154 154
155 fDestroyed_ = true; 155 fDestroyed_ = true;
156 // The signal is done from here to ensure 156 // The signal is done from here to ensure
157 // that it always gets called when the queue 157 // that it always gets called when the queue
158 // is going away. 158 // is going away.
159 SignalQueueDestroyed(); 159 SignalQueueDestroyed();
160 MessageQueueManager::Remove(this); 160 MessageQueueManager::Remove(this);
161 Clear(NULL); 161 Clear(NULL);
162
163 SharedScope ss(&ss_lock_);
164 if (ss_) { 162 if (ss_) {
165 ss_->SetMessageQueue(NULL); 163 ss_->SetMessageQueue(NULL);
166 } 164 }
167 } 165 }
168 166
169 SocketServer* MessageQueue::socketserver() {
170 SharedScope ss(&ss_lock_);
171 return ss_;
172 }
173
174 void MessageQueue::set_socketserver(SocketServer* ss) { 167 void MessageQueue::set_socketserver(SocketServer* ss) {
175 // Need to lock exclusively here to prevent simultaneous modifications from
176 // other threads. Can't be a shared lock to prevent races with other reading
177 // threads.
178 // Other places that only read "ss_" can use a shared lock as simultaneous
179 // read access is allowed.
180 ExclusiveScope es(&ss_lock_);
181 ss_ = ss ? ss : default_ss_.get(); 168 ss_ = ss ? ss : default_ss_.get();
182 ss_->SetMessageQueue(this); 169 ss_->SetMessageQueue(this);
183 } 170 }
184 171
185 void MessageQueue::WakeUpSocketServer() { 172 void MessageQueue::Quit() {
186 SharedScope ss(&ss_lock_); 173 fStop_ = true;
187 ss_->WakeUp(); 174 ss_->WakeUp();
188 } 175 }
189 176
190 void MessageQueue::Quit() {
191 fStop_ = true;
192 WakeUpSocketServer();
193 }
194
195 bool MessageQueue::IsQuitting() { 177 bool MessageQueue::IsQuitting() {
196 return fStop_; 178 return fStop_;
197 } 179 }
198 180
199 void MessageQueue::Restart() { 181 void MessageQueue::Restart() {
200 fStop_ = false; 182 fStop_ = false;
201 } 183 }
202 184
203 bool MessageQueue::Peek(Message *pmsg, int cmsWait) { 185 bool MessageQueue::Peek(Message *pmsg, int cmsWait) {
204 if (fPeekKeep_) { 186 if (fPeekKeep_) {
(...skipping 83 matching lines...) Expand 10 before | Expand all | Expand 10 after
288 270
289 int cmsNext; 271 int cmsNext;
290 if (cmsWait == kForever) { 272 if (cmsWait == kForever) {
291 cmsNext = cmsDelayNext; 273 cmsNext = cmsDelayNext;
292 } else { 274 } else {
293 cmsNext = std::max(0, cmsTotal - cmsElapsed); 275 cmsNext = std::max(0, cmsTotal - cmsElapsed);
294 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) 276 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
295 cmsNext = cmsDelayNext; 277 cmsNext = cmsDelayNext;
296 } 278 }
297 279
298 { 280 // Wait and multiplex in the meantime
299 // Wait and multiplex in the meantime 281 if (!ss_->Wait(cmsNext, process_io))
300 SharedScope ss(&ss_lock_); 282 return false;
301 if (!ss_->Wait(cmsNext, process_io))
302 return false;
303 }
304 283
305 // If the specified timeout expired, return 284 // If the specified timeout expired, return
306 285
307 msCurrent = Time(); 286 msCurrent = Time();
308 cmsElapsed = TimeDiff(msCurrent, msStart); 287 cmsElapsed = TimeDiff(msCurrent, msStart);
309 if (cmsWait != kForever) { 288 if (cmsWait != kForever) {
310 if (cmsElapsed >= cmsWait) 289 if (cmsElapsed >= cmsWait)
311 return false; 290 return false;
312 } 291 }
313 } 292 }
314 return false; 293 return false;
315 } 294 }
316 295
317 void MessageQueue::ReceiveSends() { 296 void MessageQueue::ReceiveSends() {
318 } 297 }
319 298
320 void MessageQueue::Post(MessageHandler* phandler, 299 void MessageQueue::Post(MessageHandler* phandler,
321 uint32_t id, 300 uint32_t id,
322 MessageData* pdata, 301 MessageData* pdata,
323 bool time_sensitive) { 302 bool time_sensitive) {
324 if (fStop_) 303 if (fStop_)
325 return; 304 return;
326 305
327 // Keep thread safe 306 // Keep thread safe
328 // Add the message to the end of the queue 307 // Add the message to the end of the queue
329 // Signal for the multiplexer to return 308 // Signal for the multiplexer to return
330 309
331 { 310 CritScope cs(&crit_);
332 CritScope cs(&crit_); 311 Message msg;
333 Message msg; 312 msg.phandler = phandler;
334 msg.phandler = phandler; 313 msg.message_id = id;
335 msg.message_id = id; 314 msg.pdata = pdata;
336 msg.pdata = pdata; 315 if (time_sensitive) {
337 if (time_sensitive) { 316 msg.ts_sensitive = Time() + kMaxMsgLatency;
338 msg.ts_sensitive = Time() + kMaxMsgLatency;
339 }
340 msgq_.push_back(msg);
341 } 317 }
342 WakeUpSocketServer(); 318 msgq_.push_back(msg);
319 ss_->WakeUp();
343 } 320 }
344 321
345 void MessageQueue::PostDelayed(int cmsDelay, 322 void MessageQueue::PostDelayed(int cmsDelay,
346 MessageHandler* phandler, 323 MessageHandler* phandler,
347 uint32_t id, 324 uint32_t id,
348 MessageData* pdata) { 325 MessageData* pdata) {
349 return DoDelayPost(cmsDelay, TimeAfter(cmsDelay), phandler, id, pdata); 326 return DoDelayPost(cmsDelay, TimeAfter(cmsDelay), phandler, id, pdata);
350 } 327 }
351 328
352 void MessageQueue::PostAt(uint32_t tstamp, 329 void MessageQueue::PostAt(uint32_t tstamp,
353 MessageHandler* phandler, 330 MessageHandler* phandler,
354 uint32_t id, 331 uint32_t id,
355 MessageData* pdata) { 332 MessageData* pdata) {
356 return DoDelayPost(TimeUntil(tstamp), tstamp, phandler, id, pdata); 333 return DoDelayPost(TimeUntil(tstamp), tstamp, phandler, id, pdata);
357 } 334 }
358 335
359 void MessageQueue::DoDelayPost(int cmsDelay, 336 void MessageQueue::DoDelayPost(int cmsDelay,
360 uint32_t tstamp, 337 uint32_t tstamp,
361 MessageHandler* phandler, 338 MessageHandler* phandler,
362 uint32_t id, 339 uint32_t id,
363 MessageData* pdata) { 340 MessageData* pdata) {
364 if (fStop_) 341 if (fStop_)
365 return; 342 return;
366 343
367 // Keep thread safe 344 // Keep thread safe
368 // Add to the priority queue. Gets sorted soonest first. 345 // Add to the priority queue. Gets sorted soonest first.
369 // Signal for the multiplexer to return. 346 // Signal for the multiplexer to return.
370 347
371 { 348 CritScope cs(&crit_);
372 CritScope cs(&crit_); 349 Message msg;
373 Message msg; 350 msg.phandler = phandler;
374 msg.phandler = phandler; 351 msg.message_id = id;
375 msg.message_id = id; 352 msg.pdata = pdata;
376 msg.pdata = pdata; 353 DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
377 DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg); 354 dmsgq_.push(dmsg);
378 dmsgq_.push(dmsg); 355 // If this message queue processes 1 message every millisecond for 50 days,
379 // If this message queue processes 1 message every millisecond for 50 days, 356 // we will wrap this number. Even then, only messages with identical times
380 // we will wrap this number. Even then, only messages with identical times 357 // will be misordered, and then only briefly. This is probably ok.
381 // will be misordered, and then only briefly. This is probably ok. 358 VERIFY(0 != ++dmsgq_next_num_);
382 VERIFY(0 != ++dmsgq_next_num_); 359 ss_->WakeUp();
383 }
384 WakeUpSocketServer();
385 } 360 }
386 361
387 int MessageQueue::GetDelay() { 362 int MessageQueue::GetDelay() {
388 CritScope cs(&crit_); 363 CritScope cs(&crit_);
389 364
390 if (!msgq_.empty()) 365 if (!msgq_.empty())
391 return 0; 366 return 0;
392 367
393 if (!dmsgq_.empty()) { 368 if (!dmsgq_.empty()) {
394 int delay = TimeUntil(dmsgq_.top().msTrigger_); 369 int delay = TimeUntil(dmsgq_.top().msTrigger_);
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after
448 } 423 }
449 dmsgq_.container().erase(new_end, dmsgq_.container().end()); 424 dmsgq_.container().erase(new_end, dmsgq_.container().end());
450 dmsgq_.reheap(); 425 dmsgq_.reheap();
451 } 426 }
452 427
453 void MessageQueue::Dispatch(Message *pmsg) { 428 void MessageQueue::Dispatch(Message *pmsg) {
454 pmsg->phandler->OnMessage(pmsg); 429 pmsg->phandler->OnMessage(pmsg);
455 } 430 }
456 431
457 } // namespace rtc 432 } // namespace rtc
OLDNEW
« no previous file with comments | « webrtc/base/messagequeue.h ('k') | webrtc/base/thread.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698