Index: webrtc/base/messagequeue.cc |
diff --git a/webrtc/base/messagequeue.cc b/webrtc/base/messagequeue.cc |
index da50e2304f7a686af4245df733566528d279d185..4c2331bfe603c17cfd5c5376e85ef591bbcacb4d 100644 |
--- a/webrtc/base/messagequeue.cc |
+++ b/webrtc/base/messagequeue.cc |
@@ -9,17 +9,14 @@ |
*/ |
#include <algorithm> |
+#include "webrtc/base/atomicops.h" |
#include "webrtc/base/checks.h" |
#include "webrtc/base/common.h" |
#include "webrtc/base/logging.h" |
#include "webrtc/base/messagequeue.h" |
+#include "webrtc/base/thread.h" |
#include "webrtc/base/trace_event.h" |
-namespace { |
- |
-enum { MSG_WAKE_MESSAGE_QUEUE = 1 }; |
-} |
- |
namespace rtc { |
const int kMaxMsgLatency = 150; // 150 ms |
@@ -41,8 +38,7 @@ bool MessageQueueManager::IsInitialized() { |
return instance_ != NULL; |
} |
-MessageQueueManager::MessageQueueManager() { |
-} |
+MessageQueueManager::MessageQueueManager() {} |
MessageQueueManager::~MessageQueueManager() { |
} |
@@ -108,26 +104,36 @@ void MessageQueueManager::ClearInternal(MessageHandler *handler) { |
(*iter)->Clear(handler); |
} |
-void MessageQueueManager::WakeAllMessageQueues() { |
+void MessageQueueManager::ProcessAllMessageQueues() { |
if (!instance_) { |
return; |
} |
- return Instance()->WakeAllMessageQueuesInternal(); |
+ return Instance()->ProcessAllMessageQueuesInternal(); |
} |
-void MessageQueueManager::WakeAllMessageQueuesInternal() { |
+void MessageQueueManager::ProcessAllMessageQueuesInternal() { |
#if CS_DEBUG_CHECKS // CurrentThreadIsOwner returns true by default. |
ASSERT(!crit_.CurrentThreadIsOwner()); // See note above. |
#endif |
- CritScope cs(&crit_); |
- for (MessageQueue* queue : message_queues_) { |
- // Posting an arbitrary message will force the message queue to wake up. |
- queue->Post(this, MSG_WAKE_MESSAGE_QUEUE); |
+ // Post a delayed message at the current time and wait for it to be dispatched |
+ // on all queues, which will ensure that all messages that came before it were |
+ // also dispatched. |
+ volatile int queues_not_done; |
+ auto functor = [&queues_not_done] { AtomicOps::Decrement(&queues_not_done); }; |
+ FunctorMessageHandler<void, decltype(functor)> handler(functor); |
+ { |
+ CritScope cs(&crit_); |
+ queues_not_done = static_cast<int>(message_queues_.size()); |
+ for (MessageQueue* queue : message_queues_) { |
+ queue->PostDelayed(0, &handler); |
+ } |
+ } |
+ // Note: One of the message queues may have been on this thread, which is why |
+ // we can't synchronously wait for queues_not_done to go to 0; we need to |
+ // process messages as well. |
+ while (AtomicOps::AcquireLoad(&queues_not_done) > 0) { |
+ rtc::Thread::Current()->ProcessMessages(0); |
} |
-} |
- |
-void MessageQueueManager::OnMessage(Message* pmsg) { |
- RTC_DCHECK(pmsg->message_id == MSG_WAKE_MESSAGE_QUEUE); |
} |
//------------------------------------------------------------------ |