Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(409)

Side by Side Diff: webrtc/rtc_base/messagequeue.cc

Issue 2968753002: Support re-entrant calls to MessageQueueManager::Clear. (Closed)
Patch Set: Also Post inner handler to verify re-entrant clearing in the message queue works. Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « webrtc/rtc_base/messagequeue.h ('k') | webrtc/rtc_base/messagequeue_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 /* 1 /*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 * 3 *
4 * Use of this source code is governed by a BSD-style license 4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source 5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found 6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may 7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree. 8 * be found in the AUTHORS file in the root of the source tree.
9 */ 9 */
10 #include <algorithm> 10 #include <algorithm>
11 11
12 #include "webrtc/rtc_base/atomicops.h" 12 #include "webrtc/rtc_base/atomicops.h"
13 #include "webrtc/rtc_base/checks.h" 13 #include "webrtc/rtc_base/checks.h"
14 #include "webrtc/rtc_base/logging.h" 14 #include "webrtc/rtc_base/logging.h"
15 #include "webrtc/rtc_base/messagequeue.h" 15 #include "webrtc/rtc_base/messagequeue.h"
16 #include "webrtc/rtc_base/stringencode.h" 16 #include "webrtc/rtc_base/stringencode.h"
17 #include "webrtc/rtc_base/thread.h" 17 #include "webrtc/rtc_base/thread.h"
18 #include "webrtc/rtc_base/trace_event.h" 18 #include "webrtc/rtc_base/trace_event.h"
19 19
20 namespace rtc { 20 namespace rtc {
21 namespace { 21 namespace {
22 22
23 const int kMaxMsgLatency = 150; // 150 ms 23 const int kMaxMsgLatency = 150; // 150 ms
24 const int kSlowDispatchLoggingThreshold = 50; // 50 ms 24 const int kSlowDispatchLoggingThreshold = 50; // 50 ms
25 25
26 class SCOPED_LOCKABLE DebugNonReentrantCritScope { 26 class SCOPED_LOCKABLE MarkProcessingCritScope {
27 public: 27 public:
28 DebugNonReentrantCritScope(const CriticalSection* cs, bool* locked) 28 MarkProcessingCritScope(const CriticalSection* cs, size_t* processing)
29 EXCLUSIVE_LOCK_FUNCTION(cs) 29 EXCLUSIVE_LOCK_FUNCTION(cs)
30 : cs_(cs), locked_(locked) { 30 : cs_(cs), processing_(processing) {
31 cs_->Enter(); 31 cs_->Enter();
32 RTC_DCHECK(!*locked_); 32 *processing_ += 1;
33 *locked_ = true;
34 } 33 }
35 34
36 ~DebugNonReentrantCritScope() UNLOCK_FUNCTION() { 35 ~MarkProcessingCritScope() UNLOCK_FUNCTION() {
37 *locked_ = false; 36 *processing_ -= 1;
38 cs_->Leave(); 37 cs_->Leave();
39 } 38 }
40 39
41 private: 40 private:
42 const CriticalSection* const cs_; 41 const CriticalSection* const cs_;
43 bool* locked_; 42 size_t* processing_;
44 43
45 RTC_DISALLOW_COPY_AND_ASSIGN(DebugNonReentrantCritScope); 44 RTC_DISALLOW_COPY_AND_ASSIGN(MarkProcessingCritScope);
46 }; 45 };
47 46
48 class FunctorPostMessageHandler : public MessageHandler { 47 class FunctorPostMessageHandler : public MessageHandler {
49 public: 48 public:
50 void OnMessage(Message* msg) override { 49 void OnMessage(Message* msg) override {
51 RunnableData* data = static_cast<RunnableData*>(msg->pdata); 50 RunnableData* data = static_cast<RunnableData*>(msg->pdata);
52 data->Run(); 51 data->Run();
53 delete data; 52 delete data;
54 } 53 }
55 }; 54 };
(...skipping 10 matching lines...) Expand all
66 // spawned. 65 // spawned.
67 if (!instance_) 66 if (!instance_)
68 instance_ = new MessageQueueManager; 67 instance_ = new MessageQueueManager;
69 return instance_; 68 return instance_;
70 } 69 }
71 70
72 bool MessageQueueManager::IsInitialized() { 71 bool MessageQueueManager::IsInitialized() {
73 return instance_ != nullptr; 72 return instance_ != nullptr;
74 } 73 }
75 74
76 MessageQueueManager::MessageQueueManager() : locked_(false) {} 75 MessageQueueManager::MessageQueueManager() : processing_(0) {}
77 76
78 MessageQueueManager::~MessageQueueManager() { 77 MessageQueueManager::~MessageQueueManager() {
79 } 78 }
80 79
81 void MessageQueueManager::Add(MessageQueue *message_queue) { 80 void MessageQueueManager::Add(MessageQueue *message_queue) {
82 return Instance()->AddInternal(message_queue); 81 return Instance()->AddInternal(message_queue);
83 } 82 }
84 void MessageQueueManager::AddInternal(MessageQueue *message_queue) { 83 void MessageQueueManager::AddInternal(MessageQueue *message_queue) {
85 DebugNonReentrantCritScope cs(&crit_, &locked_); 84 CritScope cs(&crit_);
85 // Prevent changes while the list of message queues is processed.
86 RTC_DCHECK_EQ(processing_, 0);
86 message_queues_.push_back(message_queue); 87 message_queues_.push_back(message_queue);
87 } 88 }
88 89
89 void MessageQueueManager::Remove(MessageQueue *message_queue) { 90 void MessageQueueManager::Remove(MessageQueue *message_queue) {
90 // If there isn't a message queue manager instance, then there isn't a queue 91 // If there isn't a message queue manager instance, then there isn't a queue
91 // to remove. 92 // to remove.
92 if (!instance_) return; 93 if (!instance_) return;
93 return Instance()->RemoveInternal(message_queue); 94 return Instance()->RemoveInternal(message_queue);
94 } 95 }
95 void MessageQueueManager::RemoveInternal(MessageQueue *message_queue) { 96 void MessageQueueManager::RemoveInternal(MessageQueue *message_queue) {
96 // If this is the last MessageQueue, destroy the manager as well so that 97 // If this is the last MessageQueue, destroy the manager as well so that
97 // we don't leak this object at program shutdown. As mentioned above, this is 98 // we don't leak this object at program shutdown. As mentioned above, this is
98 // not thread-safe, but this should only happen at program termination (when 99 // not thread-safe, but this should only happen at program termination (when
99 // the ThreadManager is destroyed, and threads are no longer active). 100 // the ThreadManager is destroyed, and threads are no longer active).
100 bool destroy = false; 101 bool destroy = false;
101 { 102 {
102 DebugNonReentrantCritScope cs(&crit_, &locked_); 103 CritScope cs(&crit_);
104 // Prevent changes while the list of message queues is processed.
105 RTC_DCHECK_EQ(processing_, 0);
103 std::vector<MessageQueue *>::iterator iter; 106 std::vector<MessageQueue *>::iterator iter;
104 iter = std::find(message_queues_.begin(), message_queues_.end(), 107 iter = std::find(message_queues_.begin(), message_queues_.end(),
105 message_queue); 108 message_queue);
106 if (iter != message_queues_.end()) { 109 if (iter != message_queues_.end()) {
107 message_queues_.erase(iter); 110 message_queues_.erase(iter);
108 } 111 }
109 destroy = message_queues_.empty(); 112 destroy = message_queues_.empty();
110 } 113 }
111 if (destroy) { 114 if (destroy) {
112 instance_ = nullptr; 115 instance_ = nullptr;
113 delete this; 116 delete this;
114 } 117 }
115 } 118 }
116 119
117 void MessageQueueManager::Clear(MessageHandler *handler) { 120 void MessageQueueManager::Clear(MessageHandler *handler) {
118 // If there isn't a message queue manager instance, then there aren't any 121 // If there isn't a message queue manager instance, then there aren't any
119 // queues to remove this handler from. 122 // queues to remove this handler from.
120 if (!instance_) return; 123 if (!instance_) return;
121 return Instance()->ClearInternal(handler); 124 return Instance()->ClearInternal(handler);
122 } 125 }
123 void MessageQueueManager::ClearInternal(MessageHandler *handler) { 126 void MessageQueueManager::ClearInternal(MessageHandler *handler) {
124 DebugNonReentrantCritScope cs(&crit_, &locked_); 127 // Deleted objects may cause re-entrant calls to ClearInternal. This is
128 // allowed as the list of message queues does not change while queues are
129 // cleared.
130 MarkProcessingCritScope cs(&crit_, &processing_);
125 std::vector<MessageQueue *>::iterator iter; 131 std::vector<MessageQueue *>::iterator iter;
126 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++) 132 for (MessageQueue* queue : message_queues_) {
127 (*iter)->Clear(handler); 133 queue->Clear(handler);
134 }
128 } 135 }
129 136
130 void MessageQueueManager::ProcessAllMessageQueues() { 137 void MessageQueueManager::ProcessAllMessageQueues() {
131 if (!instance_) { 138 if (!instance_) {
132 return; 139 return;
133 } 140 }
134 return Instance()->ProcessAllMessageQueuesInternal(); 141 return Instance()->ProcessAllMessageQueuesInternal();
135 } 142 }
136 143
137 void MessageQueueManager::ProcessAllMessageQueuesInternal() { 144 void MessageQueueManager::ProcessAllMessageQueuesInternal() {
138 // This works by posting a delayed message at the current time and waiting 145 // This works by posting a delayed message at the current time and waiting
139 // for it to be dispatched on all queues, which will ensure that all messages 146 // for it to be dispatched on all queues, which will ensure that all messages
140 // that came before it were also dispatched. 147 // that came before it were also dispatched.
141 volatile int queues_not_done = 0; 148 volatile int queues_not_done = 0;
142 149
143 // This class is used so that whether the posted message is processed, or the 150 // This class is used so that whether the posted message is processed, or the
144 // message queue is simply cleared, queues_not_done gets decremented. 151 // message queue is simply cleared, queues_not_done gets decremented.
145 class ScopedIncrement : public MessageData { 152 class ScopedIncrement : public MessageData {
146 public: 153 public:
147 ScopedIncrement(volatile int* value) : value_(value) { 154 ScopedIncrement(volatile int* value) : value_(value) {
148 AtomicOps::Increment(value_); 155 AtomicOps::Increment(value_);
149 } 156 }
150 ~ScopedIncrement() override { AtomicOps::Decrement(value_); } 157 ~ScopedIncrement() override { AtomicOps::Decrement(value_); }
151 158
152 private: 159 private:
153 volatile int* value_; 160 volatile int* value_;
154 }; 161 };
155 162
156 { 163 {
157 DebugNonReentrantCritScope cs(&crit_, &locked_); 164 MarkProcessingCritScope cs(&crit_, &processing_);
158 for (MessageQueue* queue : message_queues_) { 165 for (MessageQueue* queue : message_queues_) {
159 if (!queue->IsProcessingMessages()) { 166 if (!queue->IsProcessingMessages()) {
160 // If the queue is not processing messages, it can 167 // If the queue is not processing messages, it can
161 // be ignored. If we tried to post a message to it, it would be dropped 168 // be ignored. If we tried to post a message to it, it would be dropped
162 // or ignored. 169 // or ignored.
163 continue; 170 continue;
164 } 171 }
165 queue->PostDelayed(RTC_FROM_HERE, 0, nullptr, MQID_DISPOSE, 172 queue->PostDelayed(RTC_FROM_HERE, 0, nullptr, MQID_DISPOSE,
166 new ScopedIncrement(&queues_not_done)); 173 new ScopedIncrement(&queues_not_done));
167 } 174 }
(...skipping 373 matching lines...) Expand 10 before | Expand all | Expand 10 after
541 548
542 void MessageQueue::PostFunctorInternal(const Location& posted_from, 549 void MessageQueue::PostFunctorInternal(const Location& posted_from,
543 RunnableData* message_data) { 550 RunnableData* message_data) {
544 // Use static to ensure it outlives this scope. Safe since 551 // Use static to ensure it outlives this scope. Safe since
545 // FunctorPostMessageHandler keeps no state. 552 // FunctorPostMessageHandler keeps no state.
546 static FunctorPostMessageHandler handler; 553 static FunctorPostMessageHandler handler;
547 Post(posted_from, &handler, 0, message_data); 554 Post(posted_from, &handler, 0, message_data);
548 } 555 }
549 556
550 } // namespace rtc 557 } // namespace rtc
OLDNEW
« no previous file with comments | « webrtc/rtc_base/messagequeue.h ('k') | webrtc/rtc_base/messagequeue_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698