OLD | NEW |
1 /* | 1 /* |
2 * Copyright 2016 The WebRTC Project Authors. All rights reserved. | 2 * Copyright 2016 The WebRTC Project Authors. All rights reserved. |
3 * | 3 * |
4 * Use of this source code is governed by a BSD-style license | 4 * Use of this source code is governed by a BSD-style license |
5 * that can be found in the LICENSE file in the root of the source | 5 * that can be found in the LICENSE file in the root of the source |
6 * tree. An additional intellectual property rights grant can be found | 6 * tree. An additional intellectual property rights grant can be found |
7 * in the file PATENTS. All contributing project authors may | 7 * in the file PATENTS. All contributing project authors may |
8 * be found in the AUTHORS file in the root of the source tree. | 8 * be found in the AUTHORS file in the root of the source tree. |
9 */ | 9 */ |
10 | 10 |
11 #include "webrtc/base/task_queue.h" | 11 #include "webrtc/base/task_queue.h" |
12 | 12 |
| 13 #include <mmsystem.h> |
13 #include <string.h> | 14 #include <string.h> |
14 | 15 |
| 16 #include <algorithm> |
| 17 |
| 18 #include "webrtc/base/arraysize.h" |
15 #include "webrtc/base/checks.h" | 19 #include "webrtc/base/checks.h" |
16 #include "webrtc/base/logging.h" | 20 #include "webrtc/base/logging.h" |
17 | 21 |
18 namespace rtc { | 22 namespace rtc { |
19 namespace { | 23 namespace { |
20 #define WM_RUN_TASK WM_USER + 1 | 24 #define WM_RUN_TASK WM_USER + 1 |
21 #define WM_QUEUE_DELAYED_TASK WM_USER + 2 | 25 #define WM_QUEUE_DELAYED_TASK WM_USER + 2 |
22 | 26 |
23 DWORD g_queue_ptr_tls = 0; | 27 DWORD g_queue_ptr_tls = 0; |
24 | 28 |
25 BOOL CALLBACK InitializeTls(PINIT_ONCE init_once, void* param, void** context) { | 29 BOOL CALLBACK InitializeTls(PINIT_ONCE init_once, void* param, void** context) { |
26 g_queue_ptr_tls = TlsAlloc(); | 30 g_queue_ptr_tls = TlsAlloc(); |
27 return TRUE; | 31 return TRUE; |
28 } | 32 } |
29 | 33 |
30 DWORD GetQueuePtrTls() { | 34 DWORD GetQueuePtrTls() { |
31 static INIT_ONCE init_once = INIT_ONCE_STATIC_INIT; | 35 static INIT_ONCE init_once = INIT_ONCE_STATIC_INIT; |
32 InitOnceExecuteOnce(&init_once, InitializeTls, nullptr, nullptr); | 36 ::InitOnceExecuteOnce(&init_once, InitializeTls, nullptr, nullptr); |
33 return g_queue_ptr_tls; | 37 return g_queue_ptr_tls; |
34 } | 38 } |
35 | 39 |
36 struct ThreadStartupData { | 40 struct ThreadStartupData { |
37 Event* started; | 41 Event* started; |
38 void* thread_context; | 42 void* thread_context; |
39 }; | 43 }; |
40 | 44 |
41 void CALLBACK InitializeQueueThread(ULONG_PTR param) { | 45 void CALLBACK InitializeQueueThread(ULONG_PTR param) { |
42 MSG msg; | 46 MSG msg; |
43 PeekMessage(&msg, NULL, WM_USER, WM_USER, PM_NOREMOVE); | 47 ::PeekMessage(&msg, nullptr, WM_USER, WM_USER, PM_NOREMOVE); |
44 ThreadStartupData* data = reinterpret_cast<ThreadStartupData*>(param); | 48 ThreadStartupData* data = reinterpret_cast<ThreadStartupData*>(param); |
45 TlsSetValue(GetQueuePtrTls(), data->thread_context); | 49 ::TlsSetValue(GetQueuePtrTls(), data->thread_context); |
46 data->started->Set(); | 50 data->started->Set(); |
47 } | 51 } |
48 } // namespace | 52 } // namespace |
49 | 53 |
| 54 class TaskQueue::MultimediaTimer { |
| 55 public: |
| 56 // kMaxTimers defines the limit of how many MultimediaTimer instances should |
| 57 // be created. |
| 58 // Background: The maximum number of supported handles for Wait functions, is |
| 59 // MAXIMUM_WAIT_OBJECTS - 1 (63). |
| 60 // There are some ways to work around the limitation but as it turns out, the |
| 61 // limit of concurrently active multimedia timers per process, is much lower, |
| 62 // or 16. So there isn't much value in going to the lenghts required to |
| 63 // overcome the Wait limitations. |
| 64 // kMaxTimers is larger than 16 though since it is possible that 'complete' or |
| 65 // signaled timers that haven't been handled, are counted as part of |
| 66 // kMaxTimers and thus a multimedia timer can actually be queued even though |
| 67 // as far as we're concerned, there are more than 16 that are pending. |
| 68 static const int kMaxTimers = MAXIMUM_WAIT_OBJECTS - 1; |
| 69 |
| 70 // Controls how many MultimediaTimer instances a queue can hold before |
| 71 // attempting to garbage collect (GC) timers that aren't in use. |
| 72 static const int kInstanceThresholdGC = 8; |
| 73 |
| 74 MultimediaTimer() : event_(::CreateEvent(nullptr, false, false, nullptr)) {} |
| 75 |
| 76 MultimediaTimer(MultimediaTimer&& timer) |
| 77 : event_(timer.event_), |
| 78 timer_id_(timer.timer_id_), |
| 79 task_(std::move(timer.task_)) { |
| 80 RTC_DCHECK(event_); |
| 81 timer.event_ = nullptr; |
| 82 timer.timer_id_ = 0; |
| 83 } |
| 84 |
| 85 ~MultimediaTimer() { Close(); } |
| 86 |
| 87 // Implementing this operator is required because of the way |
| 88 // some stl algorithms work, such as std::rotate(). |
| 89 MultimediaTimer& operator=(MultimediaTimer&& timer) { |
| 90 if (this != &timer) { |
| 91 Close(); |
| 92 event_ = timer.event_; |
| 93 timer.event_ = nullptr; |
| 94 task_ = std::move(timer.task_); |
| 95 timer_id_ = timer.timer_id_; |
| 96 timer.timer_id_ = 0; |
| 97 } |
| 98 return *this; |
| 99 } |
| 100 |
| 101 bool StartOneShotTimer(std::unique_ptr<QueuedTask> task, UINT delay_ms) { |
| 102 RTC_DCHECK_EQ(0, timer_id_); |
| 103 RTC_DCHECK(event_ != nullptr); |
| 104 RTC_DCHECK(!task_.get()); |
| 105 RTC_DCHECK(task.get()); |
| 106 task_ = std::move(task); |
| 107 timer_id_ = |
| 108 ::timeSetEvent(delay_ms, 0, reinterpret_cast<LPTIMECALLBACK>(event_), 0, |
| 109 TIME_ONESHOT | TIME_CALLBACK_EVENT_SET); |
| 110 return timer_id_ != 0; |
| 111 } |
| 112 |
| 113 std::unique_ptr<QueuedTask> Cancel() { |
| 114 if (timer_id_) { |
| 115 ::timeKillEvent(timer_id_); |
| 116 timer_id_ = 0; |
| 117 } |
| 118 return std::move(task_); |
| 119 } |
| 120 |
| 121 void OnEventSignaled() { |
| 122 RTC_DCHECK_NE(0, timer_id_); |
| 123 timer_id_ = 0; |
| 124 task_->Run() ? task_.reset() : static_cast<void>(task_.release()); |
| 125 } |
| 126 |
| 127 HANDLE event() const { return event_; } |
| 128 |
| 129 bool is_active() const { return timer_id_ != 0; } |
| 130 |
| 131 private: |
| 132 void Close() { |
| 133 Cancel(); |
| 134 |
| 135 if (event_) { |
| 136 ::CloseHandle(event_); |
| 137 event_ = nullptr; |
| 138 } |
| 139 } |
| 140 |
| 141 HANDLE event_ = nullptr; |
| 142 MMRESULT timer_id_ = 0; |
| 143 std::unique_ptr<QueuedTask> task_; |
| 144 |
| 145 RTC_DISALLOW_COPY_AND_ASSIGN(MultimediaTimer); |
| 146 }; |
| 147 |
50 TaskQueue::TaskQueue(const char* queue_name) | 148 TaskQueue::TaskQueue(const char* queue_name) |
51 : thread_(&TaskQueue::ThreadMain, this, queue_name) { | 149 : thread_(&TaskQueue::ThreadMain, this, queue_name) { |
52 RTC_DCHECK(queue_name); | 150 RTC_DCHECK(queue_name); |
53 thread_.Start(); | 151 thread_.Start(); |
54 Event event(false, false); | 152 Event event(false, false); |
55 ThreadStartupData startup = {&event, this}; | 153 ThreadStartupData startup = {&event, this}; |
56 RTC_CHECK(thread_.QueueAPC(&InitializeQueueThread, | 154 RTC_CHECK(thread_.QueueAPC(&InitializeQueueThread, |
57 reinterpret_cast<ULONG_PTR>(&startup))); | 155 reinterpret_cast<ULONG_PTR>(&startup))); |
58 event.Wait(Event::kForever); | 156 event.Wait(Event::kForever); |
59 } | 157 } |
60 | 158 |
61 TaskQueue::~TaskQueue() { | 159 TaskQueue::~TaskQueue() { |
62 RTC_DCHECK(!IsCurrent()); | 160 RTC_DCHECK(!IsCurrent()); |
63 while (!PostThreadMessage(thread_.GetThreadRef(), WM_QUIT, 0, 0)) { | 161 while (!::PostThreadMessage(thread_.GetThreadRef(), WM_QUIT, 0, 0)) { |
64 RTC_CHECK_EQ(ERROR_NOT_ENOUGH_QUOTA, ::GetLastError()); | 162 RTC_CHECK_EQ(ERROR_NOT_ENOUGH_QUOTA, ::GetLastError()); |
65 Sleep(1); | 163 Sleep(1); |
66 } | 164 } |
67 thread_.Stop(); | 165 thread_.Stop(); |
68 } | 166 } |
69 | 167 |
70 // static | 168 // static |
71 TaskQueue* TaskQueue::Current() { | 169 TaskQueue* TaskQueue::Current() { |
72 return static_cast<TaskQueue*>(TlsGetValue(GetQueuePtrTls())); | 170 return static_cast<TaskQueue*>(::TlsGetValue(GetQueuePtrTls())); |
73 } | 171 } |
74 | 172 |
75 // static | 173 // static |
76 bool TaskQueue::IsCurrent(const char* queue_name) { | 174 bool TaskQueue::IsCurrent(const char* queue_name) { |
77 TaskQueue* current = Current(); | 175 TaskQueue* current = Current(); |
78 return current && current->thread_.name().compare(queue_name) == 0; | 176 return current && current->thread_.name().compare(queue_name) == 0; |
79 } | 177 } |
80 | 178 |
81 bool TaskQueue::IsCurrent() const { | 179 bool TaskQueue::IsCurrent() const { |
82 return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef()); | 180 return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef()); |
83 } | 181 } |
84 | 182 |
85 void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) { | 183 void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) { |
86 if (PostThreadMessage(thread_.GetThreadRef(), WM_RUN_TASK, 0, | 184 if (::PostThreadMessage(thread_.GetThreadRef(), WM_RUN_TASK, 0, |
87 reinterpret_cast<LPARAM>(task.get()))) { | 185 reinterpret_cast<LPARAM>(task.get()))) { |
88 task.release(); | 186 task.release(); |
89 } | 187 } |
90 } | 188 } |
91 | 189 |
92 void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task, | 190 void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task, |
93 uint32_t milliseconds) { | 191 uint32_t milliseconds) { |
94 WPARAM wparam; | 192 WPARAM wparam; |
95 #if defined(_WIN64) | 193 #if defined(_WIN64) |
96 // GetTickCount() returns a fairly coarse tick count (resolution or about 8ms) | 194 // GetTickCount() returns a fairly coarse tick count (resolution or about 8ms) |
97 // so this compensation isn't that accurate, but since we have unused 32 bits | 195 // so this compensation isn't that accurate, but since we have unused 32 bits |
98 // on Win64, we might as well use them. | 196 // on Win64, we might as well use them. |
99 wparam = (static_cast<WPARAM>(::GetTickCount()) << 32) | milliseconds; | 197 wparam = (static_cast<WPARAM>(::GetTickCount()) << 32) | milliseconds; |
100 #else | 198 #else |
101 wparam = milliseconds; | 199 wparam = milliseconds; |
102 #endif | 200 #endif |
103 if (PostThreadMessage(thread_.GetThreadRef(), WM_QUEUE_DELAYED_TASK, wparam, | 201 if (::PostThreadMessage(thread_.GetThreadRef(), WM_QUEUE_DELAYED_TASK, wparam, |
104 reinterpret_cast<LPARAM>(task.get()))) { | 202 reinterpret_cast<LPARAM>(task.get()))) { |
105 task.release(); | 203 task.release(); |
106 } | 204 } |
107 } | 205 } |
108 | 206 |
109 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, | 207 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
110 std::unique_ptr<QueuedTask> reply, | 208 std::unique_ptr<QueuedTask> reply, |
111 TaskQueue* reply_queue) { | 209 TaskQueue* reply_queue) { |
112 QueuedTask* task_ptr = task.release(); | 210 QueuedTask* task_ptr = task.release(); |
113 QueuedTask* reply_task_ptr = reply.release(); | 211 QueuedTask* reply_task_ptr = reply.release(); |
114 DWORD reply_thread_id = reply_queue->thread_.GetThreadRef(); | 212 DWORD reply_thread_id = reply_queue->thread_.GetThreadRef(); |
115 PostTask([task_ptr, reply_task_ptr, reply_thread_id]() { | 213 PostTask([task_ptr, reply_task_ptr, reply_thread_id]() { |
116 if (task_ptr->Run()) | 214 if (task_ptr->Run()) |
117 delete task_ptr; | 215 delete task_ptr; |
118 // If the thread's message queue is full, we can't queue the task and will | 216 // If the thread's message queue is full, we can't queue the task and will |
119 // have to drop it (i.e. delete). | 217 // have to drop it (i.e. delete). |
120 if (!PostThreadMessage(reply_thread_id, WM_RUN_TASK, 0, | 218 if (!::PostThreadMessage(reply_thread_id, WM_RUN_TASK, 0, |
121 reinterpret_cast<LPARAM>(reply_task_ptr))) { | 219 reinterpret_cast<LPARAM>(reply_task_ptr))) { |
122 delete reply_task_ptr; | 220 delete reply_task_ptr; |
123 } | 221 } |
124 }); | 222 }); |
125 } | 223 } |
126 | 224 |
127 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, | 225 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
128 std::unique_ptr<QueuedTask> reply) { | 226 std::unique_ptr<QueuedTask> reply) { |
129 return PostTaskAndReply(std::move(task), std::move(reply), Current()); | 227 return PostTaskAndReply(std::move(task), std::move(reply), Current()); |
130 } | 228 } |
131 | 229 |
132 // static | 230 // static |
133 bool TaskQueue::ThreadMain(void* context) { | 231 bool TaskQueue::ThreadMain(void* context) { |
| 232 HANDLE timer_handles[MultimediaTimer::kMaxTimers]; |
| 233 // Active multimedia timers. |
| 234 std::vector<MultimediaTimer> mm_timers; |
| 235 // Tasks that have been queued by using SetTimer/WM_TIMER. |
134 DelayedTasks delayed_tasks; | 236 DelayedTasks delayed_tasks; |
| 237 |
135 while (true) { | 238 while (true) { |
136 DWORD result = ::MsgWaitForMultipleObjectsEx(0, nullptr, INFINITE, | 239 RTC_DCHECK(mm_timers.size() <= arraysize(timer_handles)); |
| 240 DWORD count = 0; |
| 241 for (const auto& t : mm_timers) { |
| 242 if (!t.is_active()) |
| 243 break; |
| 244 timer_handles[count++] = t.event(); |
| 245 } |
| 246 // Make sure we do an alertable wait as that's required to allow APCs to run |
| 247 // (e.g. required for InitializeQueueThread and stopping the thread in |
| 248 // PlatformThread). |
| 249 DWORD result = ::MsgWaitForMultipleObjectsEx(count, timer_handles, INFINITE, |
137 QS_ALLEVENTS, MWMO_ALERTABLE); | 250 QS_ALLEVENTS, MWMO_ALERTABLE); |
138 RTC_CHECK_NE(WAIT_FAILED, result); | 251 RTC_CHECK_NE(WAIT_FAILED, result); |
139 if (result == WAIT_OBJECT_0) { | 252 // If we're not waiting for any timers, then count will be equal to |
140 if (!ProcessQueuedMessages(&delayed_tasks)) | 253 // WAIT_OBJECT_0. If we're waiting for timers, then |count| represents |
| 254 // "One more than the number of timers", which means that there's a |
| 255 // message in the queue that needs to be handled. |
| 256 // If |result| is less than |count|, then its value will be the index of the |
| 257 // timer that has been signaled. |
| 258 if (result == (WAIT_OBJECT_0 + count)) { |
| 259 if (!ProcessQueuedMessages(&delayed_tasks, &mm_timers)) |
141 break; | 260 break; |
| 261 } else if (result < (WAIT_OBJECT_0 + count)) { |
| 262 mm_timers[result].OnEventSignaled(); |
| 263 RTC_DCHECK(!mm_timers[result].is_active()); |
| 264 // Reuse timer events by moving inactive timers to the back of the vector. |
| 265 // When new delayed tasks are queued, they'll get reused. |
| 266 if (mm_timers.size() > 1) { |
| 267 auto it = mm_timers.begin() + result; |
| 268 std::rotate(it, it + 1, mm_timers.end()); |
| 269 } |
| 270 |
| 271 // Collect some garbage. |
| 272 if (mm_timers.size() > MultimediaTimer::kInstanceThresholdGC) { |
| 273 const auto inactive = std::find_if( |
| 274 mm_timers.begin(), mm_timers.end(), |
| 275 [](const MultimediaTimer& t) { return !t.is_active(); }); |
| 276 if (inactive != mm_timers.end()) { |
| 277 // Since inactive timers are always moved to the back, we can |
| 278 // safely delete all timers following the first inactive one. |
| 279 mm_timers.erase(inactive, mm_timers.end()); |
| 280 } |
| 281 } |
142 } else { | 282 } else { |
143 RTC_DCHECK_EQ(WAIT_IO_COMPLETION, result); | 283 RTC_DCHECK_EQ(WAIT_IO_COMPLETION, result); |
144 } | 284 } |
145 } | 285 } |
| 286 |
146 return false; | 287 return false; |
147 } | 288 } |
148 | 289 |
149 // static | 290 // static |
150 bool TaskQueue::ProcessQueuedMessages(DelayedTasks* delayed_tasks) { | 291 bool TaskQueue::ProcessQueuedMessages(DelayedTasks* delayed_tasks, |
| 292 std::vector<MultimediaTimer>* timers) { |
151 MSG msg = {}; | 293 MSG msg = {}; |
152 while (PeekMessage(&msg, nullptr, 0, 0, PM_REMOVE) && | 294 while (::PeekMessage(&msg, nullptr, 0, 0, PM_REMOVE) && |
153 msg.message != WM_QUIT) { | 295 msg.message != WM_QUIT) { |
154 if (!msg.hwnd) { | 296 if (!msg.hwnd) { |
155 switch (msg.message) { | 297 switch (msg.message) { |
156 case WM_RUN_TASK: { | 298 case WM_RUN_TASK: { |
157 QueuedTask* task = reinterpret_cast<QueuedTask*>(msg.lParam); | 299 QueuedTask* task = reinterpret_cast<QueuedTask*>(msg.lParam); |
158 if (task->Run()) | 300 if (task->Run()) |
159 delete task; | 301 delete task; |
160 break; | 302 break; |
161 } | 303 } |
162 case WM_QUEUE_DELAYED_TASK: { | 304 case WM_QUEUE_DELAYED_TASK: { |
163 QueuedTask* task = reinterpret_cast<QueuedTask*>(msg.lParam); | 305 std::unique_ptr<QueuedTask> task( |
| 306 reinterpret_cast<QueuedTask*>(msg.lParam)); |
164 uint32_t milliseconds = msg.wParam & 0xFFFFFFFF; | 307 uint32_t milliseconds = msg.wParam & 0xFFFFFFFF; |
165 #if defined(_WIN64) | 308 #if defined(_WIN64) |
166 // Subtract the time it took to queue the timer. | 309 // Subtract the time it took to queue the timer. |
167 const DWORD now = GetTickCount(); | 310 const DWORD now = GetTickCount(); |
168 DWORD post_time = now - (msg.wParam >> 32); | 311 DWORD post_time = now - (msg.wParam >> 32); |
169 milliseconds = | 312 milliseconds = |
170 post_time > milliseconds ? 0 : milliseconds - post_time; | 313 post_time > milliseconds ? 0 : milliseconds - post_time; |
171 #endif | 314 #endif |
172 UINT_PTR timer_id = SetTimer(nullptr, 0, milliseconds, nullptr); | 315 bool timer_queued = false; |
173 delayed_tasks->insert(std::make_pair(timer_id, task)); | 316 if (timers->size() < MultimediaTimer::kMaxTimers) { |
| 317 MultimediaTimer* timer = nullptr; |
| 318 auto available = std::find_if( |
| 319 timers->begin(), timers->end(), |
| 320 [](const MultimediaTimer& t) { return !t.is_active(); }); |
| 321 if (available != timers->end()) { |
| 322 timer = &(*available); |
| 323 } else { |
| 324 timers->emplace_back(); |
| 325 timer = &timers->back(); |
| 326 } |
| 327 |
| 328 timer_queued = |
| 329 timer->StartOneShotTimer(std::move(task), milliseconds); |
| 330 if (!timer_queued) { |
| 331 // No more multimedia timers can be queued. |
| 332 // Detach the task and fall back on SetTimer. |
| 333 task = timer->Cancel(); |
| 334 } |
| 335 } |
| 336 |
| 337 // When we fail to use multimedia timers, we fall back on the more |
| 338 // coarse SetTimer/WM_TIMER approach. |
| 339 if (!timer_queued) { |
| 340 UINT_PTR timer_id = ::SetTimer(nullptr, 0, milliseconds, nullptr); |
| 341 delayed_tasks->insert(std::make_pair(timer_id, task.release())); |
| 342 } |
174 break; | 343 break; |
175 } | 344 } |
176 case WM_TIMER: { | 345 case WM_TIMER: { |
177 KillTimer(nullptr, msg.wParam); | 346 ::KillTimer(nullptr, msg.wParam); |
178 auto found = delayed_tasks->find(msg.wParam); | 347 auto found = delayed_tasks->find(msg.wParam); |
179 RTC_DCHECK(found != delayed_tasks->end()); | 348 RTC_DCHECK(found != delayed_tasks->end()); |
180 if (!found->second->Run()) | 349 if (!found->second->Run()) |
181 found->second.release(); | 350 found->second.release(); |
182 delayed_tasks->erase(found); | 351 delayed_tasks->erase(found); |
183 break; | 352 break; |
184 } | 353 } |
185 default: | 354 default: |
186 RTC_NOTREACHED(); | 355 RTC_NOTREACHED(); |
187 break; | 356 break; |
188 } | 357 } |
189 } else { | 358 } else { |
190 TranslateMessage(&msg); | 359 ::TranslateMessage(&msg); |
191 DispatchMessage(&msg); | 360 ::DispatchMessage(&msg); |
192 } | 361 } |
193 } | 362 } |
194 return msg.message != WM_QUIT; | 363 return msg.message != WM_QUIT; |
195 } | 364 } |
196 | 365 |
197 } // namespace rtc | 366 } // namespace rtc |
OLD | NEW |