Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(242)

Side by Side Diff: webrtc/base/task_queue_win.cc

Issue 2691973002: Add support for multimedia timers to TaskQueue on Windows. (Closed)
Patch Set: Add documentation, WAIT_OBJECT_0 and remove LOG(WARNING) Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « webrtc/base/task_queue_unittest.cc ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 /* 1 /*
2 * Copyright 2016 The WebRTC Project Authors. All rights reserved. 2 * Copyright 2016 The WebRTC Project Authors. All rights reserved.
3 * 3 *
4 * Use of this source code is governed by a BSD-style license 4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source 5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found 6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may 7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree. 8 * be found in the AUTHORS file in the root of the source tree.
9 */ 9 */
10 10
11 #include "webrtc/base/task_queue.h" 11 #include "webrtc/base/task_queue.h"
12 12
13 #include <mmsystem.h>
13 #include <string.h> 14 #include <string.h>
14 15
16 #include <algorithm>
17
18 #include "webrtc/base/arraysize.h"
15 #include "webrtc/base/checks.h" 19 #include "webrtc/base/checks.h"
16 #include "webrtc/base/logging.h" 20 #include "webrtc/base/logging.h"
17 21
18 namespace rtc { 22 namespace rtc {
19 namespace { 23 namespace {
20 #define WM_RUN_TASK WM_USER + 1 24 #define WM_RUN_TASK WM_USER + 1
21 #define WM_QUEUE_DELAYED_TASK WM_USER + 2 25 #define WM_QUEUE_DELAYED_TASK WM_USER + 2
22 26
23 DWORD g_queue_ptr_tls = 0; 27 DWORD g_queue_ptr_tls = 0;
24 28
25 BOOL CALLBACK InitializeTls(PINIT_ONCE init_once, void* param, void** context) { 29 BOOL CALLBACK InitializeTls(PINIT_ONCE init_once, void* param, void** context) {
26 g_queue_ptr_tls = TlsAlloc(); 30 g_queue_ptr_tls = TlsAlloc();
27 return TRUE; 31 return TRUE;
28 } 32 }
29 33
30 DWORD GetQueuePtrTls() { 34 DWORD GetQueuePtrTls() {
31 static INIT_ONCE init_once = INIT_ONCE_STATIC_INIT; 35 static INIT_ONCE init_once = INIT_ONCE_STATIC_INIT;
32 InitOnceExecuteOnce(&init_once, InitializeTls, nullptr, nullptr); 36 ::InitOnceExecuteOnce(&init_once, InitializeTls, nullptr, nullptr);
33 return g_queue_ptr_tls; 37 return g_queue_ptr_tls;
34 } 38 }
35 39
36 struct ThreadStartupData { 40 struct ThreadStartupData {
37 Event* started; 41 Event* started;
38 void* thread_context; 42 void* thread_context;
39 }; 43 };
40 44
41 void CALLBACK InitializeQueueThread(ULONG_PTR param) { 45 void CALLBACK InitializeQueueThread(ULONG_PTR param) {
42 MSG msg; 46 MSG msg;
43 PeekMessage(&msg, NULL, WM_USER, WM_USER, PM_NOREMOVE); 47 ::PeekMessage(&msg, nullptr, WM_USER, WM_USER, PM_NOREMOVE);
44 ThreadStartupData* data = reinterpret_cast<ThreadStartupData*>(param); 48 ThreadStartupData* data = reinterpret_cast<ThreadStartupData*>(param);
45 TlsSetValue(GetQueuePtrTls(), data->thread_context); 49 ::TlsSetValue(GetQueuePtrTls(), data->thread_context);
46 data->started->Set(); 50 data->started->Set();
47 } 51 }
48 } // namespace 52 } // namespace
49 53
54 class TaskQueue::MultimediaTimer {
55 public:
56 // kMaxTimers defines the limit of how many MultimediaTimer instances should
57 // be created.
58 // Background: The maximum number of supported handles for Wait functions, is
59 // MAXIMUM_WAIT_OBJECTS - 1 (63).
60 // There are some ways to work around the limitation but as it turns out, the
61 // limit of concurrently active multimedia timers per process, is much lower,
62 // or 16. So there isn't much value in going to the lenghts required to
63 // overcome the Wait limitations.
64 // kMaxTimers is larger than 16 though since it is possible that 'complete' or
65 // signaled timers that haven't been handled, are counted as part of
66 // kMaxTimers and thus a multimedia timer can actually be queued even though
67 // as far as we're concerned, there are more than 16 that are pending.
68 static const int kMaxTimers = MAXIMUM_WAIT_OBJECTS - 1;
69
70 // Controls how many MultimediaTimer instances a queue can hold before
71 // attempting to garbage collect (GC) timers that aren't in use.
72 static const int kInstanceThresholdGC = 8;
73
74 MultimediaTimer() : event_(::CreateEvent(nullptr, false, false, nullptr)) {}
75
76 MultimediaTimer(MultimediaTimer&& timer)
77 : event_(timer.event_),
78 timer_id_(timer.timer_id_),
79 task_(std::move(timer.task_)) {
80 RTC_DCHECK(event_);
81 timer.event_ = nullptr;
82 timer.timer_id_ = 0;
83 }
84
85 ~MultimediaTimer() { Close(); }
86
87 // Implementing this operator is required because of the way
88 // some stl algorithms work, such as std::rotate().
89 MultimediaTimer& operator=(MultimediaTimer&& timer) {
90 if (this != &timer) {
91 Close();
92 event_ = timer.event_;
93 timer.event_ = nullptr;
94 task_ = std::move(timer.task_);
95 timer_id_ = timer.timer_id_;
96 timer.timer_id_ = 0;
97 }
98 return *this;
99 }
100
101 bool StartOneShotTimer(std::unique_ptr<QueuedTask> task, UINT delay_ms) {
102 RTC_DCHECK_EQ(0, timer_id_);
103 RTC_DCHECK(event_ != nullptr);
104 RTC_DCHECK(!task_.get());
105 RTC_DCHECK(task.get());
106 task_ = std::move(task);
107 timer_id_ =
108 ::timeSetEvent(delay_ms, 0, reinterpret_cast<LPTIMECALLBACK>(event_), 0,
109 TIME_ONESHOT | TIME_CALLBACK_EVENT_SET);
110 return timer_id_ != 0;
111 }
112
113 std::unique_ptr<QueuedTask> Cancel() {
114 if (timer_id_) {
115 ::timeKillEvent(timer_id_);
116 timer_id_ = 0;
117 }
118 return std::move(task_);
119 }
120
121 void OnEventSignaled() {
122 RTC_DCHECK_NE(0, timer_id_);
123 timer_id_ = 0;
124 task_->Run() ? task_.reset() : static_cast<void>(task_.release());
125 }
126
127 HANDLE event() const { return event_; }
128
129 bool is_active() const { return timer_id_ != 0; }
130
131 private:
132 void Close() {
133 Cancel();
134
135 if (event_) {
136 ::CloseHandle(event_);
137 event_ = nullptr;
138 }
139 }
140
141 HANDLE event_ = nullptr;
142 MMRESULT timer_id_ = 0;
143 std::unique_ptr<QueuedTask> task_;
144
145 RTC_DISALLOW_COPY_AND_ASSIGN(MultimediaTimer);
146 };
147
50 TaskQueue::TaskQueue(const char* queue_name) 148 TaskQueue::TaskQueue(const char* queue_name)
51 : thread_(&TaskQueue::ThreadMain, this, queue_name) { 149 : thread_(&TaskQueue::ThreadMain, this, queue_name) {
52 RTC_DCHECK(queue_name); 150 RTC_DCHECK(queue_name);
53 thread_.Start(); 151 thread_.Start();
54 Event event(false, false); 152 Event event(false, false);
55 ThreadStartupData startup = {&event, this}; 153 ThreadStartupData startup = {&event, this};
56 RTC_CHECK(thread_.QueueAPC(&InitializeQueueThread, 154 RTC_CHECK(thread_.QueueAPC(&InitializeQueueThread,
57 reinterpret_cast<ULONG_PTR>(&startup))); 155 reinterpret_cast<ULONG_PTR>(&startup)));
58 event.Wait(Event::kForever); 156 event.Wait(Event::kForever);
59 } 157 }
60 158
61 TaskQueue::~TaskQueue() { 159 TaskQueue::~TaskQueue() {
62 RTC_DCHECK(!IsCurrent()); 160 RTC_DCHECK(!IsCurrent());
63 while (!PostThreadMessage(thread_.GetThreadRef(), WM_QUIT, 0, 0)) { 161 while (!::PostThreadMessage(thread_.GetThreadRef(), WM_QUIT, 0, 0)) {
64 RTC_CHECK_EQ(ERROR_NOT_ENOUGH_QUOTA, ::GetLastError()); 162 RTC_CHECK_EQ(ERROR_NOT_ENOUGH_QUOTA, ::GetLastError());
65 Sleep(1); 163 Sleep(1);
66 } 164 }
67 thread_.Stop(); 165 thread_.Stop();
68 } 166 }
69 167
70 // static 168 // static
71 TaskQueue* TaskQueue::Current() { 169 TaskQueue* TaskQueue::Current() {
72 return static_cast<TaskQueue*>(TlsGetValue(GetQueuePtrTls())); 170 return static_cast<TaskQueue*>(::TlsGetValue(GetQueuePtrTls()));
73 } 171 }
74 172
75 // static 173 // static
76 bool TaskQueue::IsCurrent(const char* queue_name) { 174 bool TaskQueue::IsCurrent(const char* queue_name) {
77 TaskQueue* current = Current(); 175 TaskQueue* current = Current();
78 return current && current->thread_.name().compare(queue_name) == 0; 176 return current && current->thread_.name().compare(queue_name) == 0;
79 } 177 }
80 178
81 bool TaskQueue::IsCurrent() const { 179 bool TaskQueue::IsCurrent() const {
82 return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef()); 180 return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef());
83 } 181 }
84 182
85 void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) { 183 void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
86 if (PostThreadMessage(thread_.GetThreadRef(), WM_RUN_TASK, 0, 184 if (::PostThreadMessage(thread_.GetThreadRef(), WM_RUN_TASK, 0,
87 reinterpret_cast<LPARAM>(task.get()))) { 185 reinterpret_cast<LPARAM>(task.get()))) {
88 task.release(); 186 task.release();
89 } 187 }
90 } 188 }
91 189
92 void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task, 190 void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
93 uint32_t milliseconds) { 191 uint32_t milliseconds) {
94 WPARAM wparam; 192 WPARAM wparam;
95 #if defined(_WIN64) 193 #if defined(_WIN64)
96 // GetTickCount() returns a fairly coarse tick count (resolution or about 8ms) 194 // GetTickCount() returns a fairly coarse tick count (resolution or about 8ms)
97 // so this compensation isn't that accurate, but since we have unused 32 bits 195 // so this compensation isn't that accurate, but since we have unused 32 bits
98 // on Win64, we might as well use them. 196 // on Win64, we might as well use them.
99 wparam = (static_cast<WPARAM>(::GetTickCount()) << 32) | milliseconds; 197 wparam = (static_cast<WPARAM>(::GetTickCount()) << 32) | milliseconds;
100 #else 198 #else
101 wparam = milliseconds; 199 wparam = milliseconds;
102 #endif 200 #endif
103 if (PostThreadMessage(thread_.GetThreadRef(), WM_QUEUE_DELAYED_TASK, wparam, 201 if (::PostThreadMessage(thread_.GetThreadRef(), WM_QUEUE_DELAYED_TASK, wparam,
104 reinterpret_cast<LPARAM>(task.get()))) { 202 reinterpret_cast<LPARAM>(task.get()))) {
105 task.release(); 203 task.release();
106 } 204 }
107 } 205 }
108 206
109 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, 207 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
110 std::unique_ptr<QueuedTask> reply, 208 std::unique_ptr<QueuedTask> reply,
111 TaskQueue* reply_queue) { 209 TaskQueue* reply_queue) {
112 QueuedTask* task_ptr = task.release(); 210 QueuedTask* task_ptr = task.release();
113 QueuedTask* reply_task_ptr = reply.release(); 211 QueuedTask* reply_task_ptr = reply.release();
114 DWORD reply_thread_id = reply_queue->thread_.GetThreadRef(); 212 DWORD reply_thread_id = reply_queue->thread_.GetThreadRef();
115 PostTask([task_ptr, reply_task_ptr, reply_thread_id]() { 213 PostTask([task_ptr, reply_task_ptr, reply_thread_id]() {
116 if (task_ptr->Run()) 214 if (task_ptr->Run())
117 delete task_ptr; 215 delete task_ptr;
118 // If the thread's message queue is full, we can't queue the task and will 216 // If the thread's message queue is full, we can't queue the task and will
119 // have to drop it (i.e. delete). 217 // have to drop it (i.e. delete).
120 if (!PostThreadMessage(reply_thread_id, WM_RUN_TASK, 0, 218 if (!::PostThreadMessage(reply_thread_id, WM_RUN_TASK, 0,
121 reinterpret_cast<LPARAM>(reply_task_ptr))) { 219 reinterpret_cast<LPARAM>(reply_task_ptr))) {
122 delete reply_task_ptr; 220 delete reply_task_ptr;
123 } 221 }
124 }); 222 });
125 } 223 }
126 224
127 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, 225 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
128 std::unique_ptr<QueuedTask> reply) { 226 std::unique_ptr<QueuedTask> reply) {
129 return PostTaskAndReply(std::move(task), std::move(reply), Current()); 227 return PostTaskAndReply(std::move(task), std::move(reply), Current());
130 } 228 }
131 229
132 // static 230 // static
133 bool TaskQueue::ThreadMain(void* context) { 231 bool TaskQueue::ThreadMain(void* context) {
232 HANDLE timer_handles[MultimediaTimer::kMaxTimers];
233 // Active multimedia timers.
234 std::vector<MultimediaTimer> mm_timers;
235 // Tasks that have been queued by using SetTimer/WM_TIMER.
134 DelayedTasks delayed_tasks; 236 DelayedTasks delayed_tasks;
237
135 while (true) { 238 while (true) {
136 DWORD result = ::MsgWaitForMultipleObjectsEx(0, nullptr, INFINITE, 239 RTC_DCHECK(mm_timers.size() <= arraysize(timer_handles));
240 DWORD count = 0;
241 for (const auto& t : mm_timers) {
242 if (!t.is_active())
243 break;
244 timer_handles[count++] = t.event();
245 }
246 // Make sure we do an alertable wait as that's required to allow APCs to run
247 // (e.g. required for InitializeQueueThread and stopping the thread in
248 // PlatformThread).
249 DWORD result = ::MsgWaitForMultipleObjectsEx(count, timer_handles, INFINITE,
137 QS_ALLEVENTS, MWMO_ALERTABLE); 250 QS_ALLEVENTS, MWMO_ALERTABLE);
138 RTC_CHECK_NE(WAIT_FAILED, result); 251 RTC_CHECK_NE(WAIT_FAILED, result);
139 if (result == WAIT_OBJECT_0) { 252 // If we're not waiting for any timers, then count will be equal to
140 if (!ProcessQueuedMessages(&delayed_tasks)) 253 // WAIT_OBJECT_0. If we're waiting for timers, then |count| represents
254 // "One more than the number of timers", which means that there's a
255 // message in the queue that needs to be handled.
256 // If |result| is less than |count|, then its value will be the index of the
257 // timer that has been signaled.
258 if (result == (WAIT_OBJECT_0 + count)) {
259 if (!ProcessQueuedMessages(&delayed_tasks, &mm_timers))
141 break; 260 break;
261 } else if (result < (WAIT_OBJECT_0 + count)) {
262 mm_timers[result].OnEventSignaled();
263 RTC_DCHECK(!mm_timers[result].is_active());
264 // Reuse timer events by moving inactive timers to the back of the vector.
265 // When new delayed tasks are queued, they'll get reused.
266 if (mm_timers.size() > 1) {
267 auto it = mm_timers.begin() + result;
268 std::rotate(it, it + 1, mm_timers.end());
269 }
270
271 // Collect some garbage.
272 if (mm_timers.size() > MultimediaTimer::kInstanceThresholdGC) {
273 const auto inactive = std::find_if(
274 mm_timers.begin(), mm_timers.end(),
275 [](const MultimediaTimer& t) { return !t.is_active(); });
276 if (inactive != mm_timers.end()) {
277 // Since inactive timers are always moved to the back, we can
278 // safely delete all timers following the first inactive one.
279 mm_timers.erase(inactive, mm_timers.end());
280 }
281 }
142 } else { 282 } else {
143 RTC_DCHECK_EQ(WAIT_IO_COMPLETION, result); 283 RTC_DCHECK_EQ(WAIT_IO_COMPLETION, result);
144 } 284 }
145 } 285 }
286
146 return false; 287 return false;
147 } 288 }
148 289
149 // static 290 // static
150 bool TaskQueue::ProcessQueuedMessages(DelayedTasks* delayed_tasks) { 291 bool TaskQueue::ProcessQueuedMessages(DelayedTasks* delayed_tasks,
292 std::vector<MultimediaTimer>* timers) {
151 MSG msg = {}; 293 MSG msg = {};
152 while (PeekMessage(&msg, nullptr, 0, 0, PM_REMOVE) && 294 while (::PeekMessage(&msg, nullptr, 0, 0, PM_REMOVE) &&
153 msg.message != WM_QUIT) { 295 msg.message != WM_QUIT) {
154 if (!msg.hwnd) { 296 if (!msg.hwnd) {
155 switch (msg.message) { 297 switch (msg.message) {
156 case WM_RUN_TASK: { 298 case WM_RUN_TASK: {
157 QueuedTask* task = reinterpret_cast<QueuedTask*>(msg.lParam); 299 QueuedTask* task = reinterpret_cast<QueuedTask*>(msg.lParam);
158 if (task->Run()) 300 if (task->Run())
159 delete task; 301 delete task;
160 break; 302 break;
161 } 303 }
162 case WM_QUEUE_DELAYED_TASK: { 304 case WM_QUEUE_DELAYED_TASK: {
163 QueuedTask* task = reinterpret_cast<QueuedTask*>(msg.lParam); 305 std::unique_ptr<QueuedTask> task(
306 reinterpret_cast<QueuedTask*>(msg.lParam));
164 uint32_t milliseconds = msg.wParam & 0xFFFFFFFF; 307 uint32_t milliseconds = msg.wParam & 0xFFFFFFFF;
165 #if defined(_WIN64) 308 #if defined(_WIN64)
166 // Subtract the time it took to queue the timer. 309 // Subtract the time it took to queue the timer.
167 const DWORD now = GetTickCount(); 310 const DWORD now = GetTickCount();
168 DWORD post_time = now - (msg.wParam >> 32); 311 DWORD post_time = now - (msg.wParam >> 32);
169 milliseconds = 312 milliseconds =
170 post_time > milliseconds ? 0 : milliseconds - post_time; 313 post_time > milliseconds ? 0 : milliseconds - post_time;
171 #endif 314 #endif
172 UINT_PTR timer_id = SetTimer(nullptr, 0, milliseconds, nullptr); 315 bool timer_queued = false;
173 delayed_tasks->insert(std::make_pair(timer_id, task)); 316 if (timers->size() < MultimediaTimer::kMaxTimers) {
317 MultimediaTimer* timer = nullptr;
318 auto available = std::find_if(
319 timers->begin(), timers->end(),
320 [](const MultimediaTimer& t) { return !t.is_active(); });
321 if (available != timers->end()) {
322 timer = &(*available);
323 } else {
324 timers->emplace_back();
325 timer = &timers->back();
326 }
327
328 timer_queued =
329 timer->StartOneShotTimer(std::move(task), milliseconds);
330 if (!timer_queued) {
331 // No more multimedia timers can be queued.
332 // Detach the task and fall back on SetTimer.
333 task = timer->Cancel();
334 }
335 }
336
337 // When we fail to use multimedia timers, we fall back on the more
338 // coarse SetTimer/WM_TIMER approach.
339 if (!timer_queued) {
340 UINT_PTR timer_id = ::SetTimer(nullptr, 0, milliseconds, nullptr);
341 delayed_tasks->insert(std::make_pair(timer_id, task.release()));
342 }
174 break; 343 break;
175 } 344 }
176 case WM_TIMER: { 345 case WM_TIMER: {
177 KillTimer(nullptr, msg.wParam); 346 ::KillTimer(nullptr, msg.wParam);
178 auto found = delayed_tasks->find(msg.wParam); 347 auto found = delayed_tasks->find(msg.wParam);
179 RTC_DCHECK(found != delayed_tasks->end()); 348 RTC_DCHECK(found != delayed_tasks->end());
180 if (!found->second->Run()) 349 if (!found->second->Run())
181 found->second.release(); 350 found->second.release();
182 delayed_tasks->erase(found); 351 delayed_tasks->erase(found);
183 break; 352 break;
184 } 353 }
185 default: 354 default:
186 RTC_NOTREACHED(); 355 RTC_NOTREACHED();
187 break; 356 break;
188 } 357 }
189 } else { 358 } else {
190 TranslateMessage(&msg); 359 ::TranslateMessage(&msg);
191 DispatchMessage(&msg); 360 ::DispatchMessage(&msg);
192 } 361 }
193 } 362 }
194 return msg.message != WM_QUIT; 363 return msg.message != WM_QUIT;
195 } 364 }
196 365
197 } // namespace rtc 366 } // namespace rtc
OLDNEW
« no previous file with comments | « webrtc/base/task_queue_unittest.cc ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698