Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(7)

Side by Side Diff: webrtc/rtc_base/messagequeue.cc

Issue 2968753002: Support re-entrant calls to MessageQueueManager::Clear. (Closed)
Patch Set: Rebased Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 /* 1 /*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 * 3 *
4 * Use of this source code is governed by a BSD-style license 4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source 5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found 6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may 7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree. 8 * be found in the AUTHORS file in the root of the source tree.
9 */ 9 */
10 #include <algorithm> 10 #include <algorithm>
11 11
12 #include "webrtc/rtc_base/atomicops.h" 12 #include "webrtc/rtc_base/atomicops.h"
13 #include "webrtc/rtc_base/checks.h" 13 #include "webrtc/rtc_base/checks.h"
14 #include "webrtc/rtc_base/logging.h" 14 #include "webrtc/rtc_base/logging.h"
15 #include "webrtc/rtc_base/messagequeue.h" 15 #include "webrtc/rtc_base/messagequeue.h"
16 #include "webrtc/rtc_base/stringencode.h" 16 #include "webrtc/rtc_base/stringencode.h"
17 #include "webrtc/rtc_base/thread.h" 17 #include "webrtc/rtc_base/thread.h"
18 #include "webrtc/rtc_base/trace_event.h" 18 #include "webrtc/rtc_base/trace_event.h"
19 19
20 namespace rtc { 20 namespace rtc {
21 namespace { 21 namespace {
22 22
23 const int kMaxMsgLatency = 150; // 150 ms 23 const int kMaxMsgLatency = 150; // 150 ms
24 const int kSlowDispatchLoggingThreshold = 50; // 50 ms 24 const int kSlowDispatchLoggingThreshold = 50; // 50 ms
25 25
26 class SCOPED_LOCKABLE DebugNonReentrantCritScope {
27 public:
28 DebugNonReentrantCritScope(const CriticalSection* cs, bool* locked)
29 EXCLUSIVE_LOCK_FUNCTION(cs)
30 : cs_(cs), locked_(locked) {
31 cs_->Enter();
32 RTC_DCHECK(!*locked_);
33 *locked_ = true;
34 }
35
36 ~DebugNonReentrantCritScope() UNLOCK_FUNCTION() {
37 *locked_ = false;
38 cs_->Leave();
39 }
40
41 private:
42 const CriticalSection* const cs_;
43 bool* locked_;
44
45 RTC_DISALLOW_COPY_AND_ASSIGN(DebugNonReentrantCritScope);
46 };
Taylor Brandstetter 2017/07/05 22:41:47 This still seems like a useful type of helper clas
joachim 2017/07/05 23:03:11 Done (renamed to "MarkProcessingCritScope").
47
48 class FunctorPostMessageHandler : public MessageHandler { 26 class FunctorPostMessageHandler : public MessageHandler {
49 public: 27 public:
50 void OnMessage(Message* msg) override { 28 void OnMessage(Message* msg) override {
51 RunnableData* data = static_cast<RunnableData*>(msg->pdata); 29 RunnableData* data = static_cast<RunnableData*>(msg->pdata);
52 data->Run(); 30 data->Run();
53 delete data; 31 delete data;
54 } 32 }
55 }; 33 };
56 34
57 } // namespace 35 } // namespace
58 36
59 //------------------------------------------------------------------ 37 //------------------------------------------------------------------
60 // MessageQueueManager 38 // MessageQueueManager
61 39
62 MessageQueueManager* MessageQueueManager::instance_ = nullptr; 40 MessageQueueManager* MessageQueueManager::instance_ = nullptr;
63 41
64 MessageQueueManager* MessageQueueManager::Instance() { 42 MessageQueueManager* MessageQueueManager::Instance() {
65 // Note: This is not thread safe, but it is first called before threads are 43 // Note: This is not thread safe, but it is first called before threads are
66 // spawned. 44 // spawned.
67 if (!instance_) 45 if (!instance_)
68 instance_ = new MessageQueueManager; 46 instance_ = new MessageQueueManager;
69 return instance_; 47 return instance_;
70 } 48 }
71 49
72 bool MessageQueueManager::IsInitialized() { 50 bool MessageQueueManager::IsInitialized() {
73 return instance_ != nullptr; 51 return instance_ != nullptr;
74 } 52 }
75 53
76 MessageQueueManager::MessageQueueManager() : locked_(false) {} 54 MessageQueueManager::MessageQueueManager() : processing_(0) {}
77 55
78 MessageQueueManager::~MessageQueueManager() { 56 MessageQueueManager::~MessageQueueManager() {
79 } 57 }
80 58
81 void MessageQueueManager::Add(MessageQueue *message_queue) { 59 void MessageQueueManager::Add(MessageQueue *message_queue) {
82 return Instance()->AddInternal(message_queue); 60 return Instance()->AddInternal(message_queue);
83 } 61 }
84 void MessageQueueManager::AddInternal(MessageQueue *message_queue) { 62 void MessageQueueManager::AddInternal(MessageQueue *message_queue) {
85 DebugNonReentrantCritScope cs(&crit_, &locked_); 63 CritScope cs(&crit_);
64 // Prevent changes while the list of message queues is processed.
65 RTC_DCHECK_EQ(processing_, 0);
86 message_queues_.push_back(message_queue); 66 message_queues_.push_back(message_queue);
87 } 67 }
88 68
89 void MessageQueueManager::Remove(MessageQueue *message_queue) { 69 void MessageQueueManager::Remove(MessageQueue *message_queue) {
90 // If there isn't a message queue manager instance, then there isn't a queue 70 // If there isn't a message queue manager instance, then there isn't a queue
91 // to remove. 71 // to remove.
92 if (!instance_) return; 72 if (!instance_) return;
93 return Instance()->RemoveInternal(message_queue); 73 return Instance()->RemoveInternal(message_queue);
94 } 74 }
95 void MessageQueueManager::RemoveInternal(MessageQueue *message_queue) { 75 void MessageQueueManager::RemoveInternal(MessageQueue *message_queue) {
96 // If this is the last MessageQueue, destroy the manager as well so that 76 // If this is the last MessageQueue, destroy the manager as well so that
97 // we don't leak this object at program shutdown. As mentioned above, this is 77 // we don't leak this object at program shutdown. As mentioned above, this is
98 // not thread-safe, but this should only happen at program termination (when 78 // not thread-safe, but this should only happen at program termination (when
99 // the ThreadManager is destroyed, and threads are no longer active). 79 // the ThreadManager is destroyed, and threads are no longer active).
100 bool destroy = false; 80 bool destroy = false;
101 { 81 {
102 DebugNonReentrantCritScope cs(&crit_, &locked_); 82 CritScope cs(&crit_);
83 // Prevent changes while the list of message queues is processed.
84 RTC_DCHECK_EQ(processing_, 0);
103 std::vector<MessageQueue *>::iterator iter; 85 std::vector<MessageQueue *>::iterator iter;
104 iter = std::find(message_queues_.begin(), message_queues_.end(), 86 iter = std::find(message_queues_.begin(), message_queues_.end(),
105 message_queue); 87 message_queue);
106 if (iter != message_queues_.end()) { 88 if (iter != message_queues_.end()) {
107 message_queues_.erase(iter); 89 message_queues_.erase(iter);
108 } 90 }
109 destroy = message_queues_.empty(); 91 destroy = message_queues_.empty();
110 } 92 }
111 if (destroy) { 93 if (destroy) {
112 instance_ = nullptr; 94 instance_ = nullptr;
113 delete this; 95 delete this;
114 } 96 }
115 } 97 }
116 98
117 void MessageQueueManager::Clear(MessageHandler *handler) { 99 void MessageQueueManager::Clear(MessageHandler *handler) {
118 // If there isn't a message queue manager instance, then there aren't any 100 // If there isn't a message queue manager instance, then there aren't any
119 // queues to remove this handler from. 101 // queues to remove this handler from.
120 if (!instance_) return; 102 if (!instance_) return;
121 return Instance()->ClearInternal(handler); 103 return Instance()->ClearInternal(handler);
122 } 104 }
123 void MessageQueueManager::ClearInternal(MessageHandler *handler) { 105 void MessageQueueManager::ClearInternal(MessageHandler *handler) {
124 DebugNonReentrantCritScope cs(&crit_, &locked_); 106 CritScope cs(&crit_);
Taylor Brandstetter 2017/07/05 22:41:47 May want to comment here that this allows a delete
joachim 2017/07/05 23:03:11 Done.
107 processing_++;
125 std::vector<MessageQueue *>::iterator iter; 108 std::vector<MessageQueue *>::iterator iter;
126 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++) 109 for (MessageQueue* queue : message_queues_) {
127 (*iter)->Clear(handler); 110 queue->Clear(handler);
111 }
112 processing_--;
128 } 113 }
129 114
130 void MessageQueueManager::ProcessAllMessageQueues() { 115 void MessageQueueManager::ProcessAllMessageQueues() {
131 if (!instance_) { 116 if (!instance_) {
132 return; 117 return;
133 } 118 }
134 return Instance()->ProcessAllMessageQueuesInternal(); 119 return Instance()->ProcessAllMessageQueuesInternal();
135 } 120 }
136 121
137 void MessageQueueManager::ProcessAllMessageQueuesInternal() { 122 void MessageQueueManager::ProcessAllMessageQueuesInternal() {
138 // This works by posting a delayed message at the current time and waiting 123 // This works by posting a delayed message at the current time and waiting
139 // for it to be dispatched on all queues, which will ensure that all messages 124 // for it to be dispatched on all queues, which will ensure that all messages
140 // that came before it were also dispatched. 125 // that came before it were also dispatched.
141 volatile int queues_not_done = 0; 126 volatile int queues_not_done = 0;
142 127
143 // This class is used so that whether the posted message is processed, or the 128 // This class is used so that whether the posted message is processed, or the
144 // message queue is simply cleared, queues_not_done gets decremented. 129 // message queue is simply cleared, queues_not_done gets decremented.
145 class ScopedIncrement : public MessageData { 130 class ScopedIncrement : public MessageData {
146 public: 131 public:
147 ScopedIncrement(volatile int* value) : value_(value) { 132 ScopedIncrement(volatile int* value) : value_(value) {
148 AtomicOps::Increment(value_); 133 AtomicOps::Increment(value_);
149 } 134 }
150 ~ScopedIncrement() override { AtomicOps::Decrement(value_); } 135 ~ScopedIncrement() override { AtomicOps::Decrement(value_); }
151 136
152 private: 137 private:
153 volatile int* value_; 138 volatile int* value_;
154 }; 139 };
155 140
156 { 141 {
157 DebugNonReentrantCritScope cs(&crit_, &locked_); 142 CritScope cs(&crit_);
143 processing_++;
158 for (MessageQueue* queue : message_queues_) { 144 for (MessageQueue* queue : message_queues_) {
159 if (!queue->IsProcessingMessages()) { 145 if (!queue->IsProcessingMessages()) {
160 // If the queue is not processing messages, it can 146 // If the queue is not processing messages, it can
161 // be ignored. If we tried to post a message to it, it would be dropped 147 // be ignored. If we tried to post a message to it, it would be dropped
162 // or ignored. 148 // or ignored.
163 continue; 149 continue;
164 } 150 }
165 queue->PostDelayed(RTC_FROM_HERE, 0, nullptr, MQID_DISPOSE, 151 queue->PostDelayed(RTC_FROM_HERE, 0, nullptr, MQID_DISPOSE,
166 new ScopedIncrement(&queues_not_done)); 152 new ScopedIncrement(&queues_not_done));
167 } 153 }
154 processing_--;
168 } 155 }
169 // Note: One of the message queues may have been on this thread, which is why 156 // Note: One of the message queues may have been on this thread, which is why
170 // we can't synchronously wait for queues_not_done to go to 0; we need to 157 // we can't synchronously wait for queues_not_done to go to 0; we need to
171 // process messages as well. 158 // process messages as well.
172 while (AtomicOps::AcquireLoad(&queues_not_done) > 0) { 159 while (AtomicOps::AcquireLoad(&queues_not_done) > 0) {
173 rtc::Thread::Current()->ProcessMessages(0); 160 rtc::Thread::Current()->ProcessMessages(0);
174 } 161 }
175 } 162 }
176 163
177 //------------------------------------------------------------------ 164 //------------------------------------------------------------------
(...skipping 363 matching lines...) Expand 10 before | Expand all | Expand 10 after
541 528
542 void MessageQueue::PostFunctorInternal(const Location& posted_from, 529 void MessageQueue::PostFunctorInternal(const Location& posted_from,
543 RunnableData* message_data) { 530 RunnableData* message_data) {
544 // Use static to ensure it outlives this scope. Safe since 531 // Use static to ensure it outlives this scope. Safe since
545 // FunctorPostMessageHandler keeps no state. 532 // FunctorPostMessageHandler keeps no state.
546 static FunctorPostMessageHandler handler; 533 static FunctorPostMessageHandler handler;
547 Post(posted_from, &handler, 0, message_data); 534 Post(posted_from, &handler, 0, message_data);
548 } 535 }
549 536
550 } // namespace rtc 537 } // namespace rtc
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698