OLD | NEW |
1 /* | 1 /* |
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. | 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. |
3 * | 3 * |
4 * Use of this source code is governed by a BSD-style license | 4 * Use of this source code is governed by a BSD-style license |
5 * that can be found in the LICENSE file in the root of the source | 5 * that can be found in the LICENSE file in the root of the source |
6 * tree. An additional intellectual property rights grant can be found | 6 * tree. An additional intellectual property rights grant can be found |
7 * in the file PATENTS. All contributing project authors may | 7 * in the file PATENTS. All contributing project authors may |
8 * be found in the AUTHORS file in the root of the source tree. | 8 * be found in the AUTHORS file in the root of the source tree. |
9 */ | 9 */ |
10 | |
11 #if defined(WEBRTC_POSIX) | |
12 #include <sys/time.h> | |
13 #endif | |
14 | |
15 #include <algorithm> | 10 #include <algorithm> |
16 | 11 |
| 12 #include "webrtc/base/checks.h" |
17 #include "webrtc/base/common.h" | 13 #include "webrtc/base/common.h" |
18 #include "webrtc/base/logging.h" | 14 #include "webrtc/base/logging.h" |
19 #include "webrtc/base/messagequeue.h" | 15 #include "webrtc/base/messagequeue.h" |
20 #if defined(__native_client__) | |
21 #include "webrtc/base/nullsocketserver.h" | |
22 typedef rtc::NullSocketServer DefaultSocketServer; | |
23 #else | |
24 #include "webrtc/base/physicalsocketserver.h" | |
25 typedef rtc::PhysicalSocketServer DefaultSocketServer; | |
26 #endif | |
27 | 16 |
28 namespace rtc { | 17 namespace rtc { |
29 | 18 |
30 const uint32_t kMaxMsgLatency = 150; // 150 ms | 19 const uint32_t kMaxMsgLatency = 150; // 150 ms |
31 | 20 |
32 //------------------------------------------------------------------ | 21 //------------------------------------------------------------------ |
33 // MessageQueueManager | 22 // MessageQueueManager |
34 | 23 |
35 MessageQueueManager* MessageQueueManager::instance_ = NULL; | 24 MessageQueueManager* MessageQueueManager::instance_ = NULL; |
36 | 25 |
(...skipping 71 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
108 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above. | 97 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above. |
109 #endif | 98 #endif |
110 CritScope cs(&crit_); | 99 CritScope cs(&crit_); |
111 std::vector<MessageQueue *>::iterator iter; | 100 std::vector<MessageQueue *>::iterator iter; |
112 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++) | 101 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++) |
113 (*iter)->Clear(handler); | 102 (*iter)->Clear(handler); |
114 } | 103 } |
115 | 104 |
116 //------------------------------------------------------------------ | 105 //------------------------------------------------------------------ |
117 // MessageQueue | 106 // MessageQueue |
118 | |
119 MessageQueue::MessageQueue(SocketServer* ss, bool init_queue) | 107 MessageQueue::MessageQueue(SocketServer* ss, bool init_queue) |
120 : fStop_(false), fPeekKeep_(false), | 108 : fStop_(false), fPeekKeep_(false), |
121 dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false), ss_(ss) { | 109 dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false), ss_(ss) { |
122 if (!ss_) { | 110 RTC_DCHECK(ss); |
123 // Currently, MessageQueue holds a socket server, and is the base class for | 111 // Currently, MessageQueue holds a socket server, and is the base class for |
124 // Thread. It seems like it makes more sense for Thread to hold the socket | 112 // Thread. It seems like it makes more sense for Thread to hold the socket |
125 // server, and provide it to the MessageQueue, since the Thread controls | 113 // server, and provide it to the MessageQueue, since the Thread controls |
126 // the I/O model, and MQ is agnostic to those details. Anyway, this causes | 114 // the I/O model, and MQ is agnostic to those details. Anyway, this causes |
127 // messagequeue_unittest to depend on network libraries... yuck. | 115 // messagequeue_unittest to depend on network libraries... yuck. |
128 default_ss_.reset(new DefaultSocketServer()); | |
129 ss_ = default_ss_.get(); | |
130 } | |
131 ss_->SetMessageQueue(this); | 116 ss_->SetMessageQueue(this); |
132 if (init_queue) { | 117 if (init_queue) { |
133 DoInit(); | 118 DoInit(); |
134 } | 119 } |
135 } | 120 } |
136 | 121 |
| 122 MessageQueue::MessageQueue(std::unique_ptr<SocketServer> ss, bool init_queue) |
| 123 : MessageQueue(ss.get(), init_queue) { |
| 124 own_ss_ = std::move(ss); |
| 125 } |
| 126 |
137 MessageQueue::~MessageQueue() { | 127 MessageQueue::~MessageQueue() { |
138 DoDestroy(); | 128 DoDestroy(); |
139 } | 129 } |
140 | 130 |
141 void MessageQueue::DoInit() { | 131 void MessageQueue::DoInit() { |
142 if (fInitialized_) { | 132 if (fInitialized_) { |
143 return; | 133 return; |
144 } | 134 } |
145 | 135 |
146 fInitialized_ = true; | 136 fInitialized_ = true; |
(...skipping 24 matching lines...) Expand all Loading... |
171 return ss_; | 161 return ss_; |
172 } | 162 } |
173 | 163 |
174 void MessageQueue::set_socketserver(SocketServer* ss) { | 164 void MessageQueue::set_socketserver(SocketServer* ss) { |
175 // Need to lock exclusively here to prevent simultaneous modifications from | 165 // Need to lock exclusively here to prevent simultaneous modifications from |
176 // other threads. Can't be a shared lock to prevent races with other reading | 166 // other threads. Can't be a shared lock to prevent races with other reading |
177 // threads. | 167 // threads. |
178 // Other places that only read "ss_" can use a shared lock as simultaneous | 168 // Other places that only read "ss_" can use a shared lock as simultaneous |
179 // read access is allowed. | 169 // read access is allowed. |
180 ExclusiveScope es(&ss_lock_); | 170 ExclusiveScope es(&ss_lock_); |
181 ss_ = ss ? ss : default_ss_.get(); | 171 ss_ = ss ? ss : own_ss_.get(); |
182 ss_->SetMessageQueue(this); | 172 ss_->SetMessageQueue(this); |
183 } | 173 } |
184 | 174 |
185 void MessageQueue::WakeUpSocketServer() { | 175 void MessageQueue::WakeUpSocketServer() { |
186 SharedScope ss(&ss_lock_); | 176 SharedScope ss(&ss_lock_); |
187 ss_->WakeUp(); | 177 ss_->WakeUp(); |
188 } | 178 } |
189 | 179 |
190 void MessageQueue::Quit() { | 180 void MessageQueue::Quit() { |
191 fStop_ = true; | 181 fStop_ = true; |
(...skipping 256 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
448 } | 438 } |
449 dmsgq_.container().erase(new_end, dmsgq_.container().end()); | 439 dmsgq_.container().erase(new_end, dmsgq_.container().end()); |
450 dmsgq_.reheap(); | 440 dmsgq_.reheap(); |
451 } | 441 } |
452 | 442 |
453 void MessageQueue::Dispatch(Message *pmsg) { | 443 void MessageQueue::Dispatch(Message *pmsg) { |
454 pmsg->phandler->OnMessage(pmsg); | 444 pmsg->phandler->OnMessage(pmsg); |
455 } | 445 } |
456 | 446 |
457 } // namespace rtc | 447 } // namespace rtc |
OLD | NEW |