OLD | NEW |
---|---|
1 /* | 1 /* |
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. | 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. |
3 * | 3 * |
4 * Use of this source code is governed by a BSD-style license | 4 * Use of this source code is governed by a BSD-style license |
5 * that can be found in the LICENSE file in the root of the source | 5 * that can be found in the LICENSE file in the root of the source |
6 * tree. An additional intellectual property rights grant can be found | 6 * tree. An additional intellectual property rights grant can be found |
7 * in the file PATENTS. All contributing project authors may | 7 * in the file PATENTS. All contributing project authors may |
8 * be found in the AUTHORS file in the root of the source tree. | 8 * be found in the AUTHORS file in the root of the source tree. |
9 */ | 9 */ |
10 #include <algorithm> | 10 #include <algorithm> |
(...skipping 128 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
139 } | 139 } |
140 ~ScopedIncrement() override { AtomicOps::Decrement(value_); } | 140 ~ScopedIncrement() override { AtomicOps::Decrement(value_); } |
141 | 141 |
142 private: | 142 private: |
143 volatile int* value_; | 143 volatile int* value_; |
144 }; | 144 }; |
145 | 145 |
146 { | 146 { |
147 DebugNonReentrantCritScope cs(&crit_, &locked_); | 147 DebugNonReentrantCritScope cs(&crit_, &locked_); |
148 for (MessageQueue* queue : message_queues_) { | 148 for (MessageQueue* queue : message_queues_) { |
149 if (queue->IsQuitting()) { | 149 if (!queue->WaitForProcess()) { |
150 // If the queue is quitting, it's done processing messages so it can | |
151 // be ignored. If we tried to post a message to it, it would be dropped. | |
152 continue; | 150 continue; |
153 } | 151 } |
154 queue->PostDelayed(RTC_FROM_HERE, 0, nullptr, MQID_DISPOSE, | 152 queue->PostDelayed(RTC_FROM_HERE, 0, nullptr, MQID_DISPOSE, |
155 new ScopedIncrement(&queues_not_done)); | 153 new ScopedIncrement(&queues_not_done)); |
156 } | 154 } |
157 } | 155 } |
158 // Note: One of the message queues may have been on this thread, which is why | 156 // Note: One of the message queues may have been on this thread, which is why |
159 // we can't synchronously wait for queues_not_done to go to 0; we need to | 157 // we can't synchronously wait for queues_not_done to go to 0; we need to |
160 // process messages as well. | 158 // process messages as well. |
161 while (AtomicOps::AcquireLoad(&queues_not_done) > 0) { | 159 while (AtomicOps::AcquireLoad(&queues_not_done) > 0) { |
162 rtc::Thread::Current()->ProcessMessages(0); | 160 // We have to limit the number of messages that may be processed |
161 // because we may get into an infinite processing loop even after | |
162 // "queues_not_done" is 0. | |
Taylor Brandstetter
2017/02/06 19:23:42
How does that happen? That seems like a deeper pro
| |
163 rtc::Thread::Current()->ProcessMessages(0, 100); | |
163 } | 164 } |
164 } | 165 } |
165 | 166 |
166 //------------------------------------------------------------------ | 167 //------------------------------------------------------------------ |
167 // MessageQueue | 168 // MessageQueue |
168 MessageQueue::MessageQueue(SocketServer* ss, bool init_queue) | 169 MessageQueue::MessageQueue(SocketServer* ss, bool init_queue) |
169 : fPeekKeep_(false), | 170 : fPeekKeep_(false), |
170 dmsgq_next_num_(0), | 171 dmsgq_next_num_(0), |
171 fInitialized_(false), | 172 fInitialized_(false), |
172 fDestroyed_(false), | 173 fDestroyed_(false), |
(...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
243 } | 244 } |
244 | 245 |
245 void MessageQueue::Quit() { | 246 void MessageQueue::Quit() { |
246 AtomicOps::ReleaseStore(&stop_, 1); | 247 AtomicOps::ReleaseStore(&stop_, 1); |
247 WakeUpSocketServer(); | 248 WakeUpSocketServer(); |
248 } | 249 } |
249 | 250 |
250 bool MessageQueue::IsQuitting() { | 251 bool MessageQueue::IsQuitting() { |
251 return AtomicOps::AcquireLoad(&stop_) != 0; | 252 return AtomicOps::AcquireLoad(&stop_) != 0; |
252 } | 253 } |
254 bool MessageQueue::WaitForProcess() { | |
255 // If the queue is quitting, it's done processing messages so it can | |
256 // be ignored. If we tried to post a message to it, it would be dropped. | |
257 return !IsQuitting(); | |
258 } | |
253 | 259 |
254 void MessageQueue::Restart() { | 260 void MessageQueue::Restart() { |
255 AtomicOps::ReleaseStore(&stop_, 0); | 261 AtomicOps::ReleaseStore(&stop_, 0); |
256 } | 262 } |
257 | 263 |
258 bool MessageQueue::Peek(Message *pmsg, int cmsWait) { | 264 bool MessageQueue::Peek(Message *pmsg, int cmsWait) { |
259 if (fPeekKeep_) { | 265 if (fPeekKeep_) { |
260 *pmsg = msgPeek_; | 266 *pmsg = msgPeek_; |
261 return true; | 267 return true; |
262 } | 268 } |
(...skipping 270 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
533 pmsg->phandler->OnMessage(pmsg); | 539 pmsg->phandler->OnMessage(pmsg); |
534 int64_t end_time = TimeMillis(); | 540 int64_t end_time = TimeMillis(); |
535 int64_t diff = TimeDiff(end_time, start_time); | 541 int64_t diff = TimeDiff(end_time, start_time); |
536 if (diff >= kSlowDispatchLoggingThreshold) { | 542 if (diff >= kSlowDispatchLoggingThreshold) { |
537 LOG(LS_INFO) << "Message took " << diff << "ms to dispatch. Posted from: " | 543 LOG(LS_INFO) << "Message took " << diff << "ms to dispatch. Posted from: " |
538 << pmsg->posted_from.ToString(); | 544 << pmsg->posted_from.ToString(); |
539 } | 545 } |
540 } | 546 } |
541 | 547 |
542 } // namespace rtc | 548 } // namespace rtc |
OLD | NEW |