Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(707)

Side by Side Diff: webrtc/base/messagequeue.cc

Issue 2877023002: Move webrtc/{base => rtc_base} (Closed)
Patch Set: update presubmit.py and DEPS include rules Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « webrtc/base/messagequeue.h ('k') | webrtc/base/messagequeue_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 /*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10 #include <algorithm>
11
12 #include "webrtc/base/atomicops.h"
13 #include "webrtc/base/checks.h"
14 #include "webrtc/base/logging.h"
15 #include "webrtc/base/messagequeue.h"
16 #include "webrtc/base/stringencode.h"
17 #include "webrtc/base/thread.h"
18 #include "webrtc/base/trace_event.h"
19
20 namespace rtc {
21 namespace {
22
23 const int kMaxMsgLatency = 150; // 150 ms
24 const int kSlowDispatchLoggingThreshold = 50; // 50 ms
25
26 class SCOPED_LOCKABLE DebugNonReentrantCritScope {
27 public:
28 DebugNonReentrantCritScope(const CriticalSection* cs, bool* locked)
29 EXCLUSIVE_LOCK_FUNCTION(cs)
30 : cs_(cs), locked_(locked) {
31 cs_->Enter();
32 RTC_DCHECK(!*locked_);
33 *locked_ = true;
34 }
35
36 ~DebugNonReentrantCritScope() UNLOCK_FUNCTION() {
37 *locked_ = false;
38 cs_->Leave();
39 }
40
41 private:
42 const CriticalSection* const cs_;
43 bool* locked_;
44
45 RTC_DISALLOW_COPY_AND_ASSIGN(DebugNonReentrantCritScope);
46 };
47 } // namespace
48
49 //------------------------------------------------------------------
50 // MessageQueueManager
51
52 MessageQueueManager* MessageQueueManager::instance_ = nullptr;
53
54 MessageQueueManager* MessageQueueManager::Instance() {
55 // Note: This is not thread safe, but it is first called before threads are
56 // spawned.
57 if (!instance_)
58 instance_ = new MessageQueueManager;
59 return instance_;
60 }
61
62 bool MessageQueueManager::IsInitialized() {
63 return instance_ != nullptr;
64 }
65
66 MessageQueueManager::MessageQueueManager() : locked_(false) {}
67
68 MessageQueueManager::~MessageQueueManager() {
69 }
70
71 void MessageQueueManager::Add(MessageQueue *message_queue) {
72 return Instance()->AddInternal(message_queue);
73 }
74 void MessageQueueManager::AddInternal(MessageQueue *message_queue) {
75 DebugNonReentrantCritScope cs(&crit_, &locked_);
76 message_queues_.push_back(message_queue);
77 }
78
79 void MessageQueueManager::Remove(MessageQueue *message_queue) {
80 // If there isn't a message queue manager instance, then there isn't a queue
81 // to remove.
82 if (!instance_) return;
83 return Instance()->RemoveInternal(message_queue);
84 }
85 void MessageQueueManager::RemoveInternal(MessageQueue *message_queue) {
86 // If this is the last MessageQueue, destroy the manager as well so that
87 // we don't leak this object at program shutdown. As mentioned above, this is
88 // not thread-safe, but this should only happen at program termination (when
89 // the ThreadManager is destroyed, and threads are no longer active).
90 bool destroy = false;
91 {
92 DebugNonReentrantCritScope cs(&crit_, &locked_);
93 std::vector<MessageQueue *>::iterator iter;
94 iter = std::find(message_queues_.begin(), message_queues_.end(),
95 message_queue);
96 if (iter != message_queues_.end()) {
97 message_queues_.erase(iter);
98 }
99 destroy = message_queues_.empty();
100 }
101 if (destroy) {
102 instance_ = nullptr;
103 delete this;
104 }
105 }
106
107 void MessageQueueManager::Clear(MessageHandler *handler) {
108 // If there isn't a message queue manager instance, then there aren't any
109 // queues to remove this handler from.
110 if (!instance_) return;
111 return Instance()->ClearInternal(handler);
112 }
113 void MessageQueueManager::ClearInternal(MessageHandler *handler) {
114 DebugNonReentrantCritScope cs(&crit_, &locked_);
115 std::vector<MessageQueue *>::iterator iter;
116 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++)
117 (*iter)->Clear(handler);
118 }
119
120 void MessageQueueManager::ProcessAllMessageQueues() {
121 if (!instance_) {
122 return;
123 }
124 return Instance()->ProcessAllMessageQueuesInternal();
125 }
126
127 void MessageQueueManager::ProcessAllMessageQueuesInternal() {
128 // This works by posting a delayed message at the current time and waiting
129 // for it to be dispatched on all queues, which will ensure that all messages
130 // that came before it were also dispatched.
131 volatile int queues_not_done = 0;
132
133 // This class is used so that whether the posted message is processed, or the
134 // message queue is simply cleared, queues_not_done gets decremented.
135 class ScopedIncrement : public MessageData {
136 public:
137 ScopedIncrement(volatile int* value) : value_(value) {
138 AtomicOps::Increment(value_);
139 }
140 ~ScopedIncrement() override { AtomicOps::Decrement(value_); }
141
142 private:
143 volatile int* value_;
144 };
145
146 {
147 DebugNonReentrantCritScope cs(&crit_, &locked_);
148 for (MessageQueue* queue : message_queues_) {
149 if (!queue->IsProcessingMessages()) {
150 // If the queue is not processing messages, it can
151 // be ignored. If we tried to post a message to it, it would be dropped
152 // or ignored.
153 continue;
154 }
155 queue->PostDelayed(RTC_FROM_HERE, 0, nullptr, MQID_DISPOSE,
156 new ScopedIncrement(&queues_not_done));
157 }
158 }
159 // Note: One of the message queues may have been on this thread, which is why
160 // we can't synchronously wait for queues_not_done to go to 0; we need to
161 // process messages as well.
162 while (AtomicOps::AcquireLoad(&queues_not_done) > 0) {
163 rtc::Thread::Current()->ProcessMessages(0);
164 }
165 }
166
167 //------------------------------------------------------------------
168 // MessageQueue
169 MessageQueue::MessageQueue(SocketServer* ss, bool init_queue)
170 : fPeekKeep_(false),
171 dmsgq_next_num_(0),
172 fInitialized_(false),
173 fDestroyed_(false),
174 stop_(0),
175 ss_(ss) {
176 RTC_DCHECK(ss);
177 // Currently, MessageQueue holds a socket server, and is the base class for
178 // Thread. It seems like it makes more sense for Thread to hold the socket
179 // server, and provide it to the MessageQueue, since the Thread controls
180 // the I/O model, and MQ is agnostic to those details. Anyway, this causes
181 // messagequeue_unittest to depend on network libraries... yuck.
182 ss_->SetMessageQueue(this);
183 if (init_queue) {
184 DoInit();
185 }
186 }
187
188 MessageQueue::MessageQueue(std::unique_ptr<SocketServer> ss, bool init_queue)
189 : MessageQueue(ss.get(), init_queue) {
190 own_ss_ = std::move(ss);
191 }
192
193 MessageQueue::~MessageQueue() {
194 DoDestroy();
195 }
196
197 void MessageQueue::DoInit() {
198 if (fInitialized_) {
199 return;
200 }
201
202 fInitialized_ = true;
203 MessageQueueManager::Add(this);
204 }
205
206 void MessageQueue::DoDestroy() {
207 if (fDestroyed_) {
208 return;
209 }
210
211 fDestroyed_ = true;
212 // The signal is done from here to ensure
213 // that it always gets called when the queue
214 // is going away.
215 SignalQueueDestroyed();
216 MessageQueueManager::Remove(this);
217 Clear(nullptr);
218
219 if (ss_) {
220 ss_->SetMessageQueue(nullptr);
221 }
222 }
223
224 SocketServer* MessageQueue::socketserver() {
225 return ss_;
226 }
227
228 void MessageQueue::WakeUpSocketServer() {
229 ss_->WakeUp();
230 }
231
232 void MessageQueue::Quit() {
233 AtomicOps::ReleaseStore(&stop_, 1);
234 WakeUpSocketServer();
235 }
236
237 bool MessageQueue::IsQuitting() {
238 return AtomicOps::AcquireLoad(&stop_) != 0;
239 }
240
241 bool MessageQueue::IsProcessingMessages() {
242 return !IsQuitting();
243 }
244
245 void MessageQueue::Restart() {
246 AtomicOps::ReleaseStore(&stop_, 0);
247 }
248
249 bool MessageQueue::Peek(Message *pmsg, int cmsWait) {
250 if (fPeekKeep_) {
251 *pmsg = msgPeek_;
252 return true;
253 }
254 if (!Get(pmsg, cmsWait))
255 return false;
256 msgPeek_ = *pmsg;
257 fPeekKeep_ = true;
258 return true;
259 }
260
261 bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) {
262 // Return and clear peek if present
263 // Always return the peek if it exists so there is Peek/Get symmetry
264
265 if (fPeekKeep_) {
266 *pmsg = msgPeek_;
267 fPeekKeep_ = false;
268 return true;
269 }
270
271 // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
272
273 int64_t cmsTotal = cmsWait;
274 int64_t cmsElapsed = 0;
275 int64_t msStart = TimeMillis();
276 int64_t msCurrent = msStart;
277 while (true) {
278 // Check for sent messages
279 ReceiveSends();
280
281 // Check for posted events
282 int64_t cmsDelayNext = kForever;
283 bool first_pass = true;
284 while (true) {
285 // All queue operations need to be locked, but nothing else in this loop
286 // (specifically handling disposed message) can happen inside the crit.
287 // Otherwise, disposed MessageHandlers will cause deadlocks.
288 {
289 CritScope cs(&crit_);
290 // On the first pass, check for delayed messages that have been
291 // triggered and calculate the next trigger time.
292 if (first_pass) {
293 first_pass = false;
294 while (!dmsgq_.empty()) {
295 if (msCurrent < dmsgq_.top().msTrigger_) {
296 cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent);
297 break;
298 }
299 msgq_.push_back(dmsgq_.top().msg_);
300 dmsgq_.pop();
301 }
302 }
303 // Pull a message off the message queue, if available.
304 if (msgq_.empty()) {
305 break;
306 } else {
307 *pmsg = msgq_.front();
308 msgq_.pop_front();
309 }
310 } // crit_ is released here.
311
312 // Log a warning for time-sensitive messages that we're late to deliver.
313 if (pmsg->ts_sensitive) {
314 int64_t delay = TimeDiff(msCurrent, pmsg->ts_sensitive);
315 if (delay > 0) {
316 LOG_F(LS_WARNING) << "id: " << pmsg->message_id << " delay: "
317 << (delay + kMaxMsgLatency) << "ms";
318 }
319 }
320 // If this was a dispose message, delete it and skip it.
321 if (MQID_DISPOSE == pmsg->message_id) {
322 RTC_DCHECK(nullptr == pmsg->phandler);
323 delete pmsg->pdata;
324 *pmsg = Message();
325 continue;
326 }
327 return true;
328 }
329
330 if (IsQuitting())
331 break;
332
333 // Which is shorter, the delay wait or the asked wait?
334
335 int64_t cmsNext;
336 if (cmsWait == kForever) {
337 cmsNext = cmsDelayNext;
338 } else {
339 cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed);
340 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
341 cmsNext = cmsDelayNext;
342 }
343
344 {
345 // Wait and multiplex in the meantime
346 if (!ss_->Wait(static_cast<int>(cmsNext), process_io))
347 return false;
348 }
349
350 // If the specified timeout expired, return
351
352 msCurrent = TimeMillis();
353 cmsElapsed = TimeDiff(msCurrent, msStart);
354 if (cmsWait != kForever) {
355 if (cmsElapsed >= cmsWait)
356 return false;
357 }
358 }
359 return false;
360 }
361
362 void MessageQueue::ReceiveSends() {
363 }
364
365 void MessageQueue::Post(const Location& posted_from,
366 MessageHandler* phandler,
367 uint32_t id,
368 MessageData* pdata,
369 bool time_sensitive) {
370 if (IsQuitting())
371 return;
372
373 // Keep thread safe
374 // Add the message to the end of the queue
375 // Signal for the multiplexer to return
376
377 {
378 CritScope cs(&crit_);
379 Message msg;
380 msg.posted_from = posted_from;
381 msg.phandler = phandler;
382 msg.message_id = id;
383 msg.pdata = pdata;
384 if (time_sensitive) {
385 msg.ts_sensitive = TimeMillis() + kMaxMsgLatency;
386 }
387 msgq_.push_back(msg);
388 }
389 WakeUpSocketServer();
390 }
391
392 void MessageQueue::PostDelayed(const Location& posted_from,
393 int cmsDelay,
394 MessageHandler* phandler,
395 uint32_t id,
396 MessageData* pdata) {
397 return DoDelayPost(posted_from, cmsDelay, TimeAfter(cmsDelay), phandler, id,
398 pdata);
399 }
400
401 void MessageQueue::PostAt(const Location& posted_from,
402 uint32_t tstamp,
403 MessageHandler* phandler,
404 uint32_t id,
405 MessageData* pdata) {
406 // This should work even if it is used (unexpectedly).
407 int64_t delay = static_cast<uint32_t>(TimeMillis()) - tstamp;
408 return DoDelayPost(posted_from, delay, tstamp, phandler, id, pdata);
409 }
410
411 void MessageQueue::PostAt(const Location& posted_from,
412 int64_t tstamp,
413 MessageHandler* phandler,
414 uint32_t id,
415 MessageData* pdata) {
416 return DoDelayPost(posted_from, TimeUntil(tstamp), tstamp, phandler, id,
417 pdata);
418 }
419
420 void MessageQueue::DoDelayPost(const Location& posted_from,
421 int64_t cmsDelay,
422 int64_t tstamp,
423 MessageHandler* phandler,
424 uint32_t id,
425 MessageData* pdata) {
426 if (IsQuitting()) {
427 return;
428 }
429
430 // Keep thread safe
431 // Add to the priority queue. Gets sorted soonest first.
432 // Signal for the multiplexer to return.
433
434 {
435 CritScope cs(&crit_);
436 Message msg;
437 msg.posted_from = posted_from;
438 msg.phandler = phandler;
439 msg.message_id = id;
440 msg.pdata = pdata;
441 DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
442 dmsgq_.push(dmsg);
443 // If this message queue processes 1 message every millisecond for 50 days,
444 // we will wrap this number. Even then, only messages with identical times
445 // will be misordered, and then only briefly. This is probably ok.
446 ++dmsgq_next_num_;
447 RTC_DCHECK_NE(0, dmsgq_next_num_);
448 }
449 WakeUpSocketServer();
450 }
451
452 int MessageQueue::GetDelay() {
453 CritScope cs(&crit_);
454
455 if (!msgq_.empty())
456 return 0;
457
458 if (!dmsgq_.empty()) {
459 int delay = TimeUntil(dmsgq_.top().msTrigger_);
460 if (delay < 0)
461 delay = 0;
462 return delay;
463 }
464
465 return kForever;
466 }
467
468 void MessageQueue::Clear(MessageHandler* phandler,
469 uint32_t id,
470 MessageList* removed) {
471 CritScope cs(&crit_);
472
473 // Remove messages with phandler
474
475 if (fPeekKeep_ && msgPeek_.Match(phandler, id)) {
476 if (removed) {
477 removed->push_back(msgPeek_);
478 } else {
479 delete msgPeek_.pdata;
480 }
481 fPeekKeep_ = false;
482 }
483
484 // Remove from ordered message queue
485
486 for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) {
487 if (it->Match(phandler, id)) {
488 if (removed) {
489 removed->push_back(*it);
490 } else {
491 delete it->pdata;
492 }
493 it = msgq_.erase(it);
494 } else {
495 ++it;
496 }
497 }
498
499 // Remove from priority queue. Not directly iterable, so use this approach
500
501 PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin();
502 for (PriorityQueue::container_type::iterator it = new_end;
503 it != dmsgq_.container().end(); ++it) {
504 if (it->msg_.Match(phandler, id)) {
505 if (removed) {
506 removed->push_back(it->msg_);
507 } else {
508 delete it->msg_.pdata;
509 }
510 } else {
511 *new_end++ = *it;
512 }
513 }
514 dmsgq_.container().erase(new_end, dmsgq_.container().end());
515 dmsgq_.reheap();
516 }
517
518 void MessageQueue::Dispatch(Message *pmsg) {
519 TRACE_EVENT2("webrtc", "MessageQueue::Dispatch", "src_file_and_line",
520 pmsg->posted_from.file_and_line(), "src_func",
521 pmsg->posted_from.function_name());
522 int64_t start_time = TimeMillis();
523 pmsg->phandler->OnMessage(pmsg);
524 int64_t end_time = TimeMillis();
525 int64_t diff = TimeDiff(end_time, start_time);
526 if (diff >= kSlowDispatchLoggingThreshold) {
527 LOG(LS_INFO) << "Message took " << diff << "ms to dispatch. Posted from: "
528 << pmsg->posted_from.ToString();
529 }
530 }
531
532 } // namespace rtc
OLDNEW
« no previous file with comments | « webrtc/base/messagequeue.h ('k') | webrtc/base/messagequeue_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698