Chromium Code Reviews| Index: webrtc/base/messagequeue.cc |
| diff --git a/webrtc/base/messagequeue.cc b/webrtc/base/messagequeue.cc |
| index da50e2304f7a686af4245df733566528d279d185..5f39dfe1d6243500d6baa24ea80e69e7bf145706 100644 |
| --- a/webrtc/base/messagequeue.cc |
| +++ b/webrtc/base/messagequeue.cc |
| @@ -9,17 +9,14 @@ |
| */ |
| #include <algorithm> |
| +#include "webrtc/base/atomicops.h" |
| #include "webrtc/base/checks.h" |
| #include "webrtc/base/common.h" |
| #include "webrtc/base/logging.h" |
| #include "webrtc/base/messagequeue.h" |
| +#include "webrtc/base/thread.h" |
| #include "webrtc/base/trace_event.h" |
| -namespace { |
| - |
| -enum { MSG_WAKE_MESSAGE_QUEUE = 1 }; |
| -} |
| - |
| namespace rtc { |
| const int kMaxMsgLatency = 150; // 150 ms |
| @@ -41,8 +38,7 @@ bool MessageQueueManager::IsInitialized() { |
| return instance_ != NULL; |
| } |
| -MessageQueueManager::MessageQueueManager() { |
| -} |
| +MessageQueueManager::MessageQueueManager() {} |
| MessageQueueManager::~MessageQueueManager() { |
| } |
| @@ -108,26 +104,36 @@ void MessageQueueManager::ClearInternal(MessageHandler *handler) { |
| (*iter)->Clear(handler); |
| } |
| -void MessageQueueManager::WakeAllMessageQueues() { |
| +void MessageQueueManager::ProcessAllMessageQueues() { |
| if (!instance_) { |
| return; |
| } |
| - return Instance()->WakeAllMessageQueuesInternal(); |
| + return Instance()->ProcessAllMessageQueuesInternal(); |
| } |
| -void MessageQueueManager::WakeAllMessageQueuesInternal() { |
| +void MessageQueueManager::ProcessAllMessageQueuesInternal() { |
| #if CS_DEBUG_CHECKS // CurrentThreadIsOwner returns true by default. |
| ASSERT(!crit_.CurrentThreadIsOwner()); // See note above. |
| #endif |
| - CritScope cs(&crit_); |
| - for (MessageQueue* queue : message_queues_) { |
| - // Posting an arbitrary message will force the message queue to wake up. |
| - queue->Post(this, MSG_WAKE_MESSAGE_QUEUE); |
| + // Post a delayed message at the current time and wait for it to be dispatched |
| + // on all queues, which will ensure that all messages that came before it were |
| + // also dispatched. |
| + volatile int queues_not_done; |
| + auto functor = [&queues_not_done] { AtomicOps::Decrement(&queues_not_done); }; |
| + FunctorMessageHandler<void, decltype(functor)> handler(functor); |
| + { |
| + CritScope cs(&crit_); |
| + queues_not_done = static_cast<int>(message_queues_.size()); |
| + for (MessageQueue* queue : message_queues_) { |
| + queue->PostDelayed(0, &handler); |
| + } |
| + } |
| + // Note: One of the message queues may have been on this thread, which is why |
| + // we can't synchronously wait for queues_not_done to go to 0; we need to |
| + // process messages as well. |
|
tommi
2016/06/02 07:59:01
Is the expectation that when this function finishe
Taylor Brandstetter
2016/06/02 17:19:54
Not that they're empty, but that they've processed
|
| + while (queues_not_done > 0) { |
| + rtc::Thread::Current()->ProcessMessages(0); |
| } |
| -} |
| - |
| -void MessageQueueManager::OnMessage(Message* pmsg) { |
| - RTC_DCHECK(pmsg->message_id == MSG_WAKE_MESSAGE_QUEUE); |
| } |
| //------------------------------------------------------------------ |