OLD | NEW |
1 /* | 1 /* |
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. | 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. |
3 * | 3 * |
4 * Use of this source code is governed by a BSD-style license | 4 * Use of this source code is governed by a BSD-style license |
5 * that can be found in the LICENSE file in the root of the source | 5 * that can be found in the LICENSE file in the root of the source |
6 * tree. An additional intellectual property rights grant can be found | 6 * tree. An additional intellectual property rights grant can be found |
7 * in the file PATENTS. All contributing project authors may | 7 * in the file PATENTS. All contributing project authors may |
8 * be found in the AUTHORS file in the root of the source tree. | 8 * be found in the AUTHORS file in the root of the source tree. |
9 */ | 9 */ |
10 | 10 |
(...skipping 99 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
110 CritScope cs(&crit_); | 110 CritScope cs(&crit_); |
111 std::vector<MessageQueue *>::iterator iter; | 111 std::vector<MessageQueue *>::iterator iter; |
112 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++) | 112 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++) |
113 (*iter)->Clear(handler); | 113 (*iter)->Clear(handler); |
114 } | 114 } |
115 | 115 |
116 //------------------------------------------------------------------ | 116 //------------------------------------------------------------------ |
117 // MessageQueue | 117 // MessageQueue |
118 | 118 |
119 MessageQueue::MessageQueue(SocketServer* ss, bool init_queue) | 119 MessageQueue::MessageQueue(SocketServer* ss, bool init_queue) |
120 : fStop_(false), fPeekKeep_(false), | 120 : ss_(ss), fStop_(false), fPeekKeep_(false), |
121 dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false), ss_(ss) { | 121 dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false) { |
122 if (!ss_) { | 122 if (!ss_) { |
123 // Currently, MessageQueue holds a socket server, and is the base class for | 123 // Currently, MessageQueue holds a socket server, and is the base class for |
124 // Thread. It seems like it makes more sense for Thread to hold the socket | 124 // Thread. It seems like it makes more sense for Thread to hold the socket |
125 // server, and provide it to the MessageQueue, since the Thread controls | 125 // server, and provide it to the MessageQueue, since the Thread controls |
126 // the I/O model, and MQ is agnostic to those details. Anyway, this causes | 126 // the I/O model, and MQ is agnostic to those details. Anyway, this causes |
127 // messagequeue_unittest to depend on network libraries... yuck. | 127 // messagequeue_unittest to depend on network libraries... yuck. |
128 default_ss_.reset(new DefaultSocketServer()); | 128 default_ss_.reset(new DefaultSocketServer()); |
129 ss_ = default_ss_.get(); | 129 ss_ = default_ss_.get(); |
130 } | 130 } |
131 ss_->SetMessageQueue(this); | 131 ss_->SetMessageQueue(this); |
(...skipping 20 matching lines...) Expand all Loading... |
152 return; | 152 return; |
153 } | 153 } |
154 | 154 |
155 fDestroyed_ = true; | 155 fDestroyed_ = true; |
156 // The signal is done from here to ensure | 156 // The signal is done from here to ensure |
157 // that it always gets called when the queue | 157 // that it always gets called when the queue |
158 // is going away. | 158 // is going away. |
159 SignalQueueDestroyed(); | 159 SignalQueueDestroyed(); |
160 MessageQueueManager::Remove(this); | 160 MessageQueueManager::Remove(this); |
161 Clear(NULL); | 161 Clear(NULL); |
162 | |
163 SharedScope ss(&ss_lock_); | |
164 if (ss_) { | 162 if (ss_) { |
165 ss_->SetMessageQueue(NULL); | 163 ss_->SetMessageQueue(NULL); |
166 } | 164 } |
167 } | 165 } |
168 | 166 |
169 SocketServer* MessageQueue::socketserver() { | |
170 SharedScope ss(&ss_lock_); | |
171 return ss_; | |
172 } | |
173 | |
174 void MessageQueue::set_socketserver(SocketServer* ss) { | 167 void MessageQueue::set_socketserver(SocketServer* ss) { |
175 // Need to lock exclusively here to prevent simultaneous modifications from | |
176 // other threads. Can't be a shared lock to prevent races with other reading | |
177 // threads. | |
178 // Other places that only read "ss_" can use a shared lock as simultaneous | |
179 // read access is allowed. | |
180 ExclusiveScope es(&ss_lock_); | |
181 ss_ = ss ? ss : default_ss_.get(); | 168 ss_ = ss ? ss : default_ss_.get(); |
182 ss_->SetMessageQueue(this); | 169 ss_->SetMessageQueue(this); |
183 } | 170 } |
184 | 171 |
185 void MessageQueue::WakeUpSocketServer() { | 172 void MessageQueue::Quit() { |
186 SharedScope ss(&ss_lock_); | 173 fStop_ = true; |
187 ss_->WakeUp(); | 174 ss_->WakeUp(); |
188 } | 175 } |
189 | 176 |
190 void MessageQueue::Quit() { | |
191 fStop_ = true; | |
192 WakeUpSocketServer(); | |
193 } | |
194 | |
195 bool MessageQueue::IsQuitting() { | 177 bool MessageQueue::IsQuitting() { |
196 return fStop_; | 178 return fStop_; |
197 } | 179 } |
198 | 180 |
199 void MessageQueue::Restart() { | 181 void MessageQueue::Restart() { |
200 fStop_ = false; | 182 fStop_ = false; |
201 } | 183 } |
202 | 184 |
203 bool MessageQueue::Peek(Message *pmsg, int cmsWait) { | 185 bool MessageQueue::Peek(Message *pmsg, int cmsWait) { |
204 if (fPeekKeep_) { | 186 if (fPeekKeep_) { |
(...skipping 83 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
288 | 270 |
289 int cmsNext; | 271 int cmsNext; |
290 if (cmsWait == kForever) { | 272 if (cmsWait == kForever) { |
291 cmsNext = cmsDelayNext; | 273 cmsNext = cmsDelayNext; |
292 } else { | 274 } else { |
293 cmsNext = std::max(0, cmsTotal - cmsElapsed); | 275 cmsNext = std::max(0, cmsTotal - cmsElapsed); |
294 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) | 276 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) |
295 cmsNext = cmsDelayNext; | 277 cmsNext = cmsDelayNext; |
296 } | 278 } |
297 | 279 |
298 { | 280 // Wait and multiplex in the meantime |
299 // Wait and multiplex in the meantime | 281 if (!ss_->Wait(cmsNext, process_io)) |
300 SharedScope ss(&ss_lock_); | 282 return false; |
301 if (!ss_->Wait(cmsNext, process_io)) | |
302 return false; | |
303 } | |
304 | 283 |
305 // If the specified timeout expired, return | 284 // If the specified timeout expired, return |
306 | 285 |
307 msCurrent = Time(); | 286 msCurrent = Time(); |
308 cmsElapsed = TimeDiff(msCurrent, msStart); | 287 cmsElapsed = TimeDiff(msCurrent, msStart); |
309 if (cmsWait != kForever) { | 288 if (cmsWait != kForever) { |
310 if (cmsElapsed >= cmsWait) | 289 if (cmsElapsed >= cmsWait) |
311 return false; | 290 return false; |
312 } | 291 } |
313 } | 292 } |
314 return false; | 293 return false; |
315 } | 294 } |
316 | 295 |
317 void MessageQueue::ReceiveSends() { | 296 void MessageQueue::ReceiveSends() { |
318 } | 297 } |
319 | 298 |
320 void MessageQueue::Post(MessageHandler* phandler, | 299 void MessageQueue::Post(MessageHandler* phandler, |
321 uint32_t id, | 300 uint32_t id, |
322 MessageData* pdata, | 301 MessageData* pdata, |
323 bool time_sensitive) { | 302 bool time_sensitive) { |
324 if (fStop_) | 303 if (fStop_) |
325 return; | 304 return; |
326 | 305 |
327 // Keep thread safe | 306 // Keep thread safe |
328 // Add the message to the end of the queue | 307 // Add the message to the end of the queue |
329 // Signal for the multiplexer to return | 308 // Signal for the multiplexer to return |
330 | 309 |
331 { | 310 CritScope cs(&crit_); |
332 CritScope cs(&crit_); | 311 Message msg; |
333 Message msg; | 312 msg.phandler = phandler; |
334 msg.phandler = phandler; | 313 msg.message_id = id; |
335 msg.message_id = id; | 314 msg.pdata = pdata; |
336 msg.pdata = pdata; | 315 if (time_sensitive) { |
337 if (time_sensitive) { | 316 msg.ts_sensitive = Time() + kMaxMsgLatency; |
338 msg.ts_sensitive = Time() + kMaxMsgLatency; | |
339 } | |
340 msgq_.push_back(msg); | |
341 } | 317 } |
342 WakeUpSocketServer(); | 318 msgq_.push_back(msg); |
| 319 ss_->WakeUp(); |
343 } | 320 } |
344 | 321 |
345 void MessageQueue::PostDelayed(int cmsDelay, | 322 void MessageQueue::PostDelayed(int cmsDelay, |
346 MessageHandler* phandler, | 323 MessageHandler* phandler, |
347 uint32_t id, | 324 uint32_t id, |
348 MessageData* pdata) { | 325 MessageData* pdata) { |
349 return DoDelayPost(cmsDelay, TimeAfter(cmsDelay), phandler, id, pdata); | 326 return DoDelayPost(cmsDelay, TimeAfter(cmsDelay), phandler, id, pdata); |
350 } | 327 } |
351 | 328 |
352 void MessageQueue::PostAt(uint32_t tstamp, | 329 void MessageQueue::PostAt(uint32_t tstamp, |
353 MessageHandler* phandler, | 330 MessageHandler* phandler, |
354 uint32_t id, | 331 uint32_t id, |
355 MessageData* pdata) { | 332 MessageData* pdata) { |
356 return DoDelayPost(TimeUntil(tstamp), tstamp, phandler, id, pdata); | 333 return DoDelayPost(TimeUntil(tstamp), tstamp, phandler, id, pdata); |
357 } | 334 } |
358 | 335 |
359 void MessageQueue::DoDelayPost(int cmsDelay, | 336 void MessageQueue::DoDelayPost(int cmsDelay, |
360 uint32_t tstamp, | 337 uint32_t tstamp, |
361 MessageHandler* phandler, | 338 MessageHandler* phandler, |
362 uint32_t id, | 339 uint32_t id, |
363 MessageData* pdata) { | 340 MessageData* pdata) { |
364 if (fStop_) | 341 if (fStop_) |
365 return; | 342 return; |
366 | 343 |
367 // Keep thread safe | 344 // Keep thread safe |
368 // Add to the priority queue. Gets sorted soonest first. | 345 // Add to the priority queue. Gets sorted soonest first. |
369 // Signal for the multiplexer to return. | 346 // Signal for the multiplexer to return. |
370 | 347 |
371 { | 348 CritScope cs(&crit_); |
372 CritScope cs(&crit_); | 349 Message msg; |
373 Message msg; | 350 msg.phandler = phandler; |
374 msg.phandler = phandler; | 351 msg.message_id = id; |
375 msg.message_id = id; | 352 msg.pdata = pdata; |
376 msg.pdata = pdata; | 353 DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg); |
377 DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg); | 354 dmsgq_.push(dmsg); |
378 dmsgq_.push(dmsg); | 355 // If this message queue processes 1 message every millisecond for 50 days, |
379 // If this message queue processes 1 message every millisecond for 50 days, | 356 // we will wrap this number. Even then, only messages with identical times |
380 // we will wrap this number. Even then, only messages with identical times | 357 // will be misordered, and then only briefly. This is probably ok. |
381 // will be misordered, and then only briefly. This is probably ok. | 358 VERIFY(0 != ++dmsgq_next_num_); |
382 VERIFY(0 != ++dmsgq_next_num_); | 359 ss_->WakeUp(); |
383 } | |
384 WakeUpSocketServer(); | |
385 } | 360 } |
386 | 361 |
387 int MessageQueue::GetDelay() { | 362 int MessageQueue::GetDelay() { |
388 CritScope cs(&crit_); | 363 CritScope cs(&crit_); |
389 | 364 |
390 if (!msgq_.empty()) | 365 if (!msgq_.empty()) |
391 return 0; | 366 return 0; |
392 | 367 |
393 if (!dmsgq_.empty()) { | 368 if (!dmsgq_.empty()) { |
394 int delay = TimeUntil(dmsgq_.top().msTrigger_); | 369 int delay = TimeUntil(dmsgq_.top().msTrigger_); |
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
448 } | 423 } |
449 dmsgq_.container().erase(new_end, dmsgq_.container().end()); | 424 dmsgq_.container().erase(new_end, dmsgq_.container().end()); |
450 dmsgq_.reheap(); | 425 dmsgq_.reheap(); |
451 } | 426 } |
452 | 427 |
453 void MessageQueue::Dispatch(Message *pmsg) { | 428 void MessageQueue::Dispatch(Message *pmsg) { |
454 pmsg->phandler->OnMessage(pmsg); | 429 pmsg->phandler->OnMessage(pmsg); |
455 } | 430 } |
456 | 431 |
457 } // namespace rtc | 432 } // namespace rtc |
OLD | NEW |