OLD | NEW |
---|---|
1 /* | 1 /* |
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. | 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. |
3 * | 3 * |
4 * Use of this source code is governed by a BSD-style license | 4 * Use of this source code is governed by a BSD-style license |
5 * that can be found in the LICENSE file in the root of the source | 5 * that can be found in the LICENSE file in the root of the source |
6 * tree. An additional intellectual property rights grant can be found | 6 * tree. An additional intellectual property rights grant can be found |
7 * in the file PATENTS. All contributing project authors may | 7 * in the file PATENTS. All contributing project authors may |
8 * be found in the AUTHORS file in the root of the source tree. | 8 * be found in the AUTHORS file in the root of the source tree. |
9 */ | 9 */ |
10 #include <algorithm> | 10 #include <algorithm> |
11 | 11 |
12 #include "webrtc/rtc_base/atomicops.h" | 12 #include "webrtc/rtc_base/atomicops.h" |
13 #include "webrtc/rtc_base/checks.h" | 13 #include "webrtc/rtc_base/checks.h" |
14 #include "webrtc/rtc_base/logging.h" | 14 #include "webrtc/rtc_base/logging.h" |
15 #include "webrtc/rtc_base/messagequeue.h" | 15 #include "webrtc/rtc_base/messagequeue.h" |
16 #include "webrtc/rtc_base/stringencode.h" | 16 #include "webrtc/rtc_base/stringencode.h" |
17 #include "webrtc/rtc_base/thread.h" | 17 #include "webrtc/rtc_base/thread.h" |
18 #include "webrtc/rtc_base/trace_event.h" | 18 #include "webrtc/rtc_base/trace_event.h" |
19 | 19 |
20 namespace rtc { | 20 namespace rtc { |
21 namespace { | 21 namespace { |
22 | 22 |
23 const int kMaxMsgLatency = 150; // 150 ms | 23 const int kMaxMsgLatency = 150; // 150 ms |
24 const int kSlowDispatchLoggingThreshold = 50; // 50 ms | 24 const int kSlowDispatchLoggingThreshold = 50; // 50 ms |
25 | 25 |
26 class SCOPED_LOCKABLE DebugNonReentrantCritScope { | |
27 public: | |
28 DebugNonReentrantCritScope(const CriticalSection* cs, bool* locked) | |
29 EXCLUSIVE_LOCK_FUNCTION(cs) | |
30 : cs_(cs), locked_(locked) { | |
31 cs_->Enter(); | |
32 RTC_DCHECK(!*locked_); | |
33 *locked_ = true; | |
34 } | |
35 | |
36 ~DebugNonReentrantCritScope() UNLOCK_FUNCTION() { | |
37 *locked_ = false; | |
38 cs_->Leave(); | |
39 } | |
40 | |
41 private: | |
42 const CriticalSection* const cs_; | |
43 bool* locked_; | |
44 | |
45 RTC_DISALLOW_COPY_AND_ASSIGN(DebugNonReentrantCritScope); | |
46 }; | |
Taylor Brandstetter
2017/07/05 22:41:47
This still seems like a useful type of helper clas
joachim
2017/07/05 23:03:11
Done (renamed to "MarkProcessingCritScope").
| |
47 | |
48 class FunctorPostMessageHandler : public MessageHandler { | 26 class FunctorPostMessageHandler : public MessageHandler { |
49 public: | 27 public: |
50 void OnMessage(Message* msg) override { | 28 void OnMessage(Message* msg) override { |
51 RunnableData* data = static_cast<RunnableData*>(msg->pdata); | 29 RunnableData* data = static_cast<RunnableData*>(msg->pdata); |
52 data->Run(); | 30 data->Run(); |
53 delete data; | 31 delete data; |
54 } | 32 } |
55 }; | 33 }; |
56 | 34 |
57 } // namespace | 35 } // namespace |
58 | 36 |
59 //------------------------------------------------------------------ | 37 //------------------------------------------------------------------ |
60 // MessageQueueManager | 38 // MessageQueueManager |
61 | 39 |
62 MessageQueueManager* MessageQueueManager::instance_ = nullptr; | 40 MessageQueueManager* MessageQueueManager::instance_ = nullptr; |
63 | 41 |
64 MessageQueueManager* MessageQueueManager::Instance() { | 42 MessageQueueManager* MessageQueueManager::Instance() { |
65 // Note: This is not thread safe, but it is first called before threads are | 43 // Note: This is not thread safe, but it is first called before threads are |
66 // spawned. | 44 // spawned. |
67 if (!instance_) | 45 if (!instance_) |
68 instance_ = new MessageQueueManager; | 46 instance_ = new MessageQueueManager; |
69 return instance_; | 47 return instance_; |
70 } | 48 } |
71 | 49 |
72 bool MessageQueueManager::IsInitialized() { | 50 bool MessageQueueManager::IsInitialized() { |
73 return instance_ != nullptr; | 51 return instance_ != nullptr; |
74 } | 52 } |
75 | 53 |
76 MessageQueueManager::MessageQueueManager() : locked_(false) {} | 54 MessageQueueManager::MessageQueueManager() : processing_(0) {} |
77 | 55 |
78 MessageQueueManager::~MessageQueueManager() { | 56 MessageQueueManager::~MessageQueueManager() { |
79 } | 57 } |
80 | 58 |
81 void MessageQueueManager::Add(MessageQueue *message_queue) { | 59 void MessageQueueManager::Add(MessageQueue *message_queue) { |
82 return Instance()->AddInternal(message_queue); | 60 return Instance()->AddInternal(message_queue); |
83 } | 61 } |
84 void MessageQueueManager::AddInternal(MessageQueue *message_queue) { | 62 void MessageQueueManager::AddInternal(MessageQueue *message_queue) { |
85 DebugNonReentrantCritScope cs(&crit_, &locked_); | 63 CritScope cs(&crit_); |
64 // Prevent changes while the list of message queues is processed. | |
65 RTC_DCHECK_EQ(processing_, 0); | |
86 message_queues_.push_back(message_queue); | 66 message_queues_.push_back(message_queue); |
87 } | 67 } |
88 | 68 |
89 void MessageQueueManager::Remove(MessageQueue *message_queue) { | 69 void MessageQueueManager::Remove(MessageQueue *message_queue) { |
90 // If there isn't a message queue manager instance, then there isn't a queue | 70 // If there isn't a message queue manager instance, then there isn't a queue |
91 // to remove. | 71 // to remove. |
92 if (!instance_) return; | 72 if (!instance_) return; |
93 return Instance()->RemoveInternal(message_queue); | 73 return Instance()->RemoveInternal(message_queue); |
94 } | 74 } |
95 void MessageQueueManager::RemoveInternal(MessageQueue *message_queue) { | 75 void MessageQueueManager::RemoveInternal(MessageQueue *message_queue) { |
96 // If this is the last MessageQueue, destroy the manager as well so that | 76 // If this is the last MessageQueue, destroy the manager as well so that |
97 // we don't leak this object at program shutdown. As mentioned above, this is | 77 // we don't leak this object at program shutdown. As mentioned above, this is |
98 // not thread-safe, but this should only happen at program termination (when | 78 // not thread-safe, but this should only happen at program termination (when |
99 // the ThreadManager is destroyed, and threads are no longer active). | 79 // the ThreadManager is destroyed, and threads are no longer active). |
100 bool destroy = false; | 80 bool destroy = false; |
101 { | 81 { |
102 DebugNonReentrantCritScope cs(&crit_, &locked_); | 82 CritScope cs(&crit_); |
83 // Prevent changes while the list of message queues is processed. | |
84 RTC_DCHECK_EQ(processing_, 0); | |
103 std::vector<MessageQueue *>::iterator iter; | 85 std::vector<MessageQueue *>::iterator iter; |
104 iter = std::find(message_queues_.begin(), message_queues_.end(), | 86 iter = std::find(message_queues_.begin(), message_queues_.end(), |
105 message_queue); | 87 message_queue); |
106 if (iter != message_queues_.end()) { | 88 if (iter != message_queues_.end()) { |
107 message_queues_.erase(iter); | 89 message_queues_.erase(iter); |
108 } | 90 } |
109 destroy = message_queues_.empty(); | 91 destroy = message_queues_.empty(); |
110 } | 92 } |
111 if (destroy) { | 93 if (destroy) { |
112 instance_ = nullptr; | 94 instance_ = nullptr; |
113 delete this; | 95 delete this; |
114 } | 96 } |
115 } | 97 } |
116 | 98 |
117 void MessageQueueManager::Clear(MessageHandler *handler) { | 99 void MessageQueueManager::Clear(MessageHandler *handler) { |
118 // If there isn't a message queue manager instance, then there aren't any | 100 // If there isn't a message queue manager instance, then there aren't any |
119 // queues to remove this handler from. | 101 // queues to remove this handler from. |
120 if (!instance_) return; | 102 if (!instance_) return; |
121 return Instance()->ClearInternal(handler); | 103 return Instance()->ClearInternal(handler); |
122 } | 104 } |
123 void MessageQueueManager::ClearInternal(MessageHandler *handler) { | 105 void MessageQueueManager::ClearInternal(MessageHandler *handler) { |
124 DebugNonReentrantCritScope cs(&crit_, &locked_); | 106 CritScope cs(&crit_); |
Taylor Brandstetter
2017/07/05 22:41:47
May want to comment here that this allows a delete
joachim
2017/07/05 23:03:11
Done.
| |
107 processing_++; | |
125 std::vector<MessageQueue *>::iterator iter; | 108 std::vector<MessageQueue *>::iterator iter; |
126 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++) | 109 for (MessageQueue* queue : message_queues_) { |
127 (*iter)->Clear(handler); | 110 queue->Clear(handler); |
111 } | |
112 processing_--; | |
128 } | 113 } |
129 | 114 |
130 void MessageQueueManager::ProcessAllMessageQueues() { | 115 void MessageQueueManager::ProcessAllMessageQueues() { |
131 if (!instance_) { | 116 if (!instance_) { |
132 return; | 117 return; |
133 } | 118 } |
134 return Instance()->ProcessAllMessageQueuesInternal(); | 119 return Instance()->ProcessAllMessageQueuesInternal(); |
135 } | 120 } |
136 | 121 |
137 void MessageQueueManager::ProcessAllMessageQueuesInternal() { | 122 void MessageQueueManager::ProcessAllMessageQueuesInternal() { |
138 // This works by posting a delayed message at the current time and waiting | 123 // This works by posting a delayed message at the current time and waiting |
139 // for it to be dispatched on all queues, which will ensure that all messages | 124 // for it to be dispatched on all queues, which will ensure that all messages |
140 // that came before it were also dispatched. | 125 // that came before it were also dispatched. |
141 volatile int queues_not_done = 0; | 126 volatile int queues_not_done = 0; |
142 | 127 |
143 // This class is used so that whether the posted message is processed, or the | 128 // This class is used so that whether the posted message is processed, or the |
144 // message queue is simply cleared, queues_not_done gets decremented. | 129 // message queue is simply cleared, queues_not_done gets decremented. |
145 class ScopedIncrement : public MessageData { | 130 class ScopedIncrement : public MessageData { |
146 public: | 131 public: |
147 ScopedIncrement(volatile int* value) : value_(value) { | 132 ScopedIncrement(volatile int* value) : value_(value) { |
148 AtomicOps::Increment(value_); | 133 AtomicOps::Increment(value_); |
149 } | 134 } |
150 ~ScopedIncrement() override { AtomicOps::Decrement(value_); } | 135 ~ScopedIncrement() override { AtomicOps::Decrement(value_); } |
151 | 136 |
152 private: | 137 private: |
153 volatile int* value_; | 138 volatile int* value_; |
154 }; | 139 }; |
155 | 140 |
156 { | 141 { |
157 DebugNonReentrantCritScope cs(&crit_, &locked_); | 142 CritScope cs(&crit_); |
143 processing_++; | |
158 for (MessageQueue* queue : message_queues_) { | 144 for (MessageQueue* queue : message_queues_) { |
159 if (!queue->IsProcessingMessages()) { | 145 if (!queue->IsProcessingMessages()) { |
160 // If the queue is not processing messages, it can | 146 // If the queue is not processing messages, it can |
161 // be ignored. If we tried to post a message to it, it would be dropped | 147 // be ignored. If we tried to post a message to it, it would be dropped |
162 // or ignored. | 148 // or ignored. |
163 continue; | 149 continue; |
164 } | 150 } |
165 queue->PostDelayed(RTC_FROM_HERE, 0, nullptr, MQID_DISPOSE, | 151 queue->PostDelayed(RTC_FROM_HERE, 0, nullptr, MQID_DISPOSE, |
166 new ScopedIncrement(&queues_not_done)); | 152 new ScopedIncrement(&queues_not_done)); |
167 } | 153 } |
154 processing_--; | |
168 } | 155 } |
169 // Note: One of the message queues may have been on this thread, which is why | 156 // Note: One of the message queues may have been on this thread, which is why |
170 // we can't synchronously wait for queues_not_done to go to 0; we need to | 157 // we can't synchronously wait for queues_not_done to go to 0; we need to |
171 // process messages as well. | 158 // process messages as well. |
172 while (AtomicOps::AcquireLoad(&queues_not_done) > 0) { | 159 while (AtomicOps::AcquireLoad(&queues_not_done) > 0) { |
173 rtc::Thread::Current()->ProcessMessages(0); | 160 rtc::Thread::Current()->ProcessMessages(0); |
174 } | 161 } |
175 } | 162 } |
176 | 163 |
177 //------------------------------------------------------------------ | 164 //------------------------------------------------------------------ |
(...skipping 363 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
541 | 528 |
542 void MessageQueue::PostFunctorInternal(const Location& posted_from, | 529 void MessageQueue::PostFunctorInternal(const Location& posted_from, |
543 RunnableData* message_data) { | 530 RunnableData* message_data) { |
544 // Use static to ensure it outlives this scope. Safe since | 531 // Use static to ensure it outlives this scope. Safe since |
545 // FunctorPostMessageHandler keeps no state. | 532 // FunctorPostMessageHandler keeps no state. |
546 static FunctorPostMessageHandler handler; | 533 static FunctorPostMessageHandler handler; |
547 Post(posted_from, &handler, 0, message_data); | 534 Post(posted_from, &handler, 0, message_data); |
548 } | 535 } |
549 | 536 |
550 } // namespace rtc | 537 } // namespace rtc |
OLD | NEW |