| Index: webrtc/base/messagequeue.cc
|
| diff --git a/webrtc/base/messagequeue.cc b/webrtc/base/messagequeue.cc
|
| index da50e2304f7a686af4245df733566528d279d185..4c2331bfe603c17cfd5c5376e85ef591bbcacb4d 100644
|
| --- a/webrtc/base/messagequeue.cc
|
| +++ b/webrtc/base/messagequeue.cc
|
| @@ -9,17 +9,14 @@
|
| */
|
| #include <algorithm>
|
|
|
| +#include "webrtc/base/atomicops.h"
|
| #include "webrtc/base/checks.h"
|
| #include "webrtc/base/common.h"
|
| #include "webrtc/base/logging.h"
|
| #include "webrtc/base/messagequeue.h"
|
| +#include "webrtc/base/thread.h"
|
| #include "webrtc/base/trace_event.h"
|
|
|
| -namespace {
|
| -
|
| -enum { MSG_WAKE_MESSAGE_QUEUE = 1 };
|
| -}
|
| -
|
| namespace rtc {
|
|
|
| const int kMaxMsgLatency = 150; // 150 ms
|
| @@ -41,8 +38,7 @@ bool MessageQueueManager::IsInitialized() {
|
| return instance_ != NULL;
|
| }
|
|
|
| -MessageQueueManager::MessageQueueManager() {
|
| -}
|
| +MessageQueueManager::MessageQueueManager() {}
|
|
|
| MessageQueueManager::~MessageQueueManager() {
|
| }
|
| @@ -108,26 +104,36 @@ void MessageQueueManager::ClearInternal(MessageHandler *handler) {
|
| (*iter)->Clear(handler);
|
| }
|
|
|
| -void MessageQueueManager::WakeAllMessageQueues() {
|
| +void MessageQueueManager::ProcessAllMessageQueues() {
|
| if (!instance_) {
|
| return;
|
| }
|
| - return Instance()->WakeAllMessageQueuesInternal();
|
| + return Instance()->ProcessAllMessageQueuesInternal();
|
| }
|
|
|
| -void MessageQueueManager::WakeAllMessageQueuesInternal() {
|
| +void MessageQueueManager::ProcessAllMessageQueuesInternal() {
|
| #if CS_DEBUG_CHECKS // CurrentThreadIsOwner returns true by default.
|
| ASSERT(!crit_.CurrentThreadIsOwner()); // See note above.
|
| #endif
|
| - CritScope cs(&crit_);
|
| - for (MessageQueue* queue : message_queues_) {
|
| - // Posting an arbitrary message will force the message queue to wake up.
|
| - queue->Post(this, MSG_WAKE_MESSAGE_QUEUE);
|
| + // Post a delayed message at the current time and wait for it to be dispatched
|
| + // on all queues, which will ensure that all messages that came before it were
|
| + // also dispatched.
|
| + volatile int queues_not_done;
|
| + auto functor = [&queues_not_done] { AtomicOps::Decrement(&queues_not_done); };
|
| + FunctorMessageHandler<void, decltype(functor)> handler(functor);
|
| + {
|
| + CritScope cs(&crit_);
|
| + queues_not_done = static_cast<int>(message_queues_.size());
|
| + for (MessageQueue* queue : message_queues_) {
|
| + queue->PostDelayed(0, &handler);
|
| + }
|
| + }
|
| + // Note: One of the message queues may have been on this thread, which is why
|
| + // we can't synchronously wait for queues_not_done to go to 0; we need to
|
| + // process messages as well.
|
| + while (AtomicOps::AcquireLoad(&queues_not_done) > 0) {
|
| + rtc::Thread::Current()->ProcessMessages(0);
|
| }
|
| -}
|
| -
|
| -void MessageQueueManager::OnMessage(Message* pmsg) {
|
| - RTC_DCHECK(pmsg->message_id == MSG_WAKE_MESSAGE_QUEUE);
|
| }
|
|
|
| //------------------------------------------------------------------
|
|
|