| OLD | NEW |
| 1 /* | 1 /* |
| 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. | 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. |
| 3 * | 3 * |
| 4 * Use of this source code is governed by a BSD-style license | 4 * Use of this source code is governed by a BSD-style license |
| 5 * that can be found in the LICENSE file in the root of the source | 5 * that can be found in the LICENSE file in the root of the source |
| 6 * tree. An additional intellectual property rights grant can be found | 6 * tree. An additional intellectual property rights grant can be found |
| 7 * in the file PATENTS. All contributing project authors may | 7 * in the file PATENTS. All contributing project authors may |
| 8 * be found in the AUTHORS file in the root of the source tree. | 8 * be found in the AUTHORS file in the root of the source tree. |
| 9 */ | 9 */ |
| 10 | 10 |
| 11 #ifndef WEBRTC_BASE_MESSAGEQUEUE_H_ | 11 #ifndef WEBRTC_BASE_MESSAGEQUEUE_H_ |
| 12 #define WEBRTC_BASE_MESSAGEQUEUE_H_ | 12 #define WEBRTC_BASE_MESSAGEQUEUE_H_ |
| 13 | 13 |
| 14 #include <string.h> | |
| 15 | 14 |
| 16 #include <algorithm> | 15 // This header is deprecated and is just left here temporarily during |
| 17 #include <list> | 16 // refactoring. See https://bugs.webrtc.org/7634 for more details. |
| 18 #include <memory> | 17 #include "webrtc/rtc_base/messagequeue.h" |
| 19 #include <queue> | |
| 20 #include <utility> | |
| 21 #include <vector> | |
| 22 | |
| 23 #include "webrtc/base/basictypes.h" | |
| 24 #include "webrtc/base/constructormagic.h" | |
| 25 #include "webrtc/base/criticalsection.h" | |
| 26 #include "webrtc/base/location.h" | |
| 27 #include "webrtc/base/messagehandler.h" | |
| 28 #include "webrtc/base/scoped_ref_ptr.h" | |
| 29 #include "webrtc/base/sigslot.h" | |
| 30 #include "webrtc/base/socketserver.h" | |
| 31 #include "webrtc/base/timeutils.h" | |
| 32 #include "webrtc/base/thread_annotations.h" | |
| 33 | |
| 34 namespace rtc { | |
| 35 | |
| 36 struct Message; | |
| 37 class MessageQueue; | |
| 38 | |
| 39 // MessageQueueManager does cleanup of of message queues | |
| 40 | |
| 41 class MessageQueueManager { | |
| 42 public: | |
| 43 static void Add(MessageQueue *message_queue); | |
| 44 static void Remove(MessageQueue *message_queue); | |
| 45 static void Clear(MessageHandler *handler); | |
| 46 | |
| 47 // For testing purposes, we expose whether or not the MessageQueueManager | |
| 48 // instance has been initialized. It has no other use relative to the rest of | |
| 49 // the functions of this class, which auto-initialize the underlying | |
| 50 // MessageQueueManager instance when necessary. | |
| 51 static bool IsInitialized(); | |
| 52 | |
| 53 // Mainly for testing purposes, for use with a simulated clock. | |
| 54 // Ensures that all message queues have processed delayed messages | |
| 55 // up until the current point in time. | |
| 56 static void ProcessAllMessageQueues(); | |
| 57 | |
| 58 private: | |
| 59 static MessageQueueManager* Instance(); | |
| 60 | |
| 61 MessageQueueManager(); | |
| 62 ~MessageQueueManager(); | |
| 63 | |
| 64 void AddInternal(MessageQueue *message_queue); | |
| 65 void RemoveInternal(MessageQueue *message_queue); | |
| 66 void ClearInternal(MessageHandler *handler); | |
| 67 void ProcessAllMessageQueuesInternal(); | |
| 68 | |
| 69 static MessageQueueManager* instance_; | |
| 70 // This list contains all live MessageQueues. | |
| 71 std::vector<MessageQueue*> message_queues_ GUARDED_BY(crit_); | |
| 72 | |
| 73 // Acquire this with DebugNonReentrantCritScope. | |
| 74 CriticalSection crit_; | |
| 75 bool locked_ GUARDED_BY(crit_); | |
| 76 }; | |
| 77 | |
| 78 // Derive from this for specialized data | |
| 79 // App manages lifetime, except when messages are purged | |
| 80 | |
| 81 class MessageData { | |
| 82 public: | |
| 83 MessageData() {} | |
| 84 virtual ~MessageData() {} | |
| 85 }; | |
| 86 | |
| 87 template <class T> | |
| 88 class TypedMessageData : public MessageData { | |
| 89 public: | |
| 90 explicit TypedMessageData(const T& data) : data_(data) { } | |
| 91 const T& data() const { return data_; } | |
| 92 T& data() { return data_; } | |
| 93 private: | |
| 94 T data_; | |
| 95 }; | |
| 96 | |
| 97 // Like TypedMessageData, but for pointers that require a delete. | |
| 98 template <class T> | |
| 99 class ScopedMessageData : public MessageData { | |
| 100 public: | |
| 101 explicit ScopedMessageData(std::unique_ptr<T> data) | |
| 102 : data_(std::move(data)) {} | |
| 103 // Deprecated. | |
| 104 // TODO(deadbeef): Remove this once downstream applications stop using it. | |
| 105 explicit ScopedMessageData(T* data) : data_(data) {} | |
| 106 // Deprecated. | |
| 107 // TODO(deadbeef): Returning a reference to a unique ptr? Why. Get rid of | |
| 108 // this once downstream applications stop using it, then rename inner_data to | |
| 109 // just data. | |
| 110 const std::unique_ptr<T>& data() const { return data_; } | |
| 111 std::unique_ptr<T>& data() { return data_; } | |
| 112 | |
| 113 const T& inner_data() const { return *data_; } | |
| 114 T& inner_data() { return *data_; } | |
| 115 | |
| 116 private: | |
| 117 std::unique_ptr<T> data_; | |
| 118 }; | |
| 119 | |
| 120 // Like ScopedMessageData, but for reference counted pointers. | |
| 121 template <class T> | |
| 122 class ScopedRefMessageData : public MessageData { | |
| 123 public: | |
| 124 explicit ScopedRefMessageData(T* data) : data_(data) { } | |
| 125 const scoped_refptr<T>& data() const { return data_; } | |
| 126 scoped_refptr<T>& data() { return data_; } | |
| 127 private: | |
| 128 scoped_refptr<T> data_; | |
| 129 }; | |
| 130 | |
| 131 template<class T> | |
| 132 inline MessageData* WrapMessageData(const T& data) { | |
| 133 return new TypedMessageData<T>(data); | |
| 134 } | |
| 135 | |
| 136 template<class T> | |
| 137 inline const T& UseMessageData(MessageData* data) { | |
| 138 return static_cast< TypedMessageData<T>* >(data)->data(); | |
| 139 } | |
| 140 | |
| 141 template<class T> | |
| 142 class DisposeData : public MessageData { | |
| 143 public: | |
| 144 explicit DisposeData(T* data) : data_(data) { } | |
| 145 virtual ~DisposeData() { delete data_; } | |
| 146 private: | |
| 147 T* data_; | |
| 148 }; | |
| 149 | |
| 150 const uint32_t MQID_ANY = static_cast<uint32_t>(-1); | |
| 151 const uint32_t MQID_DISPOSE = static_cast<uint32_t>(-2); | |
| 152 | |
| 153 // No destructor | |
| 154 | |
| 155 struct Message { | |
| 156 Message() | |
| 157 : phandler(nullptr), message_id(0), pdata(nullptr), ts_sensitive(0) {} | |
| 158 inline bool Match(MessageHandler* handler, uint32_t id) const { | |
| 159 return (handler == nullptr || handler == phandler) && | |
| 160 (id == MQID_ANY || id == message_id); | |
| 161 } | |
| 162 Location posted_from; | |
| 163 MessageHandler *phandler; | |
| 164 uint32_t message_id; | |
| 165 MessageData *pdata; | |
| 166 int64_t ts_sensitive; | |
| 167 }; | |
| 168 | |
| 169 typedef std::list<Message> MessageList; | |
| 170 | |
| 171 // DelayedMessage goes into a priority queue, sorted by trigger time. Messages | |
| 172 // with the same trigger time are processed in num_ (FIFO) order. | |
| 173 | |
| 174 class DelayedMessage { | |
| 175 public: | |
| 176 DelayedMessage(int64_t delay, | |
| 177 int64_t trigger, | |
| 178 uint32_t num, | |
| 179 const Message& msg) | |
| 180 : cmsDelay_(delay), msTrigger_(trigger), num_(num), msg_(msg) {} | |
| 181 | |
| 182 bool operator< (const DelayedMessage& dmsg) const { | |
| 183 return (dmsg.msTrigger_ < msTrigger_) | |
| 184 || ((dmsg.msTrigger_ == msTrigger_) && (dmsg.num_ < num_)); | |
| 185 } | |
| 186 | |
| 187 int64_t cmsDelay_; // for debugging | |
| 188 int64_t msTrigger_; | |
| 189 uint32_t num_; | |
| 190 Message msg_; | |
| 191 }; | |
| 192 | |
| 193 class MessageQueue { | |
| 194 public: | |
| 195 static const int kForever = -1; | |
| 196 | |
| 197 // Create a new MessageQueue and optionally assign it to the passed | |
| 198 // SocketServer. Subclasses that override Clear should pass false for | |
| 199 // init_queue and call DoInit() from their constructor to prevent races | |
| 200 // with the MessageQueueManager using the object while the vtable is still | |
| 201 // being created. | |
| 202 MessageQueue(SocketServer* ss, bool init_queue); | |
| 203 MessageQueue(std::unique_ptr<SocketServer> ss, bool init_queue); | |
| 204 | |
| 205 // NOTE: SUBCLASSES OF MessageQueue THAT OVERRIDE Clear MUST CALL | |
| 206 // DoDestroy() IN THEIR DESTRUCTORS! This is required to avoid a data race | |
| 207 // between the destructor modifying the vtable, and the MessageQueueManager | |
| 208 // calling Clear on the object from a different thread. | |
| 209 virtual ~MessageQueue(); | |
| 210 | |
| 211 SocketServer* socketserver(); | |
| 212 | |
| 213 // Note: The behavior of MessageQueue has changed. When a MQ is stopped, | |
| 214 // futher Posts and Sends will fail. However, any pending Sends and *ready* | |
| 215 // Posts (as opposed to unexpired delayed Posts) will be delivered before | |
| 216 // Get (or Peek) returns false. By guaranteeing delivery of those messages, | |
| 217 // we eliminate the race condition when an MessageHandler and MessageQueue | |
| 218 // may be destroyed independently of each other. | |
| 219 virtual void Quit(); | |
| 220 virtual bool IsQuitting(); | |
| 221 virtual void Restart(); | |
| 222 // Not all message queues actually process messages (such as SignalThread). | |
| 223 // In those cases, it's important to know, before posting, that it won't be | |
| 224 // Processed. Normally, this would be true until IsQuitting() is true. | |
| 225 virtual bool IsProcessingMessages(); | |
| 226 | |
| 227 // Get() will process I/O until: | |
| 228 // 1) A message is available (returns true) | |
| 229 // 2) cmsWait seconds have elapsed (returns false) | |
| 230 // 3) Stop() is called (returns false) | |
| 231 virtual bool Get(Message *pmsg, int cmsWait = kForever, | |
| 232 bool process_io = true); | |
| 233 virtual bool Peek(Message *pmsg, int cmsWait = 0); | |
| 234 virtual void Post(const Location& posted_from, | |
| 235 MessageHandler* phandler, | |
| 236 uint32_t id = 0, | |
| 237 MessageData* pdata = nullptr, | |
| 238 bool time_sensitive = false); | |
| 239 virtual void PostDelayed(const Location& posted_from, | |
| 240 int cmsDelay, | |
| 241 MessageHandler* phandler, | |
| 242 uint32_t id = 0, | |
| 243 MessageData* pdata = nullptr); | |
| 244 virtual void PostAt(const Location& posted_from, | |
| 245 int64_t tstamp, | |
| 246 MessageHandler* phandler, | |
| 247 uint32_t id = 0, | |
| 248 MessageData* pdata = nullptr); | |
| 249 // TODO(honghaiz): Remove this when all the dependencies are removed. | |
| 250 virtual void PostAt(const Location& posted_from, | |
| 251 uint32_t tstamp, | |
| 252 MessageHandler* phandler, | |
| 253 uint32_t id = 0, | |
| 254 MessageData* pdata = nullptr); | |
| 255 virtual void Clear(MessageHandler* phandler, | |
| 256 uint32_t id = MQID_ANY, | |
| 257 MessageList* removed = nullptr); | |
| 258 virtual void Dispatch(Message *pmsg); | |
| 259 virtual void ReceiveSends(); | |
| 260 | |
| 261 // Amount of time until the next message can be retrieved | |
| 262 virtual int GetDelay(); | |
| 263 | |
| 264 bool empty() const { return size() == 0u; } | |
| 265 size_t size() const { | |
| 266 CritScope cs(&crit_); // msgq_.size() is not thread safe. | |
| 267 return msgq_.size() + dmsgq_.size() + (fPeekKeep_ ? 1u : 0u); | |
| 268 } | |
| 269 | |
| 270 // Internally posts a message which causes the doomed object to be deleted | |
| 271 template<class T> void Dispose(T* doomed) { | |
| 272 if (doomed) { | |
| 273 Post(RTC_FROM_HERE, nullptr, MQID_DISPOSE, new DisposeData<T>(doomed)); | |
| 274 } | |
| 275 } | |
| 276 | |
| 277 // When this signal is sent out, any references to this queue should | |
| 278 // no longer be used. | |
| 279 sigslot::signal0<> SignalQueueDestroyed; | |
| 280 | |
| 281 protected: | |
| 282 class PriorityQueue : public std::priority_queue<DelayedMessage> { | |
| 283 public: | |
| 284 container_type& container() { return c; } | |
| 285 void reheap() { make_heap(c.begin(), c.end(), comp); } | |
| 286 }; | |
| 287 | |
| 288 void DoDelayPost(const Location& posted_from, | |
| 289 int64_t cmsDelay, | |
| 290 int64_t tstamp, | |
| 291 MessageHandler* phandler, | |
| 292 uint32_t id, | |
| 293 MessageData* pdata); | |
| 294 | |
| 295 // Perform initialization, subclasses must call this from their constructor | |
| 296 // if false was passed as init_queue to the MessageQueue constructor. | |
| 297 void DoInit(); | |
| 298 | |
| 299 // Perform cleanup, subclasses that override Clear must call this from the | |
| 300 // destructor. | |
| 301 void DoDestroy(); | |
| 302 | |
| 303 void WakeUpSocketServer(); | |
| 304 | |
| 305 bool fPeekKeep_; | |
| 306 Message msgPeek_; | |
| 307 MessageList msgq_ GUARDED_BY(crit_); | |
| 308 PriorityQueue dmsgq_ GUARDED_BY(crit_); | |
| 309 uint32_t dmsgq_next_num_ GUARDED_BY(crit_); | |
| 310 CriticalSection crit_; | |
| 311 bool fInitialized_; | |
| 312 bool fDestroyed_; | |
| 313 | |
| 314 private: | |
| 315 volatile int stop_; | |
| 316 | |
| 317 // The SocketServer might not be owned by MessageQueue. | |
| 318 SocketServer* const ss_; | |
| 319 // Used if SocketServer ownership lies with |this|. | |
| 320 std::unique_ptr<SocketServer> own_ss_; | |
| 321 | |
| 322 RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(MessageQueue); | |
| 323 }; | |
| 324 | |
| 325 } // namespace rtc | |
| 326 | 18 |
| 327 #endif // WEBRTC_BASE_MESSAGEQUEUE_H_ | 19 #endif // WEBRTC_BASE_MESSAGEQUEUE_H_ |
| OLD | NEW |