| OLD | NEW |
| 1 /* | 1 /* |
| 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. | 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. |
| 3 * | 3 * |
| 4 * Use of this source code is governed by a BSD-style license | 4 * Use of this source code is governed by a BSD-style license |
| 5 * that can be found in the LICENSE file in the root of the source | 5 * that can be found in the LICENSE file in the root of the source |
| 6 * tree. An additional intellectual property rights grant can be found | 6 * tree. An additional intellectual property rights grant can be found |
| 7 * in the file PATENTS. All contributing project authors may | 7 * in the file PATENTS. All contributing project authors may |
| 8 * be found in the AUTHORS file in the root of the source tree. | 8 * be found in the AUTHORS file in the root of the source tree. |
| 9 */ | 9 */ |
| 10 #include <algorithm> | 10 #include <algorithm> |
| 11 | 11 |
| 12 #include "webrtc/rtc_base/atomicops.h" | 12 #include "webrtc/rtc_base/atomicops.h" |
| 13 #include "webrtc/rtc_base/checks.h" | 13 #include "webrtc/rtc_base/checks.h" |
| 14 #include "webrtc/rtc_base/logging.h" | 14 #include "webrtc/rtc_base/logging.h" |
| 15 #include "webrtc/rtc_base/messagequeue.h" | 15 #include "webrtc/rtc_base/messagequeue.h" |
| 16 #include "webrtc/rtc_base/stringencode.h" | 16 #include "webrtc/rtc_base/stringencode.h" |
| 17 #include "webrtc/rtc_base/thread.h" | 17 #include "webrtc/rtc_base/thread.h" |
| 18 #include "webrtc/rtc_base/trace_event.h" | 18 #include "webrtc/rtc_base/trace_event.h" |
| 19 | 19 |
| 20 namespace rtc { | 20 namespace rtc { |
| 21 namespace { | 21 namespace { |
| 22 | 22 |
| 23 const int kMaxMsgLatency = 150; // 150 ms | 23 const int kMaxMsgLatency = 150; // 150 ms |
| 24 const int kSlowDispatchLoggingThreshold = 50; // 50 ms | 24 const int kSlowDispatchLoggingThreshold = 50; // 50 ms |
| 25 | 25 |
| 26 class SCOPED_LOCKABLE DebugNonReentrantCritScope { | 26 class SCOPED_LOCKABLE MarkProcessingCritScope { |
| 27 public: | 27 public: |
| 28 DebugNonReentrantCritScope(const CriticalSection* cs, bool* locked) | 28 MarkProcessingCritScope(const CriticalSection* cs, size_t* processing) |
| 29 EXCLUSIVE_LOCK_FUNCTION(cs) | 29 EXCLUSIVE_LOCK_FUNCTION(cs) |
| 30 : cs_(cs), locked_(locked) { | 30 : cs_(cs), processing_(processing) { |
| 31 cs_->Enter(); | 31 cs_->Enter(); |
| 32 RTC_DCHECK(!*locked_); | 32 *processing_ += 1; |
| 33 *locked_ = true; | |
| 34 } | 33 } |
| 35 | 34 |
| 36 ~DebugNonReentrantCritScope() UNLOCK_FUNCTION() { | 35 ~MarkProcessingCritScope() UNLOCK_FUNCTION() { |
| 37 *locked_ = false; | 36 *processing_ -= 1; |
| 38 cs_->Leave(); | 37 cs_->Leave(); |
| 39 } | 38 } |
| 40 | 39 |
| 41 private: | 40 private: |
| 42 const CriticalSection* const cs_; | 41 const CriticalSection* const cs_; |
| 43 bool* locked_; | 42 size_t* processing_; |
| 44 | 43 |
| 45 RTC_DISALLOW_COPY_AND_ASSIGN(DebugNonReentrantCritScope); | 44 RTC_DISALLOW_COPY_AND_ASSIGN(MarkProcessingCritScope); |
| 46 }; | 45 }; |
| 47 | 46 |
| 48 class FunctorPostMessageHandler : public MessageHandler { | 47 class FunctorPostMessageHandler : public MessageHandler { |
| 49 public: | 48 public: |
| 50 void OnMessage(Message* msg) override { | 49 void OnMessage(Message* msg) override { |
| 51 RunnableData* data = static_cast<RunnableData*>(msg->pdata); | 50 RunnableData* data = static_cast<RunnableData*>(msg->pdata); |
| 52 data->Run(); | 51 data->Run(); |
| 53 delete data; | 52 delete data; |
| 54 } | 53 } |
| 55 }; | 54 }; |
| (...skipping 10 matching lines...) Expand all Loading... |
| 66 // spawned. | 65 // spawned. |
| 67 if (!instance_) | 66 if (!instance_) |
| 68 instance_ = new MessageQueueManager; | 67 instance_ = new MessageQueueManager; |
| 69 return instance_; | 68 return instance_; |
| 70 } | 69 } |
| 71 | 70 |
| 72 bool MessageQueueManager::IsInitialized() { | 71 bool MessageQueueManager::IsInitialized() { |
| 73 return instance_ != nullptr; | 72 return instance_ != nullptr; |
| 74 } | 73 } |
| 75 | 74 |
| 76 MessageQueueManager::MessageQueueManager() : locked_(false) {} | 75 MessageQueueManager::MessageQueueManager() : processing_(0) {} |
| 77 | 76 |
| 78 MessageQueueManager::~MessageQueueManager() { | 77 MessageQueueManager::~MessageQueueManager() { |
| 79 } | 78 } |
| 80 | 79 |
| 81 void MessageQueueManager::Add(MessageQueue *message_queue) { | 80 void MessageQueueManager::Add(MessageQueue *message_queue) { |
| 82 return Instance()->AddInternal(message_queue); | 81 return Instance()->AddInternal(message_queue); |
| 83 } | 82 } |
| 84 void MessageQueueManager::AddInternal(MessageQueue *message_queue) { | 83 void MessageQueueManager::AddInternal(MessageQueue *message_queue) { |
| 85 DebugNonReentrantCritScope cs(&crit_, &locked_); | 84 CritScope cs(&crit_); |
| 85 // Prevent changes while the list of message queues is processed. |
| 86 RTC_DCHECK_EQ(processing_, 0); |
| 86 message_queues_.push_back(message_queue); | 87 message_queues_.push_back(message_queue); |
| 87 } | 88 } |
| 88 | 89 |
| 89 void MessageQueueManager::Remove(MessageQueue *message_queue) { | 90 void MessageQueueManager::Remove(MessageQueue *message_queue) { |
| 90 // If there isn't a message queue manager instance, then there isn't a queue | 91 // If there isn't a message queue manager instance, then there isn't a queue |
| 91 // to remove. | 92 // to remove. |
| 92 if (!instance_) return; | 93 if (!instance_) return; |
| 93 return Instance()->RemoveInternal(message_queue); | 94 return Instance()->RemoveInternal(message_queue); |
| 94 } | 95 } |
| 95 void MessageQueueManager::RemoveInternal(MessageQueue *message_queue) { | 96 void MessageQueueManager::RemoveInternal(MessageQueue *message_queue) { |
| 96 // If this is the last MessageQueue, destroy the manager as well so that | 97 // If this is the last MessageQueue, destroy the manager as well so that |
| 97 // we don't leak this object at program shutdown. As mentioned above, this is | 98 // we don't leak this object at program shutdown. As mentioned above, this is |
| 98 // not thread-safe, but this should only happen at program termination (when | 99 // not thread-safe, but this should only happen at program termination (when |
| 99 // the ThreadManager is destroyed, and threads are no longer active). | 100 // the ThreadManager is destroyed, and threads are no longer active). |
| 100 bool destroy = false; | 101 bool destroy = false; |
| 101 { | 102 { |
| 102 DebugNonReentrantCritScope cs(&crit_, &locked_); | 103 CritScope cs(&crit_); |
| 104 // Prevent changes while the list of message queues is processed. |
| 105 RTC_DCHECK_EQ(processing_, 0); |
| 103 std::vector<MessageQueue *>::iterator iter; | 106 std::vector<MessageQueue *>::iterator iter; |
| 104 iter = std::find(message_queues_.begin(), message_queues_.end(), | 107 iter = std::find(message_queues_.begin(), message_queues_.end(), |
| 105 message_queue); | 108 message_queue); |
| 106 if (iter != message_queues_.end()) { | 109 if (iter != message_queues_.end()) { |
| 107 message_queues_.erase(iter); | 110 message_queues_.erase(iter); |
| 108 } | 111 } |
| 109 destroy = message_queues_.empty(); | 112 destroy = message_queues_.empty(); |
| 110 } | 113 } |
| 111 if (destroy) { | 114 if (destroy) { |
| 112 instance_ = nullptr; | 115 instance_ = nullptr; |
| 113 delete this; | 116 delete this; |
| 114 } | 117 } |
| 115 } | 118 } |
| 116 | 119 |
| 117 void MessageQueueManager::Clear(MessageHandler *handler) { | 120 void MessageQueueManager::Clear(MessageHandler *handler) { |
| 118 // If there isn't a message queue manager instance, then there aren't any | 121 // If there isn't a message queue manager instance, then there aren't any |
| 119 // queues to remove this handler from. | 122 // queues to remove this handler from. |
| 120 if (!instance_) return; | 123 if (!instance_) return; |
| 121 return Instance()->ClearInternal(handler); | 124 return Instance()->ClearInternal(handler); |
| 122 } | 125 } |
| 123 void MessageQueueManager::ClearInternal(MessageHandler *handler) { | 126 void MessageQueueManager::ClearInternal(MessageHandler *handler) { |
| 124 DebugNonReentrantCritScope cs(&crit_, &locked_); | 127 // Deleted objects may cause re-entrant calls to ClearInternal. This is |
| 128 // allowed as the list of message queues does not change while queues are |
| 129 // cleared. |
| 130 MarkProcessingCritScope cs(&crit_, &processing_); |
| 125 std::vector<MessageQueue *>::iterator iter; | 131 std::vector<MessageQueue *>::iterator iter; |
| 126 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++) | 132 for (MessageQueue* queue : message_queues_) { |
| 127 (*iter)->Clear(handler); | 133 queue->Clear(handler); |
| 134 } |
| 128 } | 135 } |
| 129 | 136 |
| 130 void MessageQueueManager::ProcessAllMessageQueues() { | 137 void MessageQueueManager::ProcessAllMessageQueues() { |
| 131 if (!instance_) { | 138 if (!instance_) { |
| 132 return; | 139 return; |
| 133 } | 140 } |
| 134 return Instance()->ProcessAllMessageQueuesInternal(); | 141 return Instance()->ProcessAllMessageQueuesInternal(); |
| 135 } | 142 } |
| 136 | 143 |
| 137 void MessageQueueManager::ProcessAllMessageQueuesInternal() { | 144 void MessageQueueManager::ProcessAllMessageQueuesInternal() { |
| 138 // This works by posting a delayed message at the current time and waiting | 145 // This works by posting a delayed message at the current time and waiting |
| 139 // for it to be dispatched on all queues, which will ensure that all messages | 146 // for it to be dispatched on all queues, which will ensure that all messages |
| 140 // that came before it were also dispatched. | 147 // that came before it were also dispatched. |
| 141 volatile int queues_not_done = 0; | 148 volatile int queues_not_done = 0; |
| 142 | 149 |
| 143 // This class is used so that whether the posted message is processed, or the | 150 // This class is used so that whether the posted message is processed, or the |
| 144 // message queue is simply cleared, queues_not_done gets decremented. | 151 // message queue is simply cleared, queues_not_done gets decremented. |
| 145 class ScopedIncrement : public MessageData { | 152 class ScopedIncrement : public MessageData { |
| 146 public: | 153 public: |
| 147 ScopedIncrement(volatile int* value) : value_(value) { | 154 ScopedIncrement(volatile int* value) : value_(value) { |
| 148 AtomicOps::Increment(value_); | 155 AtomicOps::Increment(value_); |
| 149 } | 156 } |
| 150 ~ScopedIncrement() override { AtomicOps::Decrement(value_); } | 157 ~ScopedIncrement() override { AtomicOps::Decrement(value_); } |
| 151 | 158 |
| 152 private: | 159 private: |
| 153 volatile int* value_; | 160 volatile int* value_; |
| 154 }; | 161 }; |
| 155 | 162 |
| 156 { | 163 { |
| 157 DebugNonReentrantCritScope cs(&crit_, &locked_); | 164 MarkProcessingCritScope cs(&crit_, &processing_); |
| 158 for (MessageQueue* queue : message_queues_) { | 165 for (MessageQueue* queue : message_queues_) { |
| 159 if (!queue->IsProcessingMessages()) { | 166 if (!queue->IsProcessingMessages()) { |
| 160 // If the queue is not processing messages, it can | 167 // If the queue is not processing messages, it can |
| 161 // be ignored. If we tried to post a message to it, it would be dropped | 168 // be ignored. If we tried to post a message to it, it would be dropped |
| 162 // or ignored. | 169 // or ignored. |
| 163 continue; | 170 continue; |
| 164 } | 171 } |
| 165 queue->PostDelayed(RTC_FROM_HERE, 0, nullptr, MQID_DISPOSE, | 172 queue->PostDelayed(RTC_FROM_HERE, 0, nullptr, MQID_DISPOSE, |
| 166 new ScopedIncrement(&queues_not_done)); | 173 new ScopedIncrement(&queues_not_done)); |
| 167 } | 174 } |
| (...skipping 373 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 541 | 548 |
| 542 void MessageQueue::PostFunctorInternal(const Location& posted_from, | 549 void MessageQueue::PostFunctorInternal(const Location& posted_from, |
| 543 RunnableData* message_data) { | 550 RunnableData* message_data) { |
| 544 // Use static to ensure it outlives this scope. Safe since | 551 // Use static to ensure it outlives this scope. Safe since |
| 545 // FunctorPostMessageHandler keeps no state. | 552 // FunctorPostMessageHandler keeps no state. |
| 546 static FunctorPostMessageHandler handler; | 553 static FunctorPostMessageHandler handler; |
| 547 Post(posted_from, &handler, 0, message_data); | 554 Post(posted_from, &handler, 0, message_data); |
| 548 } | 555 } |
| 549 | 556 |
| 550 } // namespace rtc | 557 } // namespace rtc |
| OLD | NEW |