OLD | NEW |
1 /* | 1 /* |
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. | 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. |
3 * | 3 * |
4 * Use of this source code is governed by a BSD-style license | 4 * Use of this source code is governed by a BSD-style license |
5 * that can be found in the LICENSE file in the root of the source | 5 * that can be found in the LICENSE file in the root of the source |
6 * tree. An additional intellectual property rights grant can be found | 6 * tree. An additional intellectual property rights grant can be found |
7 * in the file PATENTS. All contributing project authors may | 7 * in the file PATENTS. All contributing project authors may |
8 * be found in the AUTHORS file in the root of the source tree. | 8 * be found in the AUTHORS file in the root of the source tree. |
9 */ | 9 */ |
10 | 10 |
11 #ifndef WEBRTC_BASE_MESSAGEQUEUE_H_ | 11 #ifndef WEBRTC_BASE_MESSAGEQUEUE_H_ |
12 #define WEBRTC_BASE_MESSAGEQUEUE_H_ | 12 #define WEBRTC_BASE_MESSAGEQUEUE_H_ |
13 | 13 |
14 #include <string.h> | |
15 | 14 |
16 #include <algorithm> | 15 // This header is deprecated and is just left here temporarily during |
17 #include <list> | 16 // refactoring. See https://bugs.webrtc.org/7634 for more details. |
18 #include <memory> | 17 #include "webrtc/rtc_base/messagequeue.h" |
19 #include <queue> | |
20 #include <utility> | |
21 #include <vector> | |
22 | |
23 #include "webrtc/base/basictypes.h" | |
24 #include "webrtc/base/constructormagic.h" | |
25 #include "webrtc/base/criticalsection.h" | |
26 #include "webrtc/base/location.h" | |
27 #include "webrtc/base/messagehandler.h" | |
28 #include "webrtc/base/scoped_ref_ptr.h" | |
29 #include "webrtc/base/sigslot.h" | |
30 #include "webrtc/base/socketserver.h" | |
31 #include "webrtc/base/timeutils.h" | |
32 #include "webrtc/base/thread_annotations.h" | |
33 | |
34 namespace rtc { | |
35 | |
36 struct Message; | |
37 class MessageQueue; | |
38 | |
39 // MessageQueueManager does cleanup of of message queues | |
40 | |
41 class MessageQueueManager { | |
42 public: | |
43 static void Add(MessageQueue *message_queue); | |
44 static void Remove(MessageQueue *message_queue); | |
45 static void Clear(MessageHandler *handler); | |
46 | |
47 // For testing purposes, we expose whether or not the MessageQueueManager | |
48 // instance has been initialized. It has no other use relative to the rest of | |
49 // the functions of this class, which auto-initialize the underlying | |
50 // MessageQueueManager instance when necessary. | |
51 static bool IsInitialized(); | |
52 | |
53 // Mainly for testing purposes, for use with a simulated clock. | |
54 // Ensures that all message queues have processed delayed messages | |
55 // up until the current point in time. | |
56 static void ProcessAllMessageQueues(); | |
57 | |
58 private: | |
59 static MessageQueueManager* Instance(); | |
60 | |
61 MessageQueueManager(); | |
62 ~MessageQueueManager(); | |
63 | |
64 void AddInternal(MessageQueue *message_queue); | |
65 void RemoveInternal(MessageQueue *message_queue); | |
66 void ClearInternal(MessageHandler *handler); | |
67 void ProcessAllMessageQueuesInternal(); | |
68 | |
69 static MessageQueueManager* instance_; | |
70 // This list contains all live MessageQueues. | |
71 std::vector<MessageQueue*> message_queues_ GUARDED_BY(crit_); | |
72 | |
73 // Acquire this with DebugNonReentrantCritScope. | |
74 CriticalSection crit_; | |
75 bool locked_ GUARDED_BY(crit_); | |
76 }; | |
77 | |
78 // Derive from this for specialized data | |
79 // App manages lifetime, except when messages are purged | |
80 | |
81 class MessageData { | |
82 public: | |
83 MessageData() {} | |
84 virtual ~MessageData() {} | |
85 }; | |
86 | |
87 template <class T> | |
88 class TypedMessageData : public MessageData { | |
89 public: | |
90 explicit TypedMessageData(const T& data) : data_(data) { } | |
91 const T& data() const { return data_; } | |
92 T& data() { return data_; } | |
93 private: | |
94 T data_; | |
95 }; | |
96 | |
97 // Like TypedMessageData, but for pointers that require a delete. | |
98 template <class T> | |
99 class ScopedMessageData : public MessageData { | |
100 public: | |
101 explicit ScopedMessageData(std::unique_ptr<T> data) | |
102 : data_(std::move(data)) {} | |
103 // Deprecated. | |
104 // TODO(deadbeef): Remove this once downstream applications stop using it. | |
105 explicit ScopedMessageData(T* data) : data_(data) {} | |
106 // Deprecated. | |
107 // TODO(deadbeef): Returning a reference to a unique ptr? Why. Get rid of | |
108 // this once downstream applications stop using it, then rename inner_data to | |
109 // just data. | |
110 const std::unique_ptr<T>& data() const { return data_; } | |
111 std::unique_ptr<T>& data() { return data_; } | |
112 | |
113 const T& inner_data() const { return *data_; } | |
114 T& inner_data() { return *data_; } | |
115 | |
116 private: | |
117 std::unique_ptr<T> data_; | |
118 }; | |
119 | |
120 // Like ScopedMessageData, but for reference counted pointers. | |
121 template <class T> | |
122 class ScopedRefMessageData : public MessageData { | |
123 public: | |
124 explicit ScopedRefMessageData(T* data) : data_(data) { } | |
125 const scoped_refptr<T>& data() const { return data_; } | |
126 scoped_refptr<T>& data() { return data_; } | |
127 private: | |
128 scoped_refptr<T> data_; | |
129 }; | |
130 | |
131 template<class T> | |
132 inline MessageData* WrapMessageData(const T& data) { | |
133 return new TypedMessageData<T>(data); | |
134 } | |
135 | |
136 template<class T> | |
137 inline const T& UseMessageData(MessageData* data) { | |
138 return static_cast< TypedMessageData<T>* >(data)->data(); | |
139 } | |
140 | |
141 template<class T> | |
142 class DisposeData : public MessageData { | |
143 public: | |
144 explicit DisposeData(T* data) : data_(data) { } | |
145 virtual ~DisposeData() { delete data_; } | |
146 private: | |
147 T* data_; | |
148 }; | |
149 | |
150 const uint32_t MQID_ANY = static_cast<uint32_t>(-1); | |
151 const uint32_t MQID_DISPOSE = static_cast<uint32_t>(-2); | |
152 | |
153 // No destructor | |
154 | |
155 struct Message { | |
156 Message() | |
157 : phandler(nullptr), message_id(0), pdata(nullptr), ts_sensitive(0) {} | |
158 inline bool Match(MessageHandler* handler, uint32_t id) const { | |
159 return (handler == nullptr || handler == phandler) && | |
160 (id == MQID_ANY || id == message_id); | |
161 } | |
162 Location posted_from; | |
163 MessageHandler *phandler; | |
164 uint32_t message_id; | |
165 MessageData *pdata; | |
166 int64_t ts_sensitive; | |
167 }; | |
168 | |
169 typedef std::list<Message> MessageList; | |
170 | |
171 // DelayedMessage goes into a priority queue, sorted by trigger time. Messages | |
172 // with the same trigger time are processed in num_ (FIFO) order. | |
173 | |
174 class DelayedMessage { | |
175 public: | |
176 DelayedMessage(int64_t delay, | |
177 int64_t trigger, | |
178 uint32_t num, | |
179 const Message& msg) | |
180 : cmsDelay_(delay), msTrigger_(trigger), num_(num), msg_(msg) {} | |
181 | |
182 bool operator< (const DelayedMessage& dmsg) const { | |
183 return (dmsg.msTrigger_ < msTrigger_) | |
184 || ((dmsg.msTrigger_ == msTrigger_) && (dmsg.num_ < num_)); | |
185 } | |
186 | |
187 int64_t cmsDelay_; // for debugging | |
188 int64_t msTrigger_; | |
189 uint32_t num_; | |
190 Message msg_; | |
191 }; | |
192 | |
193 class MessageQueue { | |
194 public: | |
195 static const int kForever = -1; | |
196 | |
197 // Create a new MessageQueue and optionally assign it to the passed | |
198 // SocketServer. Subclasses that override Clear should pass false for | |
199 // init_queue and call DoInit() from their constructor to prevent races | |
200 // with the MessageQueueManager using the object while the vtable is still | |
201 // being created. | |
202 MessageQueue(SocketServer* ss, bool init_queue); | |
203 MessageQueue(std::unique_ptr<SocketServer> ss, bool init_queue); | |
204 | |
205 // NOTE: SUBCLASSES OF MessageQueue THAT OVERRIDE Clear MUST CALL | |
206 // DoDestroy() IN THEIR DESTRUCTORS! This is required to avoid a data race | |
207 // between the destructor modifying the vtable, and the MessageQueueManager | |
208 // calling Clear on the object from a different thread. | |
209 virtual ~MessageQueue(); | |
210 | |
211 SocketServer* socketserver(); | |
212 | |
213 // Note: The behavior of MessageQueue has changed. When a MQ is stopped, | |
214 // futher Posts and Sends will fail. However, any pending Sends and *ready* | |
215 // Posts (as opposed to unexpired delayed Posts) will be delivered before | |
216 // Get (or Peek) returns false. By guaranteeing delivery of those messages, | |
217 // we eliminate the race condition when an MessageHandler and MessageQueue | |
218 // may be destroyed independently of each other. | |
219 virtual void Quit(); | |
220 virtual bool IsQuitting(); | |
221 virtual void Restart(); | |
222 // Not all message queues actually process messages (such as SignalThread). | |
223 // In those cases, it's important to know, before posting, that it won't be | |
224 // Processed. Normally, this would be true until IsQuitting() is true. | |
225 virtual bool IsProcessingMessages(); | |
226 | |
227 // Get() will process I/O until: | |
228 // 1) A message is available (returns true) | |
229 // 2) cmsWait seconds have elapsed (returns false) | |
230 // 3) Stop() is called (returns false) | |
231 virtual bool Get(Message *pmsg, int cmsWait = kForever, | |
232 bool process_io = true); | |
233 virtual bool Peek(Message *pmsg, int cmsWait = 0); | |
234 virtual void Post(const Location& posted_from, | |
235 MessageHandler* phandler, | |
236 uint32_t id = 0, | |
237 MessageData* pdata = nullptr, | |
238 bool time_sensitive = false); | |
239 virtual void PostDelayed(const Location& posted_from, | |
240 int cmsDelay, | |
241 MessageHandler* phandler, | |
242 uint32_t id = 0, | |
243 MessageData* pdata = nullptr); | |
244 virtual void PostAt(const Location& posted_from, | |
245 int64_t tstamp, | |
246 MessageHandler* phandler, | |
247 uint32_t id = 0, | |
248 MessageData* pdata = nullptr); | |
249 // TODO(honghaiz): Remove this when all the dependencies are removed. | |
250 virtual void PostAt(const Location& posted_from, | |
251 uint32_t tstamp, | |
252 MessageHandler* phandler, | |
253 uint32_t id = 0, | |
254 MessageData* pdata = nullptr); | |
255 virtual void Clear(MessageHandler* phandler, | |
256 uint32_t id = MQID_ANY, | |
257 MessageList* removed = nullptr); | |
258 virtual void Dispatch(Message *pmsg); | |
259 virtual void ReceiveSends(); | |
260 | |
261 // Amount of time until the next message can be retrieved | |
262 virtual int GetDelay(); | |
263 | |
264 bool empty() const { return size() == 0u; } | |
265 size_t size() const { | |
266 CritScope cs(&crit_); // msgq_.size() is not thread safe. | |
267 return msgq_.size() + dmsgq_.size() + (fPeekKeep_ ? 1u : 0u); | |
268 } | |
269 | |
270 // Internally posts a message which causes the doomed object to be deleted | |
271 template<class T> void Dispose(T* doomed) { | |
272 if (doomed) { | |
273 Post(RTC_FROM_HERE, nullptr, MQID_DISPOSE, new DisposeData<T>(doomed)); | |
274 } | |
275 } | |
276 | |
277 // When this signal is sent out, any references to this queue should | |
278 // no longer be used. | |
279 sigslot::signal0<> SignalQueueDestroyed; | |
280 | |
281 protected: | |
282 class PriorityQueue : public std::priority_queue<DelayedMessage> { | |
283 public: | |
284 container_type& container() { return c; } | |
285 void reheap() { make_heap(c.begin(), c.end(), comp); } | |
286 }; | |
287 | |
288 void DoDelayPost(const Location& posted_from, | |
289 int64_t cmsDelay, | |
290 int64_t tstamp, | |
291 MessageHandler* phandler, | |
292 uint32_t id, | |
293 MessageData* pdata); | |
294 | |
295 // Perform initialization, subclasses must call this from their constructor | |
296 // if false was passed as init_queue to the MessageQueue constructor. | |
297 void DoInit(); | |
298 | |
299 // Perform cleanup, subclasses that override Clear must call this from the | |
300 // destructor. | |
301 void DoDestroy(); | |
302 | |
303 void WakeUpSocketServer(); | |
304 | |
305 bool fPeekKeep_; | |
306 Message msgPeek_; | |
307 MessageList msgq_ GUARDED_BY(crit_); | |
308 PriorityQueue dmsgq_ GUARDED_BY(crit_); | |
309 uint32_t dmsgq_next_num_ GUARDED_BY(crit_); | |
310 CriticalSection crit_; | |
311 bool fInitialized_; | |
312 bool fDestroyed_; | |
313 | |
314 private: | |
315 volatile int stop_; | |
316 | |
317 // The SocketServer might not be owned by MessageQueue. | |
318 SocketServer* const ss_; | |
319 // Used if SocketServer ownership lies with |this|. | |
320 std::unique_ptr<SocketServer> own_ss_; | |
321 | |
322 RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(MessageQueue); | |
323 }; | |
324 | |
325 } // namespace rtc | |
326 | 18 |
327 #endif // WEBRTC_BASE_MESSAGEQUEUE_H_ | 19 #endif // WEBRTC_BASE_MESSAGEQUEUE_H_ |
OLD | NEW |