OLD | NEW |
| (Empty) |
1 /* | |
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. | |
3 * | |
4 * Use of this source code is governed by a BSD-style license | |
5 * that can be found in the LICENSE file in the root of the source | |
6 * tree. An additional intellectual property rights grant can be found | |
7 * in the file PATENTS. All contributing project authors may | |
8 * be found in the AUTHORS file in the root of the source tree. | |
9 */ | |
10 #include <algorithm> | |
11 | |
12 #include "webrtc/base/atomicops.h" | |
13 #include "webrtc/base/checks.h" | |
14 #include "webrtc/base/logging.h" | |
15 #include "webrtc/base/messagequeue.h" | |
16 #include "webrtc/base/stringencode.h" | |
17 #include "webrtc/base/thread.h" | |
18 #include "webrtc/base/trace_event.h" | |
19 | |
20 namespace rtc { | |
21 namespace { | |
22 | |
23 const int kMaxMsgLatency = 150; // 150 ms | |
24 const int kSlowDispatchLoggingThreshold = 50; // 50 ms | |
25 | |
26 class SCOPED_LOCKABLE DebugNonReentrantCritScope { | |
27 public: | |
28 DebugNonReentrantCritScope(const CriticalSection* cs, bool* locked) | |
29 EXCLUSIVE_LOCK_FUNCTION(cs) | |
30 : cs_(cs), locked_(locked) { | |
31 cs_->Enter(); | |
32 RTC_DCHECK(!*locked_); | |
33 *locked_ = true; | |
34 } | |
35 | |
36 ~DebugNonReentrantCritScope() UNLOCK_FUNCTION() { | |
37 *locked_ = false; | |
38 cs_->Leave(); | |
39 } | |
40 | |
41 private: | |
42 const CriticalSection* const cs_; | |
43 bool* locked_; | |
44 | |
45 RTC_DISALLOW_COPY_AND_ASSIGN(DebugNonReentrantCritScope); | |
46 }; | |
47 } // namespace | |
48 | |
49 //------------------------------------------------------------------ | |
50 // MessageQueueManager | |
51 | |
52 MessageQueueManager* MessageQueueManager::instance_ = nullptr; | |
53 | |
54 MessageQueueManager* MessageQueueManager::Instance() { | |
55 // Note: This is not thread safe, but it is first called before threads are | |
56 // spawned. | |
57 if (!instance_) | |
58 instance_ = new MessageQueueManager; | |
59 return instance_; | |
60 } | |
61 | |
62 bool MessageQueueManager::IsInitialized() { | |
63 return instance_ != nullptr; | |
64 } | |
65 | |
66 MessageQueueManager::MessageQueueManager() : locked_(false) {} | |
67 | |
68 MessageQueueManager::~MessageQueueManager() { | |
69 } | |
70 | |
71 void MessageQueueManager::Add(MessageQueue *message_queue) { | |
72 return Instance()->AddInternal(message_queue); | |
73 } | |
74 void MessageQueueManager::AddInternal(MessageQueue *message_queue) { | |
75 DebugNonReentrantCritScope cs(&crit_, &locked_); | |
76 message_queues_.push_back(message_queue); | |
77 } | |
78 | |
79 void MessageQueueManager::Remove(MessageQueue *message_queue) { | |
80 // If there isn't a message queue manager instance, then there isn't a queue | |
81 // to remove. | |
82 if (!instance_) return; | |
83 return Instance()->RemoveInternal(message_queue); | |
84 } | |
85 void MessageQueueManager::RemoveInternal(MessageQueue *message_queue) { | |
86 // If this is the last MessageQueue, destroy the manager as well so that | |
87 // we don't leak this object at program shutdown. As mentioned above, this is | |
88 // not thread-safe, but this should only happen at program termination (when | |
89 // the ThreadManager is destroyed, and threads are no longer active). | |
90 bool destroy = false; | |
91 { | |
92 DebugNonReentrantCritScope cs(&crit_, &locked_); | |
93 std::vector<MessageQueue *>::iterator iter; | |
94 iter = std::find(message_queues_.begin(), message_queues_.end(), | |
95 message_queue); | |
96 if (iter != message_queues_.end()) { | |
97 message_queues_.erase(iter); | |
98 } | |
99 destroy = message_queues_.empty(); | |
100 } | |
101 if (destroy) { | |
102 instance_ = nullptr; | |
103 delete this; | |
104 } | |
105 } | |
106 | |
107 void MessageQueueManager::Clear(MessageHandler *handler) { | |
108 // If there isn't a message queue manager instance, then there aren't any | |
109 // queues to remove this handler from. | |
110 if (!instance_) return; | |
111 return Instance()->ClearInternal(handler); | |
112 } | |
113 void MessageQueueManager::ClearInternal(MessageHandler *handler) { | |
114 DebugNonReentrantCritScope cs(&crit_, &locked_); | |
115 std::vector<MessageQueue *>::iterator iter; | |
116 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++) | |
117 (*iter)->Clear(handler); | |
118 } | |
119 | |
120 void MessageQueueManager::ProcessAllMessageQueues() { | |
121 if (!instance_) { | |
122 return; | |
123 } | |
124 return Instance()->ProcessAllMessageQueuesInternal(); | |
125 } | |
126 | |
127 void MessageQueueManager::ProcessAllMessageQueuesInternal() { | |
128 // This works by posting a delayed message at the current time and waiting | |
129 // for it to be dispatched on all queues, which will ensure that all messages | |
130 // that came before it were also dispatched. | |
131 volatile int queues_not_done = 0; | |
132 | |
133 // This class is used so that whether the posted message is processed, or the | |
134 // message queue is simply cleared, queues_not_done gets decremented. | |
135 class ScopedIncrement : public MessageData { | |
136 public: | |
137 ScopedIncrement(volatile int* value) : value_(value) { | |
138 AtomicOps::Increment(value_); | |
139 } | |
140 ~ScopedIncrement() override { AtomicOps::Decrement(value_); } | |
141 | |
142 private: | |
143 volatile int* value_; | |
144 }; | |
145 | |
146 { | |
147 DebugNonReentrantCritScope cs(&crit_, &locked_); | |
148 for (MessageQueue* queue : message_queues_) { | |
149 if (!queue->IsProcessingMessages()) { | |
150 // If the queue is not processing messages, it can | |
151 // be ignored. If we tried to post a message to it, it would be dropped | |
152 // or ignored. | |
153 continue; | |
154 } | |
155 queue->PostDelayed(RTC_FROM_HERE, 0, nullptr, MQID_DISPOSE, | |
156 new ScopedIncrement(&queues_not_done)); | |
157 } | |
158 } | |
159 // Note: One of the message queues may have been on this thread, which is why | |
160 // we can't synchronously wait for queues_not_done to go to 0; we need to | |
161 // process messages as well. | |
162 while (AtomicOps::AcquireLoad(&queues_not_done) > 0) { | |
163 rtc::Thread::Current()->ProcessMessages(0); | |
164 } | |
165 } | |
166 | |
167 //------------------------------------------------------------------ | |
168 // MessageQueue | |
169 MessageQueue::MessageQueue(SocketServer* ss, bool init_queue) | |
170 : fPeekKeep_(false), | |
171 dmsgq_next_num_(0), | |
172 fInitialized_(false), | |
173 fDestroyed_(false), | |
174 stop_(0), | |
175 ss_(ss) { | |
176 RTC_DCHECK(ss); | |
177 // Currently, MessageQueue holds a socket server, and is the base class for | |
178 // Thread. It seems like it makes more sense for Thread to hold the socket | |
179 // server, and provide it to the MessageQueue, since the Thread controls | |
180 // the I/O model, and MQ is agnostic to those details. Anyway, this causes | |
181 // messagequeue_unittest to depend on network libraries... yuck. | |
182 ss_->SetMessageQueue(this); | |
183 if (init_queue) { | |
184 DoInit(); | |
185 } | |
186 } | |
187 | |
188 MessageQueue::MessageQueue(std::unique_ptr<SocketServer> ss, bool init_queue) | |
189 : MessageQueue(ss.get(), init_queue) { | |
190 own_ss_ = std::move(ss); | |
191 } | |
192 | |
193 MessageQueue::~MessageQueue() { | |
194 DoDestroy(); | |
195 } | |
196 | |
197 void MessageQueue::DoInit() { | |
198 if (fInitialized_) { | |
199 return; | |
200 } | |
201 | |
202 fInitialized_ = true; | |
203 MessageQueueManager::Add(this); | |
204 } | |
205 | |
206 void MessageQueue::DoDestroy() { | |
207 if (fDestroyed_) { | |
208 return; | |
209 } | |
210 | |
211 fDestroyed_ = true; | |
212 // The signal is done from here to ensure | |
213 // that it always gets called when the queue | |
214 // is going away. | |
215 SignalQueueDestroyed(); | |
216 MessageQueueManager::Remove(this); | |
217 Clear(nullptr); | |
218 | |
219 if (ss_) { | |
220 ss_->SetMessageQueue(nullptr); | |
221 } | |
222 } | |
223 | |
224 SocketServer* MessageQueue::socketserver() { | |
225 return ss_; | |
226 } | |
227 | |
228 void MessageQueue::WakeUpSocketServer() { | |
229 ss_->WakeUp(); | |
230 } | |
231 | |
232 void MessageQueue::Quit() { | |
233 AtomicOps::ReleaseStore(&stop_, 1); | |
234 WakeUpSocketServer(); | |
235 } | |
236 | |
237 bool MessageQueue::IsQuitting() { | |
238 return AtomicOps::AcquireLoad(&stop_) != 0; | |
239 } | |
240 | |
241 bool MessageQueue::IsProcessingMessages() { | |
242 return !IsQuitting(); | |
243 } | |
244 | |
245 void MessageQueue::Restart() { | |
246 AtomicOps::ReleaseStore(&stop_, 0); | |
247 } | |
248 | |
249 bool MessageQueue::Peek(Message *pmsg, int cmsWait) { | |
250 if (fPeekKeep_) { | |
251 *pmsg = msgPeek_; | |
252 return true; | |
253 } | |
254 if (!Get(pmsg, cmsWait)) | |
255 return false; | |
256 msgPeek_ = *pmsg; | |
257 fPeekKeep_ = true; | |
258 return true; | |
259 } | |
260 | |
261 bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) { | |
262 // Return and clear peek if present | |
263 // Always return the peek if it exists so there is Peek/Get symmetry | |
264 | |
265 if (fPeekKeep_) { | |
266 *pmsg = msgPeek_; | |
267 fPeekKeep_ = false; | |
268 return true; | |
269 } | |
270 | |
271 // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch | |
272 | |
273 int64_t cmsTotal = cmsWait; | |
274 int64_t cmsElapsed = 0; | |
275 int64_t msStart = TimeMillis(); | |
276 int64_t msCurrent = msStart; | |
277 while (true) { | |
278 // Check for sent messages | |
279 ReceiveSends(); | |
280 | |
281 // Check for posted events | |
282 int64_t cmsDelayNext = kForever; | |
283 bool first_pass = true; | |
284 while (true) { | |
285 // All queue operations need to be locked, but nothing else in this loop | |
286 // (specifically handling disposed message) can happen inside the crit. | |
287 // Otherwise, disposed MessageHandlers will cause deadlocks. | |
288 { | |
289 CritScope cs(&crit_); | |
290 // On the first pass, check for delayed messages that have been | |
291 // triggered and calculate the next trigger time. | |
292 if (first_pass) { | |
293 first_pass = false; | |
294 while (!dmsgq_.empty()) { | |
295 if (msCurrent < dmsgq_.top().msTrigger_) { | |
296 cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent); | |
297 break; | |
298 } | |
299 msgq_.push_back(dmsgq_.top().msg_); | |
300 dmsgq_.pop(); | |
301 } | |
302 } | |
303 // Pull a message off the message queue, if available. | |
304 if (msgq_.empty()) { | |
305 break; | |
306 } else { | |
307 *pmsg = msgq_.front(); | |
308 msgq_.pop_front(); | |
309 } | |
310 } // crit_ is released here. | |
311 | |
312 // Log a warning for time-sensitive messages that we're late to deliver. | |
313 if (pmsg->ts_sensitive) { | |
314 int64_t delay = TimeDiff(msCurrent, pmsg->ts_sensitive); | |
315 if (delay > 0) { | |
316 LOG_F(LS_WARNING) << "id: " << pmsg->message_id << " delay: " | |
317 << (delay + kMaxMsgLatency) << "ms"; | |
318 } | |
319 } | |
320 // If this was a dispose message, delete it and skip it. | |
321 if (MQID_DISPOSE == pmsg->message_id) { | |
322 RTC_DCHECK(nullptr == pmsg->phandler); | |
323 delete pmsg->pdata; | |
324 *pmsg = Message(); | |
325 continue; | |
326 } | |
327 return true; | |
328 } | |
329 | |
330 if (IsQuitting()) | |
331 break; | |
332 | |
333 // Which is shorter, the delay wait or the asked wait? | |
334 | |
335 int64_t cmsNext; | |
336 if (cmsWait == kForever) { | |
337 cmsNext = cmsDelayNext; | |
338 } else { | |
339 cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed); | |
340 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) | |
341 cmsNext = cmsDelayNext; | |
342 } | |
343 | |
344 { | |
345 // Wait and multiplex in the meantime | |
346 if (!ss_->Wait(static_cast<int>(cmsNext), process_io)) | |
347 return false; | |
348 } | |
349 | |
350 // If the specified timeout expired, return | |
351 | |
352 msCurrent = TimeMillis(); | |
353 cmsElapsed = TimeDiff(msCurrent, msStart); | |
354 if (cmsWait != kForever) { | |
355 if (cmsElapsed >= cmsWait) | |
356 return false; | |
357 } | |
358 } | |
359 return false; | |
360 } | |
361 | |
362 void MessageQueue::ReceiveSends() { | |
363 } | |
364 | |
365 void MessageQueue::Post(const Location& posted_from, | |
366 MessageHandler* phandler, | |
367 uint32_t id, | |
368 MessageData* pdata, | |
369 bool time_sensitive) { | |
370 if (IsQuitting()) | |
371 return; | |
372 | |
373 // Keep thread safe | |
374 // Add the message to the end of the queue | |
375 // Signal for the multiplexer to return | |
376 | |
377 { | |
378 CritScope cs(&crit_); | |
379 Message msg; | |
380 msg.posted_from = posted_from; | |
381 msg.phandler = phandler; | |
382 msg.message_id = id; | |
383 msg.pdata = pdata; | |
384 if (time_sensitive) { | |
385 msg.ts_sensitive = TimeMillis() + kMaxMsgLatency; | |
386 } | |
387 msgq_.push_back(msg); | |
388 } | |
389 WakeUpSocketServer(); | |
390 } | |
391 | |
392 void MessageQueue::PostDelayed(const Location& posted_from, | |
393 int cmsDelay, | |
394 MessageHandler* phandler, | |
395 uint32_t id, | |
396 MessageData* pdata) { | |
397 return DoDelayPost(posted_from, cmsDelay, TimeAfter(cmsDelay), phandler, id, | |
398 pdata); | |
399 } | |
400 | |
401 void MessageQueue::PostAt(const Location& posted_from, | |
402 uint32_t tstamp, | |
403 MessageHandler* phandler, | |
404 uint32_t id, | |
405 MessageData* pdata) { | |
406 // This should work even if it is used (unexpectedly). | |
407 int64_t delay = static_cast<uint32_t>(TimeMillis()) - tstamp; | |
408 return DoDelayPost(posted_from, delay, tstamp, phandler, id, pdata); | |
409 } | |
410 | |
411 void MessageQueue::PostAt(const Location& posted_from, | |
412 int64_t tstamp, | |
413 MessageHandler* phandler, | |
414 uint32_t id, | |
415 MessageData* pdata) { | |
416 return DoDelayPost(posted_from, TimeUntil(tstamp), tstamp, phandler, id, | |
417 pdata); | |
418 } | |
419 | |
420 void MessageQueue::DoDelayPost(const Location& posted_from, | |
421 int64_t cmsDelay, | |
422 int64_t tstamp, | |
423 MessageHandler* phandler, | |
424 uint32_t id, | |
425 MessageData* pdata) { | |
426 if (IsQuitting()) { | |
427 return; | |
428 } | |
429 | |
430 // Keep thread safe | |
431 // Add to the priority queue. Gets sorted soonest first. | |
432 // Signal for the multiplexer to return. | |
433 | |
434 { | |
435 CritScope cs(&crit_); | |
436 Message msg; | |
437 msg.posted_from = posted_from; | |
438 msg.phandler = phandler; | |
439 msg.message_id = id; | |
440 msg.pdata = pdata; | |
441 DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg); | |
442 dmsgq_.push(dmsg); | |
443 // If this message queue processes 1 message every millisecond for 50 days, | |
444 // we will wrap this number. Even then, only messages with identical times | |
445 // will be misordered, and then only briefly. This is probably ok. | |
446 ++dmsgq_next_num_; | |
447 RTC_DCHECK_NE(0, dmsgq_next_num_); | |
448 } | |
449 WakeUpSocketServer(); | |
450 } | |
451 | |
452 int MessageQueue::GetDelay() { | |
453 CritScope cs(&crit_); | |
454 | |
455 if (!msgq_.empty()) | |
456 return 0; | |
457 | |
458 if (!dmsgq_.empty()) { | |
459 int delay = TimeUntil(dmsgq_.top().msTrigger_); | |
460 if (delay < 0) | |
461 delay = 0; | |
462 return delay; | |
463 } | |
464 | |
465 return kForever; | |
466 } | |
467 | |
468 void MessageQueue::Clear(MessageHandler* phandler, | |
469 uint32_t id, | |
470 MessageList* removed) { | |
471 CritScope cs(&crit_); | |
472 | |
473 // Remove messages with phandler | |
474 | |
475 if (fPeekKeep_ && msgPeek_.Match(phandler, id)) { | |
476 if (removed) { | |
477 removed->push_back(msgPeek_); | |
478 } else { | |
479 delete msgPeek_.pdata; | |
480 } | |
481 fPeekKeep_ = false; | |
482 } | |
483 | |
484 // Remove from ordered message queue | |
485 | |
486 for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) { | |
487 if (it->Match(phandler, id)) { | |
488 if (removed) { | |
489 removed->push_back(*it); | |
490 } else { | |
491 delete it->pdata; | |
492 } | |
493 it = msgq_.erase(it); | |
494 } else { | |
495 ++it; | |
496 } | |
497 } | |
498 | |
499 // Remove from priority queue. Not directly iterable, so use this approach | |
500 | |
501 PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin(); | |
502 for (PriorityQueue::container_type::iterator it = new_end; | |
503 it != dmsgq_.container().end(); ++it) { | |
504 if (it->msg_.Match(phandler, id)) { | |
505 if (removed) { | |
506 removed->push_back(it->msg_); | |
507 } else { | |
508 delete it->msg_.pdata; | |
509 } | |
510 } else { | |
511 *new_end++ = *it; | |
512 } | |
513 } | |
514 dmsgq_.container().erase(new_end, dmsgq_.container().end()); | |
515 dmsgq_.reheap(); | |
516 } | |
517 | |
518 void MessageQueue::Dispatch(Message *pmsg) { | |
519 TRACE_EVENT2("webrtc", "MessageQueue::Dispatch", "src_file_and_line", | |
520 pmsg->posted_from.file_and_line(), "src_func", | |
521 pmsg->posted_from.function_name()); | |
522 int64_t start_time = TimeMillis(); | |
523 pmsg->phandler->OnMessage(pmsg); | |
524 int64_t end_time = TimeMillis(); | |
525 int64_t diff = TimeDiff(end_time, start_time); | |
526 if (diff >= kSlowDispatchLoggingThreshold) { | |
527 LOG(LS_INFO) << "Message took " << diff << "ms to dispatch. Posted from: " | |
528 << pmsg->posted_from.ToString(); | |
529 } | |
530 } | |
531 | |
532 } // namespace rtc | |
OLD | NEW |