OLD | NEW |
1 /* | 1 /* |
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. | 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. |
3 * | 3 * |
4 * Use of this source code is governed by a BSD-style license | 4 * Use of this source code is governed by a BSD-style license |
5 * that can be found in the LICENSE file in the root of the source | 5 * that can be found in the LICENSE file in the root of the source |
6 * tree. An additional intellectual property rights grant can be found | 6 * tree. An additional intellectual property rights grant can be found |
7 * in the file PATENTS. All contributing project authors may | 7 * in the file PATENTS. All contributing project authors may |
8 * be found in the AUTHORS file in the root of the source tree. | 8 * be found in the AUTHORS file in the root of the source tree. |
9 */ | 9 */ |
10 #include <algorithm> | 10 #include <algorithm> |
11 | 11 |
12 #include "webrtc/rtc_base/atomicops.h" | 12 #include "webrtc/rtc_base/atomicops.h" |
13 #include "webrtc/rtc_base/checks.h" | 13 #include "webrtc/rtc_base/checks.h" |
14 #include "webrtc/rtc_base/logging.h" | 14 #include "webrtc/rtc_base/logging.h" |
15 #include "webrtc/rtc_base/messagequeue.h" | 15 #include "webrtc/rtc_base/messagequeue.h" |
16 #include "webrtc/rtc_base/stringencode.h" | 16 #include "webrtc/rtc_base/stringencode.h" |
17 #include "webrtc/rtc_base/thread.h" | 17 #include "webrtc/rtc_base/thread.h" |
18 #include "webrtc/rtc_base/trace_event.h" | 18 #include "webrtc/rtc_base/trace_event.h" |
19 | 19 |
20 namespace rtc { | 20 namespace rtc { |
21 namespace { | 21 namespace { |
22 | 22 |
23 const int kMaxMsgLatency = 150; // 150 ms | 23 const int kMaxMsgLatency = 150; // 150 ms |
24 const int kSlowDispatchLoggingThreshold = 50; // 50 ms | 24 const int kSlowDispatchLoggingThreshold = 50; // 50 ms |
25 | 25 |
26 class SCOPED_LOCKABLE DebugNonReentrantCritScope { | 26 class SCOPED_LOCKABLE MarkProcessingCritScope { |
27 public: | 27 public: |
28 DebugNonReentrantCritScope(const CriticalSection* cs, bool* locked) | 28 MarkProcessingCritScope(const CriticalSection* cs, size_t* processing) |
29 EXCLUSIVE_LOCK_FUNCTION(cs) | 29 EXCLUSIVE_LOCK_FUNCTION(cs) |
30 : cs_(cs), locked_(locked) { | 30 : cs_(cs), processing_(processing) { |
31 cs_->Enter(); | 31 cs_->Enter(); |
32 RTC_DCHECK(!*locked_); | 32 *processing_ += 1; |
33 *locked_ = true; | |
34 } | 33 } |
35 | 34 |
36 ~DebugNonReentrantCritScope() UNLOCK_FUNCTION() { | 35 ~MarkProcessingCritScope() UNLOCK_FUNCTION() { |
37 *locked_ = false; | 36 *processing_ -= 1; |
38 cs_->Leave(); | 37 cs_->Leave(); |
39 } | 38 } |
40 | 39 |
41 private: | 40 private: |
42 const CriticalSection* const cs_; | 41 const CriticalSection* const cs_; |
43 bool* locked_; | 42 size_t* processing_; |
44 | 43 |
45 RTC_DISALLOW_COPY_AND_ASSIGN(DebugNonReentrantCritScope); | 44 RTC_DISALLOW_COPY_AND_ASSIGN(MarkProcessingCritScope); |
46 }; | 45 }; |
47 | 46 |
48 class FunctorPostMessageHandler : public MessageHandler { | 47 class FunctorPostMessageHandler : public MessageHandler { |
49 public: | 48 public: |
50 void OnMessage(Message* msg) override { | 49 void OnMessage(Message* msg) override { |
51 RunnableData* data = static_cast<RunnableData*>(msg->pdata); | 50 RunnableData* data = static_cast<RunnableData*>(msg->pdata); |
52 data->Run(); | 51 data->Run(); |
53 delete data; | 52 delete data; |
54 } | 53 } |
55 }; | 54 }; |
(...skipping 10 matching lines...) Expand all Loading... |
66 // spawned. | 65 // spawned. |
67 if (!instance_) | 66 if (!instance_) |
68 instance_ = new MessageQueueManager; | 67 instance_ = new MessageQueueManager; |
69 return instance_; | 68 return instance_; |
70 } | 69 } |
71 | 70 |
72 bool MessageQueueManager::IsInitialized() { | 71 bool MessageQueueManager::IsInitialized() { |
73 return instance_ != nullptr; | 72 return instance_ != nullptr; |
74 } | 73 } |
75 | 74 |
76 MessageQueueManager::MessageQueueManager() : locked_(false) {} | 75 MessageQueueManager::MessageQueueManager() : processing_(0) {} |
77 | 76 |
78 MessageQueueManager::~MessageQueueManager() { | 77 MessageQueueManager::~MessageQueueManager() { |
79 } | 78 } |
80 | 79 |
81 void MessageQueueManager::Add(MessageQueue *message_queue) { | 80 void MessageQueueManager::Add(MessageQueue *message_queue) { |
82 return Instance()->AddInternal(message_queue); | 81 return Instance()->AddInternal(message_queue); |
83 } | 82 } |
84 void MessageQueueManager::AddInternal(MessageQueue *message_queue) { | 83 void MessageQueueManager::AddInternal(MessageQueue *message_queue) { |
85 DebugNonReentrantCritScope cs(&crit_, &locked_); | 84 CritScope cs(&crit_); |
| 85 // Prevent changes while the list of message queues is processed. |
| 86 RTC_DCHECK_EQ(processing_, 0); |
86 message_queues_.push_back(message_queue); | 87 message_queues_.push_back(message_queue); |
87 } | 88 } |
88 | 89 |
89 void MessageQueueManager::Remove(MessageQueue *message_queue) { | 90 void MessageQueueManager::Remove(MessageQueue *message_queue) { |
90 // If there isn't a message queue manager instance, then there isn't a queue | 91 // If there isn't a message queue manager instance, then there isn't a queue |
91 // to remove. | 92 // to remove. |
92 if (!instance_) return; | 93 if (!instance_) return; |
93 return Instance()->RemoveInternal(message_queue); | 94 return Instance()->RemoveInternal(message_queue); |
94 } | 95 } |
95 void MessageQueueManager::RemoveInternal(MessageQueue *message_queue) { | 96 void MessageQueueManager::RemoveInternal(MessageQueue *message_queue) { |
96 // If this is the last MessageQueue, destroy the manager as well so that | 97 // If this is the last MessageQueue, destroy the manager as well so that |
97 // we don't leak this object at program shutdown. As mentioned above, this is | 98 // we don't leak this object at program shutdown. As mentioned above, this is |
98 // not thread-safe, but this should only happen at program termination (when | 99 // not thread-safe, but this should only happen at program termination (when |
99 // the ThreadManager is destroyed, and threads are no longer active). | 100 // the ThreadManager is destroyed, and threads are no longer active). |
100 bool destroy = false; | 101 bool destroy = false; |
101 { | 102 { |
102 DebugNonReentrantCritScope cs(&crit_, &locked_); | 103 CritScope cs(&crit_); |
| 104 // Prevent changes while the list of message queues is processed. |
| 105 RTC_DCHECK_EQ(processing_, 0); |
103 std::vector<MessageQueue *>::iterator iter; | 106 std::vector<MessageQueue *>::iterator iter; |
104 iter = std::find(message_queues_.begin(), message_queues_.end(), | 107 iter = std::find(message_queues_.begin(), message_queues_.end(), |
105 message_queue); | 108 message_queue); |
106 if (iter != message_queues_.end()) { | 109 if (iter != message_queues_.end()) { |
107 message_queues_.erase(iter); | 110 message_queues_.erase(iter); |
108 } | 111 } |
109 destroy = message_queues_.empty(); | 112 destroy = message_queues_.empty(); |
110 } | 113 } |
111 if (destroy) { | 114 if (destroy) { |
112 instance_ = nullptr; | 115 instance_ = nullptr; |
113 delete this; | 116 delete this; |
114 } | 117 } |
115 } | 118 } |
116 | 119 |
117 void MessageQueueManager::Clear(MessageHandler *handler) { | 120 void MessageQueueManager::Clear(MessageHandler *handler) { |
118 // If there isn't a message queue manager instance, then there aren't any | 121 // If there isn't a message queue manager instance, then there aren't any |
119 // queues to remove this handler from. | 122 // queues to remove this handler from. |
120 if (!instance_) return; | 123 if (!instance_) return; |
121 return Instance()->ClearInternal(handler); | 124 return Instance()->ClearInternal(handler); |
122 } | 125 } |
123 void MessageQueueManager::ClearInternal(MessageHandler *handler) { | 126 void MessageQueueManager::ClearInternal(MessageHandler *handler) { |
124 DebugNonReentrantCritScope cs(&crit_, &locked_); | 127 // Deleted objects may cause re-entrant calls to ClearInternal. This is |
| 128 // allowed as the list of message queues does not change while queues are |
| 129 // cleared. |
| 130 MarkProcessingCritScope cs(&crit_, &processing_); |
125 std::vector<MessageQueue *>::iterator iter; | 131 std::vector<MessageQueue *>::iterator iter; |
126 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++) | 132 for (MessageQueue* queue : message_queues_) { |
127 (*iter)->Clear(handler); | 133 queue->Clear(handler); |
| 134 } |
128 } | 135 } |
129 | 136 |
130 void MessageQueueManager::ProcessAllMessageQueues() { | 137 void MessageQueueManager::ProcessAllMessageQueues() { |
131 if (!instance_) { | 138 if (!instance_) { |
132 return; | 139 return; |
133 } | 140 } |
134 return Instance()->ProcessAllMessageQueuesInternal(); | 141 return Instance()->ProcessAllMessageQueuesInternal(); |
135 } | 142 } |
136 | 143 |
137 void MessageQueueManager::ProcessAllMessageQueuesInternal() { | 144 void MessageQueueManager::ProcessAllMessageQueuesInternal() { |
138 // This works by posting a delayed message at the current time and waiting | 145 // This works by posting a delayed message at the current time and waiting |
139 // for it to be dispatched on all queues, which will ensure that all messages | 146 // for it to be dispatched on all queues, which will ensure that all messages |
140 // that came before it were also dispatched. | 147 // that came before it were also dispatched. |
141 volatile int queues_not_done = 0; | 148 volatile int queues_not_done = 0; |
142 | 149 |
143 // This class is used so that whether the posted message is processed, or the | 150 // This class is used so that whether the posted message is processed, or the |
144 // message queue is simply cleared, queues_not_done gets decremented. | 151 // message queue is simply cleared, queues_not_done gets decremented. |
145 class ScopedIncrement : public MessageData { | 152 class ScopedIncrement : public MessageData { |
146 public: | 153 public: |
147 ScopedIncrement(volatile int* value) : value_(value) { | 154 ScopedIncrement(volatile int* value) : value_(value) { |
148 AtomicOps::Increment(value_); | 155 AtomicOps::Increment(value_); |
149 } | 156 } |
150 ~ScopedIncrement() override { AtomicOps::Decrement(value_); } | 157 ~ScopedIncrement() override { AtomicOps::Decrement(value_); } |
151 | 158 |
152 private: | 159 private: |
153 volatile int* value_; | 160 volatile int* value_; |
154 }; | 161 }; |
155 | 162 |
156 { | 163 { |
157 DebugNonReentrantCritScope cs(&crit_, &locked_); | 164 MarkProcessingCritScope cs(&crit_, &processing_); |
158 for (MessageQueue* queue : message_queues_) { | 165 for (MessageQueue* queue : message_queues_) { |
159 if (!queue->IsProcessingMessages()) { | 166 if (!queue->IsProcessingMessages()) { |
160 // If the queue is not processing messages, it can | 167 // If the queue is not processing messages, it can |
161 // be ignored. If we tried to post a message to it, it would be dropped | 168 // be ignored. If we tried to post a message to it, it would be dropped |
162 // or ignored. | 169 // or ignored. |
163 continue; | 170 continue; |
164 } | 171 } |
165 queue->PostDelayed(RTC_FROM_HERE, 0, nullptr, MQID_DISPOSE, | 172 queue->PostDelayed(RTC_FROM_HERE, 0, nullptr, MQID_DISPOSE, |
166 new ScopedIncrement(&queues_not_done)); | 173 new ScopedIncrement(&queues_not_done)); |
167 } | 174 } |
(...skipping 373 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
541 | 548 |
542 void MessageQueue::PostFunctorInternal(const Location& posted_from, | 549 void MessageQueue::PostFunctorInternal(const Location& posted_from, |
543 RunnableData* message_data) { | 550 RunnableData* message_data) { |
544 // Use static to ensure it outlives this scope. Safe since | 551 // Use static to ensure it outlives this scope. Safe since |
545 // FunctorPostMessageHandler keeps no state. | 552 // FunctorPostMessageHandler keeps no state. |
546 static FunctorPostMessageHandler handler; | 553 static FunctorPostMessageHandler handler; |
547 Post(posted_from, &handler, 0, message_data); | 554 Post(posted_from, &handler, 0, message_data); |
548 } | 555 } |
549 | 556 |
550 } // namespace rtc | 557 } // namespace rtc |
OLD | NEW |