OLD | NEW |
1 /* | 1 /* |
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. | 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. |
3 * | 3 * |
4 * Use of this source code is governed by a BSD-style license | 4 * Use of this source code is governed by a BSD-style license |
5 * that can be found in the LICENSE file in the root of the source | 5 * that can be found in the LICENSE file in the root of the source |
6 * tree. An additional intellectual property rights grant can be found | 6 * tree. An additional intellectual property rights grant can be found |
7 * in the file PATENTS. All contributing project authors may | 7 * in the file PATENTS. All contributing project authors may |
8 * be found in the AUTHORS file in the root of the source tree. | 8 * be found in the AUTHORS file in the root of the source tree. |
9 */ | 9 */ |
10 | 10 |
(...skipping 99 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
110 CritScope cs(&crit_); | 110 CritScope cs(&crit_); |
111 std::vector<MessageQueue *>::iterator iter; | 111 std::vector<MessageQueue *>::iterator iter; |
112 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++) | 112 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++) |
113 (*iter)->Clear(handler); | 113 (*iter)->Clear(handler); |
114 } | 114 } |
115 | 115 |
116 //------------------------------------------------------------------ | 116 //------------------------------------------------------------------ |
117 // MessageQueue | 117 // MessageQueue |
118 | 118 |
119 MessageQueue::MessageQueue(SocketServer* ss, bool init_queue) | 119 MessageQueue::MessageQueue(SocketServer* ss, bool init_queue) |
120 : ss_(ss), fStop_(false), fPeekKeep_(false), | 120 : fStop_(false), fPeekKeep_(false), |
121 dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false) { | 121 dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false), ss_(ss) { |
122 if (!ss_) { | 122 if (!ss_) { |
123 // Currently, MessageQueue holds a socket server, and is the base class for | 123 // Currently, MessageQueue holds a socket server, and is the base class for |
124 // Thread. It seems like it makes more sense for Thread to hold the socket | 124 // Thread. It seems like it makes more sense for Thread to hold the socket |
125 // server, and provide it to the MessageQueue, since the Thread controls | 125 // server, and provide it to the MessageQueue, since the Thread controls |
126 // the I/O model, and MQ is agnostic to those details. Anyway, this causes | 126 // the I/O model, and MQ is agnostic to those details. Anyway, this causes |
127 // messagequeue_unittest to depend on network libraries... yuck. | 127 // messagequeue_unittest to depend on network libraries... yuck. |
128 default_ss_.reset(new DefaultSocketServer()); | 128 default_ss_.reset(new DefaultSocketServer()); |
129 ss_ = default_ss_.get(); | 129 ss_ = default_ss_.get(); |
130 } | 130 } |
131 ss_->SetMessageQueue(this); | 131 ss_->SetMessageQueue(this); |
(...skipping 20 matching lines...) Expand all Loading... |
152 return; | 152 return; |
153 } | 153 } |
154 | 154 |
155 fDestroyed_ = true; | 155 fDestroyed_ = true; |
156 // The signal is done from here to ensure | 156 // The signal is done from here to ensure |
157 // that it always gets called when the queue | 157 // that it always gets called when the queue |
158 // is going away. | 158 // is going away. |
159 SignalQueueDestroyed(); | 159 SignalQueueDestroyed(); |
160 MessageQueueManager::Remove(this); | 160 MessageQueueManager::Remove(this); |
161 Clear(NULL); | 161 Clear(NULL); |
| 162 |
| 163 SharedScope ss(&ss_lock_); |
162 if (ss_) { | 164 if (ss_) { |
163 ss_->SetMessageQueue(NULL); | 165 ss_->SetMessageQueue(NULL); |
164 } | 166 } |
165 } | 167 } |
166 | 168 |
| 169 SocketServer* MessageQueue::socketserver() { |
| 170 SharedScope ss(&ss_lock_); |
| 171 return ss_; |
| 172 } |
| 173 |
167 void MessageQueue::set_socketserver(SocketServer* ss) { | 174 void MessageQueue::set_socketserver(SocketServer* ss) { |
| 175 // Need to lock exclusively here to prevent simultaneous modifications from |
| 176 // other threads. Can't be a shared lock to prevent races with other reading |
| 177 // threads. |
| 178 // Other places that only read "ss_" can use a shared lock as simultaneous |
| 179 // read access is allowed. |
| 180 ExclusiveScope es(&ss_lock_); |
168 ss_ = ss ? ss : default_ss_.get(); | 181 ss_ = ss ? ss : default_ss_.get(); |
169 ss_->SetMessageQueue(this); | 182 ss_->SetMessageQueue(this); |
170 } | 183 } |
171 | 184 |
172 void MessageQueue::Quit() { | 185 void MessageQueue::WakeUpSocketServer() { |
173 fStop_ = true; | 186 SharedScope ss(&ss_lock_); |
174 ss_->WakeUp(); | 187 ss_->WakeUp(); |
175 } | 188 } |
176 | 189 |
| 190 void MessageQueue::Quit() { |
| 191 fStop_ = true; |
| 192 WakeUpSocketServer(); |
| 193 } |
| 194 |
177 bool MessageQueue::IsQuitting() { | 195 bool MessageQueue::IsQuitting() { |
178 return fStop_; | 196 return fStop_; |
179 } | 197 } |
180 | 198 |
181 void MessageQueue::Restart() { | 199 void MessageQueue::Restart() { |
182 fStop_ = false; | 200 fStop_ = false; |
183 } | 201 } |
184 | 202 |
185 bool MessageQueue::Peek(Message *pmsg, int cmsWait) { | 203 bool MessageQueue::Peek(Message *pmsg, int cmsWait) { |
186 if (fPeekKeep_) { | 204 if (fPeekKeep_) { |
(...skipping 83 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
270 | 288 |
271 int cmsNext; | 289 int cmsNext; |
272 if (cmsWait == kForever) { | 290 if (cmsWait == kForever) { |
273 cmsNext = cmsDelayNext; | 291 cmsNext = cmsDelayNext; |
274 } else { | 292 } else { |
275 cmsNext = std::max(0, cmsTotal - cmsElapsed); | 293 cmsNext = std::max(0, cmsTotal - cmsElapsed); |
276 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) | 294 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) |
277 cmsNext = cmsDelayNext; | 295 cmsNext = cmsDelayNext; |
278 } | 296 } |
279 | 297 |
280 // Wait and multiplex in the meantime | 298 { |
281 if (!ss_->Wait(cmsNext, process_io)) | 299 // Wait and multiplex in the meantime |
282 return false; | 300 SharedScope ss(&ss_lock_); |
| 301 if (!ss_->Wait(cmsNext, process_io)) |
| 302 return false; |
| 303 } |
283 | 304 |
284 // If the specified timeout expired, return | 305 // If the specified timeout expired, return |
285 | 306 |
286 msCurrent = Time(); | 307 msCurrent = Time(); |
287 cmsElapsed = TimeDiff(msCurrent, msStart); | 308 cmsElapsed = TimeDiff(msCurrent, msStart); |
288 if (cmsWait != kForever) { | 309 if (cmsWait != kForever) { |
289 if (cmsElapsed >= cmsWait) | 310 if (cmsElapsed >= cmsWait) |
290 return false; | 311 return false; |
291 } | 312 } |
292 } | 313 } |
293 return false; | 314 return false; |
294 } | 315 } |
295 | 316 |
296 void MessageQueue::ReceiveSends() { | 317 void MessageQueue::ReceiveSends() { |
297 } | 318 } |
298 | 319 |
299 void MessageQueue::Post(MessageHandler* phandler, | 320 void MessageQueue::Post(MessageHandler* phandler, |
300 uint32_t id, | 321 uint32_t id, |
301 MessageData* pdata, | 322 MessageData* pdata, |
302 bool time_sensitive) { | 323 bool time_sensitive) { |
303 if (fStop_) | 324 if (fStop_) |
304 return; | 325 return; |
305 | 326 |
306 // Keep thread safe | 327 // Keep thread safe |
307 // Add the message to the end of the queue | 328 // Add the message to the end of the queue |
308 // Signal for the multiplexer to return | 329 // Signal for the multiplexer to return |
309 | 330 |
310 CritScope cs(&crit_); | 331 { |
311 Message msg; | 332 CritScope cs(&crit_); |
312 msg.phandler = phandler; | 333 Message msg; |
313 msg.message_id = id; | 334 msg.phandler = phandler; |
314 msg.pdata = pdata; | 335 msg.message_id = id; |
315 if (time_sensitive) { | 336 msg.pdata = pdata; |
316 msg.ts_sensitive = Time() + kMaxMsgLatency; | 337 if (time_sensitive) { |
| 338 msg.ts_sensitive = Time() + kMaxMsgLatency; |
| 339 } |
| 340 msgq_.push_back(msg); |
317 } | 341 } |
318 msgq_.push_back(msg); | 342 WakeUpSocketServer(); |
319 ss_->WakeUp(); | |
320 } | 343 } |
321 | 344 |
322 void MessageQueue::PostDelayed(int cmsDelay, | 345 void MessageQueue::PostDelayed(int cmsDelay, |
323 MessageHandler* phandler, | 346 MessageHandler* phandler, |
324 uint32_t id, | 347 uint32_t id, |
325 MessageData* pdata) { | 348 MessageData* pdata) { |
326 return DoDelayPost(cmsDelay, TimeAfter(cmsDelay), phandler, id, pdata); | 349 return DoDelayPost(cmsDelay, TimeAfter(cmsDelay), phandler, id, pdata); |
327 } | 350 } |
328 | 351 |
329 void MessageQueue::PostAt(uint32_t tstamp, | 352 void MessageQueue::PostAt(uint32_t tstamp, |
330 MessageHandler* phandler, | 353 MessageHandler* phandler, |
331 uint32_t id, | 354 uint32_t id, |
332 MessageData* pdata) { | 355 MessageData* pdata) { |
333 return DoDelayPost(TimeUntil(tstamp), tstamp, phandler, id, pdata); | 356 return DoDelayPost(TimeUntil(tstamp), tstamp, phandler, id, pdata); |
334 } | 357 } |
335 | 358 |
336 void MessageQueue::DoDelayPost(int cmsDelay, | 359 void MessageQueue::DoDelayPost(int cmsDelay, |
337 uint32_t tstamp, | 360 uint32_t tstamp, |
338 MessageHandler* phandler, | 361 MessageHandler* phandler, |
339 uint32_t id, | 362 uint32_t id, |
340 MessageData* pdata) { | 363 MessageData* pdata) { |
341 if (fStop_) | 364 if (fStop_) |
342 return; | 365 return; |
343 | 366 |
344 // Keep thread safe | 367 // Keep thread safe |
345 // Add to the priority queue. Gets sorted soonest first. | 368 // Add to the priority queue. Gets sorted soonest first. |
346 // Signal for the multiplexer to return. | 369 // Signal for the multiplexer to return. |
347 | 370 |
348 CritScope cs(&crit_); | 371 { |
349 Message msg; | 372 CritScope cs(&crit_); |
350 msg.phandler = phandler; | 373 Message msg; |
351 msg.message_id = id; | 374 msg.phandler = phandler; |
352 msg.pdata = pdata; | 375 msg.message_id = id; |
353 DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg); | 376 msg.pdata = pdata; |
354 dmsgq_.push(dmsg); | 377 DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg); |
355 // If this message queue processes 1 message every millisecond for 50 days, | 378 dmsgq_.push(dmsg); |
356 // we will wrap this number. Even then, only messages with identical times | 379 // If this message queue processes 1 message every millisecond for 50 days, |
357 // will be misordered, and then only briefly. This is probably ok. | 380 // we will wrap this number. Even then, only messages with identical times |
358 VERIFY(0 != ++dmsgq_next_num_); | 381 // will be misordered, and then only briefly. This is probably ok. |
359 ss_->WakeUp(); | 382 VERIFY(0 != ++dmsgq_next_num_); |
| 383 } |
| 384 WakeUpSocketServer(); |
360 } | 385 } |
361 | 386 |
362 int MessageQueue::GetDelay() { | 387 int MessageQueue::GetDelay() { |
363 CritScope cs(&crit_); | 388 CritScope cs(&crit_); |
364 | 389 |
365 if (!msgq_.empty()) | 390 if (!msgq_.empty()) |
366 return 0; | 391 return 0; |
367 | 392 |
368 if (!dmsgq_.empty()) { | 393 if (!dmsgq_.empty()) { |
369 int delay = TimeUntil(dmsgq_.top().msTrigger_); | 394 int delay = TimeUntil(dmsgq_.top().msTrigger_); |
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
423 } | 448 } |
424 dmsgq_.container().erase(new_end, dmsgq_.container().end()); | 449 dmsgq_.container().erase(new_end, dmsgq_.container().end()); |
425 dmsgq_.reheap(); | 450 dmsgq_.reheap(); |
426 } | 451 } |
427 | 452 |
428 void MessageQueue::Dispatch(Message *pmsg) { | 453 void MessageQueue::Dispatch(Message *pmsg) { |
429 pmsg->phandler->OnMessage(pmsg); | 454 pmsg->phandler->OnMessage(pmsg); |
430 } | 455 } |
431 | 456 |
432 } // namespace rtc | 457 } // namespace rtc |
OLD | NEW |