Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 /* | 1 /* |
| 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. | 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. |
| 3 * | 3 * |
| 4 * Use of this source code is governed by a BSD-style license | 4 * Use of this source code is governed by a BSD-style license |
| 5 * that can be found in the LICENSE file in the root of the source | 5 * that can be found in the LICENSE file in the root of the source |
| 6 * tree. An additional intellectual property rights grant can be found | 6 * tree. An additional intellectual property rights grant can be found |
| 7 * in the file PATENTS. All contributing project authors may | 7 * in the file PATENTS. All contributing project authors may |
| 8 * be found in the AUTHORS file in the root of the source tree. | 8 * be found in the AUTHORS file in the root of the source tree. |
| 9 */ | 9 */ |
| 10 #include <algorithm> | 10 #include <algorithm> |
| 11 | 11 |
| 12 #include "webrtc/rtc_base/atomicops.h" | 12 #include "webrtc/rtc_base/atomicops.h" |
| 13 #include "webrtc/rtc_base/checks.h" | 13 #include "webrtc/rtc_base/checks.h" |
| 14 #include "webrtc/rtc_base/logging.h" | 14 #include "webrtc/rtc_base/logging.h" |
| 15 #include "webrtc/rtc_base/messagequeue.h" | 15 #include "webrtc/rtc_base/messagequeue.h" |
| 16 #include "webrtc/rtc_base/stringencode.h" | 16 #include "webrtc/rtc_base/stringencode.h" |
| 17 #include "webrtc/rtc_base/thread.h" | 17 #include "webrtc/rtc_base/thread.h" |
| 18 #include "webrtc/rtc_base/trace_event.h" | 18 #include "webrtc/rtc_base/trace_event.h" |
| 19 | 19 |
| 20 namespace rtc { | 20 namespace rtc { |
| 21 namespace { | 21 namespace { |
| 22 | 22 |
| 23 const int kMaxMsgLatency = 150; // 150 ms | 23 const int kMaxMsgLatency = 150; // 150 ms |
| 24 const int kSlowDispatchLoggingThreshold = 50; // 50 ms | 24 const int kSlowDispatchLoggingThreshold = 50; // 50 ms |
| 25 | 25 |
| 26 class SCOPED_LOCKABLE DebugNonReentrantCritScope { | |
| 27 public: | |
| 28 DebugNonReentrantCritScope(const CriticalSection* cs, bool* locked) | |
| 29 EXCLUSIVE_LOCK_FUNCTION(cs) | |
| 30 : cs_(cs), locked_(locked) { | |
| 31 cs_->Enter(); | |
| 32 RTC_DCHECK(!*locked_); | |
| 33 *locked_ = true; | |
| 34 } | |
| 35 | |
| 36 ~DebugNonReentrantCritScope() UNLOCK_FUNCTION() { | |
| 37 *locked_ = false; | |
| 38 cs_->Leave(); | |
| 39 } | |
| 40 | |
| 41 private: | |
| 42 const CriticalSection* const cs_; | |
| 43 bool* locked_; | |
| 44 | |
| 45 RTC_DISALLOW_COPY_AND_ASSIGN(DebugNonReentrantCritScope); | |
| 46 }; | |
|
Taylor Brandstetter
2017/07/05 22:41:47
This still seems like a useful type of helper clas
joachim
2017/07/05 23:03:11
Done (renamed to "MarkProcessingCritScope").
| |
| 47 | |
| 48 class FunctorPostMessageHandler : public MessageHandler { | 26 class FunctorPostMessageHandler : public MessageHandler { |
| 49 public: | 27 public: |
| 50 void OnMessage(Message* msg) override { | 28 void OnMessage(Message* msg) override { |
| 51 RunnableData* data = static_cast<RunnableData*>(msg->pdata); | 29 RunnableData* data = static_cast<RunnableData*>(msg->pdata); |
| 52 data->Run(); | 30 data->Run(); |
| 53 delete data; | 31 delete data; |
| 54 } | 32 } |
| 55 }; | 33 }; |
| 56 | 34 |
| 57 } // namespace | 35 } // namespace |
| 58 | 36 |
| 59 //------------------------------------------------------------------ | 37 //------------------------------------------------------------------ |
| 60 // MessageQueueManager | 38 // MessageQueueManager |
| 61 | 39 |
| 62 MessageQueueManager* MessageQueueManager::instance_ = nullptr; | 40 MessageQueueManager* MessageQueueManager::instance_ = nullptr; |
| 63 | 41 |
| 64 MessageQueueManager* MessageQueueManager::Instance() { | 42 MessageQueueManager* MessageQueueManager::Instance() { |
| 65 // Note: This is not thread safe, but it is first called before threads are | 43 // Note: This is not thread safe, but it is first called before threads are |
| 66 // spawned. | 44 // spawned. |
| 67 if (!instance_) | 45 if (!instance_) |
| 68 instance_ = new MessageQueueManager; | 46 instance_ = new MessageQueueManager; |
| 69 return instance_; | 47 return instance_; |
| 70 } | 48 } |
| 71 | 49 |
| 72 bool MessageQueueManager::IsInitialized() { | 50 bool MessageQueueManager::IsInitialized() { |
| 73 return instance_ != nullptr; | 51 return instance_ != nullptr; |
| 74 } | 52 } |
| 75 | 53 |
| 76 MessageQueueManager::MessageQueueManager() : locked_(false) {} | 54 MessageQueueManager::MessageQueueManager() : processing_(0) {} |
| 77 | 55 |
| 78 MessageQueueManager::~MessageQueueManager() { | 56 MessageQueueManager::~MessageQueueManager() { |
| 79 } | 57 } |
| 80 | 58 |
| 81 void MessageQueueManager::Add(MessageQueue *message_queue) { | 59 void MessageQueueManager::Add(MessageQueue *message_queue) { |
| 82 return Instance()->AddInternal(message_queue); | 60 return Instance()->AddInternal(message_queue); |
| 83 } | 61 } |
| 84 void MessageQueueManager::AddInternal(MessageQueue *message_queue) { | 62 void MessageQueueManager::AddInternal(MessageQueue *message_queue) { |
| 85 DebugNonReentrantCritScope cs(&crit_, &locked_); | 63 CritScope cs(&crit_); |
| 64 // Prevent changes while the list of message queues is processed. | |
| 65 RTC_DCHECK_EQ(processing_, 0); | |
| 86 message_queues_.push_back(message_queue); | 66 message_queues_.push_back(message_queue); |
| 87 } | 67 } |
| 88 | 68 |
| 89 void MessageQueueManager::Remove(MessageQueue *message_queue) { | 69 void MessageQueueManager::Remove(MessageQueue *message_queue) { |
| 90 // If there isn't a message queue manager instance, then there isn't a queue | 70 // If there isn't a message queue manager instance, then there isn't a queue |
| 91 // to remove. | 71 // to remove. |
| 92 if (!instance_) return; | 72 if (!instance_) return; |
| 93 return Instance()->RemoveInternal(message_queue); | 73 return Instance()->RemoveInternal(message_queue); |
| 94 } | 74 } |
| 95 void MessageQueueManager::RemoveInternal(MessageQueue *message_queue) { | 75 void MessageQueueManager::RemoveInternal(MessageQueue *message_queue) { |
| 96 // If this is the last MessageQueue, destroy the manager as well so that | 76 // If this is the last MessageQueue, destroy the manager as well so that |
| 97 // we don't leak this object at program shutdown. As mentioned above, this is | 77 // we don't leak this object at program shutdown. As mentioned above, this is |
| 98 // not thread-safe, but this should only happen at program termination (when | 78 // not thread-safe, but this should only happen at program termination (when |
| 99 // the ThreadManager is destroyed, and threads are no longer active). | 79 // the ThreadManager is destroyed, and threads are no longer active). |
| 100 bool destroy = false; | 80 bool destroy = false; |
| 101 { | 81 { |
| 102 DebugNonReentrantCritScope cs(&crit_, &locked_); | 82 CritScope cs(&crit_); |
| 83 // Prevent changes while the list of message queues is processed. | |
| 84 RTC_DCHECK_EQ(processing_, 0); | |
| 103 std::vector<MessageQueue *>::iterator iter; | 85 std::vector<MessageQueue *>::iterator iter; |
| 104 iter = std::find(message_queues_.begin(), message_queues_.end(), | 86 iter = std::find(message_queues_.begin(), message_queues_.end(), |
| 105 message_queue); | 87 message_queue); |
| 106 if (iter != message_queues_.end()) { | 88 if (iter != message_queues_.end()) { |
| 107 message_queues_.erase(iter); | 89 message_queues_.erase(iter); |
| 108 } | 90 } |
| 109 destroy = message_queues_.empty(); | 91 destroy = message_queues_.empty(); |
| 110 } | 92 } |
| 111 if (destroy) { | 93 if (destroy) { |
| 112 instance_ = nullptr; | 94 instance_ = nullptr; |
| 113 delete this; | 95 delete this; |
| 114 } | 96 } |
| 115 } | 97 } |
| 116 | 98 |
| 117 void MessageQueueManager::Clear(MessageHandler *handler) { | 99 void MessageQueueManager::Clear(MessageHandler *handler) { |
| 118 // If there isn't a message queue manager instance, then there aren't any | 100 // If there isn't a message queue manager instance, then there aren't any |
| 119 // queues to remove this handler from. | 101 // queues to remove this handler from. |
| 120 if (!instance_) return; | 102 if (!instance_) return; |
| 121 return Instance()->ClearInternal(handler); | 103 return Instance()->ClearInternal(handler); |
| 122 } | 104 } |
| 123 void MessageQueueManager::ClearInternal(MessageHandler *handler) { | 105 void MessageQueueManager::ClearInternal(MessageHandler *handler) { |
| 124 DebugNonReentrantCritScope cs(&crit_, &locked_); | 106 CritScope cs(&crit_); |
|
Taylor Brandstetter
2017/07/05 22:41:47
May want to comment here that this allows a delete
joachim
2017/07/05 23:03:11
Done.
| |
| 107 processing_++; | |
| 125 std::vector<MessageQueue *>::iterator iter; | 108 std::vector<MessageQueue *>::iterator iter; |
| 126 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++) | 109 for (MessageQueue* queue : message_queues_) { |
| 127 (*iter)->Clear(handler); | 110 queue->Clear(handler); |
| 111 } | |
| 112 processing_--; | |
| 128 } | 113 } |
| 129 | 114 |
| 130 void MessageQueueManager::ProcessAllMessageQueues() { | 115 void MessageQueueManager::ProcessAllMessageQueues() { |
| 131 if (!instance_) { | 116 if (!instance_) { |
| 132 return; | 117 return; |
| 133 } | 118 } |
| 134 return Instance()->ProcessAllMessageQueuesInternal(); | 119 return Instance()->ProcessAllMessageQueuesInternal(); |
| 135 } | 120 } |
| 136 | 121 |
| 137 void MessageQueueManager::ProcessAllMessageQueuesInternal() { | 122 void MessageQueueManager::ProcessAllMessageQueuesInternal() { |
| 138 // This works by posting a delayed message at the current time and waiting | 123 // This works by posting a delayed message at the current time and waiting |
| 139 // for it to be dispatched on all queues, which will ensure that all messages | 124 // for it to be dispatched on all queues, which will ensure that all messages |
| 140 // that came before it were also dispatched. | 125 // that came before it were also dispatched. |
| 141 volatile int queues_not_done = 0; | 126 volatile int queues_not_done = 0; |
| 142 | 127 |
| 143 // This class is used so that whether the posted message is processed, or the | 128 // This class is used so that whether the posted message is processed, or the |
| 144 // message queue is simply cleared, queues_not_done gets decremented. | 129 // message queue is simply cleared, queues_not_done gets decremented. |
| 145 class ScopedIncrement : public MessageData { | 130 class ScopedIncrement : public MessageData { |
| 146 public: | 131 public: |
| 147 ScopedIncrement(volatile int* value) : value_(value) { | 132 ScopedIncrement(volatile int* value) : value_(value) { |
| 148 AtomicOps::Increment(value_); | 133 AtomicOps::Increment(value_); |
| 149 } | 134 } |
| 150 ~ScopedIncrement() override { AtomicOps::Decrement(value_); } | 135 ~ScopedIncrement() override { AtomicOps::Decrement(value_); } |
| 151 | 136 |
| 152 private: | 137 private: |
| 153 volatile int* value_; | 138 volatile int* value_; |
| 154 }; | 139 }; |
| 155 | 140 |
| 156 { | 141 { |
| 157 DebugNonReentrantCritScope cs(&crit_, &locked_); | 142 CritScope cs(&crit_); |
| 143 processing_++; | |
| 158 for (MessageQueue* queue : message_queues_) { | 144 for (MessageQueue* queue : message_queues_) { |
| 159 if (!queue->IsProcessingMessages()) { | 145 if (!queue->IsProcessingMessages()) { |
| 160 // If the queue is not processing messages, it can | 146 // If the queue is not processing messages, it can |
| 161 // be ignored. If we tried to post a message to it, it would be dropped | 147 // be ignored. If we tried to post a message to it, it would be dropped |
| 162 // or ignored. | 148 // or ignored. |
| 163 continue; | 149 continue; |
| 164 } | 150 } |
| 165 queue->PostDelayed(RTC_FROM_HERE, 0, nullptr, MQID_DISPOSE, | 151 queue->PostDelayed(RTC_FROM_HERE, 0, nullptr, MQID_DISPOSE, |
| 166 new ScopedIncrement(&queues_not_done)); | 152 new ScopedIncrement(&queues_not_done)); |
| 167 } | 153 } |
| 154 processing_--; | |
| 168 } | 155 } |
| 169 // Note: One of the message queues may have been on this thread, which is why | 156 // Note: One of the message queues may have been on this thread, which is why |
| 170 // we can't synchronously wait for queues_not_done to go to 0; we need to | 157 // we can't synchronously wait for queues_not_done to go to 0; we need to |
| 171 // process messages as well. | 158 // process messages as well. |
| 172 while (AtomicOps::AcquireLoad(&queues_not_done) > 0) { | 159 while (AtomicOps::AcquireLoad(&queues_not_done) > 0) { |
| 173 rtc::Thread::Current()->ProcessMessages(0); | 160 rtc::Thread::Current()->ProcessMessages(0); |
| 174 } | 161 } |
| 175 } | 162 } |
| 176 | 163 |
| 177 //------------------------------------------------------------------ | 164 //------------------------------------------------------------------ |
| (...skipping 363 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 541 | 528 |
| 542 void MessageQueue::PostFunctorInternal(const Location& posted_from, | 529 void MessageQueue::PostFunctorInternal(const Location& posted_from, |
| 543 RunnableData* message_data) { | 530 RunnableData* message_data) { |
| 544 // Use static to ensure it outlives this scope. Safe since | 531 // Use static to ensure it outlives this scope. Safe since |
| 545 // FunctorPostMessageHandler keeps no state. | 532 // FunctorPostMessageHandler keeps no state. |
| 546 static FunctorPostMessageHandler handler; | 533 static FunctorPostMessageHandler handler; |
| 547 Post(posted_from, &handler, 0, message_data); | 534 Post(posted_from, &handler, 0, message_data); |
| 548 } | 535 } |
| 549 | 536 |
| 550 } // namespace rtc | 537 } // namespace rtc |
| OLD | NEW |