OLD | NEW |
---|---|
1 /* | 1 /* |
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. | 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. |
3 * | 3 * |
4 * Use of this source code is governed by a BSD-style license | 4 * Use of this source code is governed by a BSD-style license |
5 * that can be found in the LICENSE file in the root of the source | 5 * that can be found in the LICENSE file in the root of the source |
6 * tree. An additional intellectual property rights grant can be found | 6 * tree. An additional intellectual property rights grant can be found |
7 * in the file PATENTS. All contributing project authors may | 7 * in the file PATENTS. All contributing project authors may |
8 * be found in the AUTHORS file in the root of the source tree. | 8 * be found in the AUTHORS file in the root of the source tree. |
9 */ | 9 */ |
10 #include <algorithm> | 10 #include <algorithm> |
(...skipping 108 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
119 } | 119 } |
120 | 120 |
121 void MessageQueueManager::ProcessAllMessageQueues() { | 121 void MessageQueueManager::ProcessAllMessageQueues() { |
122 if (!instance_) { | 122 if (!instance_) { |
123 return; | 123 return; |
124 } | 124 } |
125 return Instance()->ProcessAllMessageQueuesInternal(); | 125 return Instance()->ProcessAllMessageQueuesInternal(); |
126 } | 126 } |
127 | 127 |
128 void MessageQueueManager::ProcessAllMessageQueuesInternal() { | 128 void MessageQueueManager::ProcessAllMessageQueuesInternal() { |
129 // Post a delayed message at the current time and wait for it to be dispatched | 129 // This works by posting a delayed message at the current time and waiting |
130 // on all queues, which will ensure that all messages that came before it were | 130 // for it to be dispatched on all queues, which will ensure that all messages |
131 // also dispatched. | 131 // that came before it were also dispatched. |
132 volatile int queues_not_done; | 132 volatile int queues_not_done = 0; |
133 auto functor = [&queues_not_done] { AtomicOps::Decrement(&queues_not_done); }; | 133 |
134 FunctorMessageHandler<void, decltype(functor)> handler(functor); | 134 // This class is used so that whether the posted message is processed, or the |
135 // message queue is simply cleared, queues_not_done gets decremented. | |
136 class AutoDecrementer : public MessageData { | |
honghaiz3
2016/09/09 17:59:02
Why do you replace the original Message handler wi
Taylor Brandstetter
2016/09/09 23:33:12
I tried to explain in the comment above. It's need
| |
137 public: | |
138 AutoDecrementer(volatile int* value) : value_(value) {} | |
pthatcher1
2016/09/09 17:11:34
Why not also have the class increment in the const
Taylor Brandstetter
2016/09/09 17:28:32
Sure, might as well.
| |
139 ~AutoDecrementer() override { AtomicOps::Decrement(value_); } | |
140 | |
141 private: | |
142 volatile int* value_; | |
143 }; | |
144 | |
145 NoOpMessageHandler handler; | |
135 { | 146 { |
136 DebugNonReentrantCritScope cs(&crit_, &locked_); | 147 DebugNonReentrantCritScope cs(&crit_, &locked_); |
137 queues_not_done = static_cast<int>(message_queues_.size()); | |
138 for (MessageQueue* queue : message_queues_) { | 148 for (MessageQueue* queue : message_queues_) { |
139 queue->PostDelayed(RTC_FROM_HERE, 0, &handler); | 149 if (queue->IsQuitting()) { |
150 // If the queue is quitting, it's done processing messages so it can | |
151 // be ignored. If we tried to post a message to it, it would be dropped. | |
152 continue; | |
153 } | |
154 AtomicOps::Increment(&queues_not_done); | |
155 queue->PostDelayed(RTC_FROM_HERE, 0, &handler, 0, | |
156 new AutoDecrementer(&queues_not_done)); | |
140 } | 157 } |
141 } | 158 } |
142 // Note: One of the message queues may have been on this thread, which is why | 159 // Note: One of the message queues may have been on this thread, which is why |
143 // we can't synchronously wait for queues_not_done to go to 0; we need to | 160 // we can't synchronously wait for queues_not_done to go to 0; we need to |
144 // process messages as well. | 161 // process messages as well. |
145 while (AtomicOps::AcquireLoad(&queues_not_done) > 0) { | 162 while (AtomicOps::AcquireLoad(&queues_not_done) > 0) { |
146 rtc::Thread::Current()->ProcessMessages(0); | 163 rtc::Thread::Current()->ProcessMessages(0); |
147 } | 164 } |
148 } | 165 } |
149 | 166 |
(...skipping 366 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
516 pmsg->phandler->OnMessage(pmsg); | 533 pmsg->phandler->OnMessage(pmsg); |
517 int64_t end_time = TimeMillis(); | 534 int64_t end_time = TimeMillis(); |
518 int64_t diff = TimeDiff(end_time, start_time); | 535 int64_t diff = TimeDiff(end_time, start_time); |
519 if (diff >= kSlowDispatchLoggingThreshold) { | 536 if (diff >= kSlowDispatchLoggingThreshold) { |
520 LOG(LS_INFO) << "Message took " << diff << "ms to dispatch. Posted from: " | 537 LOG(LS_INFO) << "Message took " << diff << "ms to dispatch. Posted from: " |
521 << pmsg->posted_from.ToString(); | 538 << pmsg->posted_from.ToString(); |
522 } | 539 } |
523 } | 540 } |
524 | 541 |
525 } // namespace rtc | 542 } // namespace rtc |
OLD | NEW |