Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(196)

Side by Side Diff: webrtc/rtc_base/task_queue_win.cc

Issue 3009133002: Convert windows TaskQueue implementation to pimpl convention. (Closed)
Patch Set: Mark more of implementation private. Created 3 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « webrtc/rtc_base/task_queue.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 /* 1 /*
2 * Copyright 2016 The WebRTC Project Authors. All rights reserved. 2 * Copyright 2016 The WebRTC Project Authors. All rights reserved.
3 * 3 *
4 * Use of this source code is governed by a BSD-style license 4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source 5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found 6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may 7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree. 8 * be found in the AUTHORS file in the root of the source tree.
9 */ 9 */
10 10
11 #include "webrtc/rtc_base/task_queue.h" 11 #include "webrtc/rtc_base/task_queue.h"
12 12
13 #include <mmsystem.h> 13 #include <mmsystem.h>
14 #include <string.h> 14 #include <string.h>
15 15
16 #include <algorithm> 16 #include <algorithm>
17 #include <queue> 17 #include <queue>
18 18
19 #include "webrtc/rtc_base/arraysize.h" 19 #include "webrtc/rtc_base/arraysize.h"
20 #include "webrtc/rtc_base/checks.h" 20 #include "webrtc/rtc_base/checks.h"
21 #include "webrtc/rtc_base/event.h"
21 #include "webrtc/rtc_base/logging.h" 22 #include "webrtc/rtc_base/logging.h"
23 #include "webrtc/rtc_base/platform_thread.h"
24 #include "webrtc/rtc_base/refcount.h"
25 #include "webrtc/rtc_base/refcountedobject.h"
22 #include "webrtc/rtc_base/safe_conversions.h" 26 #include "webrtc/rtc_base/safe_conversions.h"
23 #include "webrtc/rtc_base/timeutils.h" 27 #include "webrtc/rtc_base/timeutils.h"
24 28
25 namespace rtc { 29 namespace rtc {
26 namespace { 30 namespace {
27 #define WM_RUN_TASK WM_USER + 1 31 #define WM_RUN_TASK WM_USER + 1
28 #define WM_QUEUE_DELAYED_TASK WM_USER + 2 32 #define WM_QUEUE_DELAYED_TASK WM_USER + 2
29 33
30 using Priority = TaskQueue::Priority; 34 using Priority = TaskQueue::Priority;
31 35
(...skipping 115 matching lines...) Expand 10 before | Expand all | Expand 10 after
147 151
148 private: 152 private:
149 HANDLE event_ = nullptr; 153 HANDLE event_ = nullptr;
150 MMRESULT timer_id_ = 0; 154 MMRESULT timer_id_ = 0;
151 155
152 RTC_DISALLOW_COPY_AND_ASSIGN(MultimediaTimer); 156 RTC_DISALLOW_COPY_AND_ASSIGN(MultimediaTimer);
153 }; 157 };
154 158
155 } // namespace 159 } // namespace
156 160
157 class TaskQueue::ThreadState { 161 class TaskQueue::Impl : public RefCountInterface {
158 public: 162 public:
159 explicit ThreadState(HANDLE in_queue) : in_queue_(in_queue) {} 163 Impl(const char* queue_name, TaskQueue* queue, Priority priority);
160 ~ThreadState() {} 164 ~Impl() override;
161 165
162 void RunThreadMain(); 166 static TaskQueue::Impl* Current();
167 static TaskQueue* CurrentQueue();
168
169 // Used for DCHECKing the current queue.
170 bool IsCurrent() const;
171
172 template <class Closure,
173 typename std::enable_if<
174 std::is_copy_constructible<Closure>::value>::type* = nullptr>
175 void PostTask(const Closure& closure) {
176 PostTask(std::unique_ptr<QueuedTask>(new ClosureTask<Closure>(closure)));
177 }
178
179 void PostTask(std::unique_ptr<QueuedTask> task);
180 void PostTaskAndReply(std::unique_ptr<QueuedTask> task,
181 std::unique_ptr<QueuedTask> reply,
182 TaskQueue::Impl* reply_queue);
183
184 void PostDelayedTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds);
185
186 void RunPendingTasks();
163 187
164 private: 188 private:
165 bool ProcessQueuedMessages(); 189 static void ThreadMain(void* context);
166 void RunDueTasks();
167 void ScheduleNextTimer();
168 void CancelTimers();
169 190
170 // Since priority_queue<> by defult orders items in terms of 191 class WorkerThread : public PlatformThread {
171 // largest->smallest, using std::less<>, and we want smallest->largest, 192 public:
172 // we would like to use std::greater<> here. Alas it's only available in 193 WorkerThread(ThreadRunFunction func,
173 // C++14 and later, so we roll our own compare template that that relies on 194 void* obj,
174 // operator<(). 195 const char* thread_name,
175 template <typename T> 196 ThreadPriority priority)
176 struct greater { 197 : PlatformThread(func, obj, thread_name, priority) {}
177 bool operator()(const T& l, const T& r) { return l > r; } 198
199 bool QueueAPC(PAPCFUNC apc_function, ULONG_PTR data) {
200 return PlatformThread::QueueAPC(apc_function, data);
201 }
178 }; 202 };
179 203
180 MultimediaTimer timer_; 204 class ThreadState {
181 std::priority_queue<DelayedTaskInfo, 205 public:
182 std::vector<DelayedTaskInfo>, 206 explicit ThreadState(HANDLE in_queue) : in_queue_(in_queue) {}
183 greater<DelayedTaskInfo>> 207 ~ThreadState() {}
184 timer_tasks_; 208
185 UINT_PTR timer_id_ = 0; 209 void RunThreadMain();
210
211 private:
212 bool ProcessQueuedMessages();
213 void RunDueTasks();
214 void ScheduleNextTimer();
215 void CancelTimers();
216
217 // Since priority_queue<> by defult orders items in terms of
218 // largest->smallest, using std::less<>, and we want smallest->largest,
219 // we would like to use std::greater<> here. Alas it's only available in
220 // C++14 and later, so we roll our own compare template that that relies on
221 // operator<().
222 template <typename T>
223 struct greater {
224 bool operator()(const T& l, const T& r) { return l > r; }
225 };
226
227 MultimediaTimer timer_;
228 std::priority_queue<DelayedTaskInfo,
229 std::vector<DelayedTaskInfo>,
230 greater<DelayedTaskInfo>>
231 timer_tasks_;
232 UINT_PTR timer_id_ = 0;
233 HANDLE in_queue_;
234 };
235
236 TaskQueue* const queue_;
237 WorkerThread thread_;
238 rtc::CriticalSection pending_lock_;
239 std::queue<std::unique_ptr<QueuedTask>> pending_ GUARDED_BY(pending_lock_);
186 HANDLE in_queue_; 240 HANDLE in_queue_;
187 }; 241 };
188 242
189 TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/) 243 TaskQueue::Impl::Impl(const char* queue_name,
190 : thread_(&TaskQueue::ThreadMain, 244 TaskQueue* queue,
245 Priority priority)
246 : queue_(queue),
247 thread_(&TaskQueue::Impl::ThreadMain,
191 this, 248 this,
192 queue_name, 249 queue_name,
193 TaskQueuePriorityToThreadPriority(priority)), 250 TaskQueuePriorityToThreadPriority(priority)),
194 in_queue_(::CreateEvent(nullptr, true, false, nullptr)) { 251 in_queue_(::CreateEvent(nullptr, true, false, nullptr)) {
195 RTC_DCHECK(queue_name); 252 RTC_DCHECK(queue_name);
196 RTC_DCHECK(in_queue_); 253 RTC_DCHECK(in_queue_);
197 thread_.Start(); 254 thread_.Start();
198 Event event(false, false); 255 Event event(false, false);
199 ThreadStartupData startup = {&event, this}; 256 ThreadStartupData startup = {&event, this};
200 RTC_CHECK(thread_.QueueAPC(&InitializeQueueThread, 257 RTC_CHECK(thread_.QueueAPC(&InitializeQueueThread,
201 reinterpret_cast<ULONG_PTR>(&startup))); 258 reinterpret_cast<ULONG_PTR>(&startup)));
202 event.Wait(Event::kForever); 259 event.Wait(Event::kForever);
203 } 260 }
204 261
205 TaskQueue::~TaskQueue() { 262 TaskQueue::Impl::~Impl() {
206 RTC_DCHECK(!IsCurrent()); 263 RTC_DCHECK(!IsCurrent());
207 while (!::PostThreadMessage(thread_.GetThreadRef(), WM_QUIT, 0, 0)) { 264 while (!::PostThreadMessage(thread_.GetThreadRef(), WM_QUIT, 0, 0)) {
208 RTC_CHECK_EQ(ERROR_NOT_ENOUGH_QUOTA, ::GetLastError()); 265 RTC_CHECK_EQ(ERROR_NOT_ENOUGH_QUOTA, ::GetLastError());
209 Sleep(1); 266 Sleep(1);
210 } 267 }
211 thread_.Stop(); 268 thread_.Stop();
212 ::CloseHandle(in_queue_); 269 ::CloseHandle(in_queue_);
213 } 270 }
214 271
215 // static 272 // static
216 TaskQueue* TaskQueue::Current() { 273 TaskQueue::Impl* TaskQueue::Impl::Current() {
217 return static_cast<TaskQueue*>(::TlsGetValue(GetQueuePtrTls())); 274 return static_cast<TaskQueue::Impl*>(::TlsGetValue(GetQueuePtrTls()));
218 } 275 }
219 276
220 bool TaskQueue::IsCurrent() const { 277 // static
278 TaskQueue* TaskQueue::Impl::CurrentQueue() {
279 TaskQueue::Impl* current = Current();
280 return current ? current->queue_ : nullptr;
281 }
282
283 bool TaskQueue::Impl::IsCurrent() const {
221 return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef()); 284 return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef());
222 } 285 }
223 286
224 void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) { 287 void TaskQueue::Impl::PostTask(std::unique_ptr<QueuedTask> task) {
225 rtc::CritScope lock(&pending_lock_); 288 rtc::CritScope lock(&pending_lock_);
226 pending_.push(std::move(task)); 289 pending_.push(std::move(task));
227 ::SetEvent(in_queue_); 290 ::SetEvent(in_queue_);
228 } 291 }
229 292
230 void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task, 293 void TaskQueue::Impl::PostDelayedTask(std::unique_ptr<QueuedTask> task,
231 uint32_t milliseconds) { 294 uint32_t milliseconds) {
232 if (!milliseconds) { 295 if (!milliseconds) {
233 PostTask(std::move(task)); 296 PostTask(std::move(task));
234 return; 297 return;
235 } 298 }
236 299
237 // TODO(tommi): Avoid this allocation. It is currently here since 300 // TODO(tommi): Avoid this allocation. It is currently here since
238 // the timestamp stored in the task info object, is a 64bit timestamp 301 // the timestamp stored in the task info object, is a 64bit timestamp
239 // and WPARAM is 32bits in 32bit builds. Otherwise, we could pass the 302 // and WPARAM is 32bits in 32bit builds. Otherwise, we could pass the
240 // task pointer and timestamp as LPARAM and WPARAM. 303 // task pointer and timestamp as LPARAM and WPARAM.
241 auto* task_info = new DelayedTaskInfo(milliseconds, std::move(task)); 304 auto* task_info = new DelayedTaskInfo(milliseconds, std::move(task));
242 if (!::PostThreadMessage(thread_.GetThreadRef(), WM_QUEUE_DELAYED_TASK, 0, 305 if (!::PostThreadMessage(thread_.GetThreadRef(), WM_QUEUE_DELAYED_TASK, 0,
243 reinterpret_cast<LPARAM>(task_info))) { 306 reinterpret_cast<LPARAM>(task_info))) {
244 delete task_info; 307 delete task_info;
245 } 308 }
246 } 309 }
247 310
248 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, 311 void TaskQueue::Impl::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
249 std::unique_ptr<QueuedTask> reply, 312 std::unique_ptr<QueuedTask> reply,
250 TaskQueue* reply_queue) { 313 TaskQueue::Impl* reply_queue) {
251 QueuedTask* task_ptr = task.release(); 314 QueuedTask* task_ptr = task.release();
252 QueuedTask* reply_task_ptr = reply.release(); 315 QueuedTask* reply_task_ptr = reply.release();
253 DWORD reply_thread_id = reply_queue->thread_.GetThreadRef(); 316 DWORD reply_thread_id = reply_queue->thread_.GetThreadRef();
254 PostTask([task_ptr, reply_task_ptr, reply_thread_id]() { 317 PostTask([task_ptr, reply_task_ptr, reply_thread_id]() {
255 if (task_ptr->Run()) 318 if (task_ptr->Run())
256 delete task_ptr; 319 delete task_ptr;
257 // If the thread's message queue is full, we can't queue the task and will 320 // If the thread's message queue is full, we can't queue the task and will
258 // have to drop it (i.e. delete). 321 // have to drop it (i.e. delete).
259 if (!::PostThreadMessage(reply_thread_id, WM_RUN_TASK, 0, 322 if (!::PostThreadMessage(reply_thread_id, WM_RUN_TASK, 0,
260 reinterpret_cast<LPARAM>(reply_task_ptr))) { 323 reinterpret_cast<LPARAM>(reply_task_ptr))) {
261 delete reply_task_ptr; 324 delete reply_task_ptr;
262 } 325 }
263 }); 326 });
264 } 327 }
265 328
266 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, 329 void TaskQueue::Impl::RunPendingTasks() {
267 std::unique_ptr<QueuedTask> reply) {
268 return PostTaskAndReply(std::move(task), std::move(reply), Current());
269 }
270
271 void TaskQueue::RunPendingTasks() {
272 while (true) { 330 while (true) {
273 std::unique_ptr<QueuedTask> task; 331 std::unique_ptr<QueuedTask> task;
274 { 332 {
275 rtc::CritScope lock(&pending_lock_); 333 rtc::CritScope lock(&pending_lock_);
276 if (pending_.empty()) 334 if (pending_.empty())
277 break; 335 break;
278 task = std::move(pending_.front()); 336 task = std::move(pending_.front());
279 pending_.pop(); 337 pending_.pop();
280 } 338 }
281 339
282 if (!task->Run()) 340 if (!task->Run())
283 task.release(); 341 task.release();
284 } 342 }
285 } 343 }
286 344
287 // static 345 // static
288 void TaskQueue::ThreadMain(void* context) { 346 void TaskQueue::Impl::ThreadMain(void* context) {
289 ThreadState state(static_cast<TaskQueue*>(context)->in_queue_); 347 ThreadState state(static_cast<TaskQueue::Impl*>(context)->in_queue_);
290 state.RunThreadMain(); 348 state.RunThreadMain();
291 } 349 }
292 350
293 void TaskQueue::ThreadState::RunThreadMain() { 351 void TaskQueue::Impl::ThreadState::RunThreadMain() {
294 HANDLE handles[2] = { *timer_.event_for_wait(), in_queue_ }; 352 HANDLE handles[2] = { *timer_.event_for_wait(), in_queue_ };
295 while (true) { 353 while (true) {
296 // Make sure we do an alertable wait as that's required to allow APCs to run 354 // Make sure we do an alertable wait as that's required to allow APCs to run
297 // (e.g. required for InitializeQueueThread and stopping the thread in 355 // (e.g. required for InitializeQueueThread and stopping the thread in
298 // PlatformThread). 356 // PlatformThread).
299 DWORD result = ::MsgWaitForMultipleObjectsEx( 357 DWORD result = ::MsgWaitForMultipleObjectsEx(
300 arraysize(handles), handles, INFINITE, QS_ALLEVENTS, MWMO_ALERTABLE); 358 arraysize(handles), handles, INFINITE, QS_ALLEVENTS, MWMO_ALERTABLE);
301 RTC_CHECK_NE(WAIT_FAILED, result); 359 RTC_CHECK_NE(WAIT_FAILED, result);
302 if (result == (WAIT_OBJECT_0 + 2)) { 360 if (result == (WAIT_OBJECT_0 + 2)) {
303 // There are messages in the message queue that need to be handled. 361 // There are messages in the message queue that need to be handled.
304 if (!ProcessQueuedMessages()) 362 if (!ProcessQueuedMessages())
305 break; 363 break;
306 } 364 }
307 365
308 if (result == WAIT_OBJECT_0 || (!timer_tasks_.empty() && 366 if (result == WAIT_OBJECT_0 || (!timer_tasks_.empty() &&
309 ::WaitForSingleObject(*timer_.event_for_wait(), 0) == WAIT_OBJECT_0)) { 367 ::WaitForSingleObject(*timer_.event_for_wait(), 0) == WAIT_OBJECT_0)) {
310 // The multimedia timer was signaled. 368 // The multimedia timer was signaled.
311 timer_.Cancel(); 369 timer_.Cancel();
312 RunDueTasks(); 370 RunDueTasks();
313 ScheduleNextTimer(); 371 ScheduleNextTimer();
314 } 372 }
315 373
316 if (result == (WAIT_OBJECT_0 + 1)) { 374 if (result == (WAIT_OBJECT_0 + 1)) {
317 ::ResetEvent(in_queue_); 375 ::ResetEvent(in_queue_);
318 TaskQueue::Current()->RunPendingTasks(); 376 TaskQueue::Impl::Current()->RunPendingTasks();
319 } 377 }
320 } 378 }
321 } 379 }
322 380
323 bool TaskQueue::ThreadState::ProcessQueuedMessages() { 381 bool TaskQueue::Impl::ThreadState::ProcessQueuedMessages() {
324 MSG msg = {}; 382 MSG msg = {};
325 // To protect against overly busy message queues, we limit the time 383 // To protect against overly busy message queues, we limit the time
326 // we process tasks to a few milliseconds. If we don't do that, there's 384 // we process tasks to a few milliseconds. If we don't do that, there's
327 // a chance that timer tasks won't ever run. 385 // a chance that timer tasks won't ever run.
328 static const int kMaxTaskProcessingTimeMs = 500; 386 static const int kMaxTaskProcessingTimeMs = 500;
329 auto start = GetTick(); 387 auto start = GetTick();
330 while (::PeekMessage(&msg, nullptr, 0, 0, PM_REMOVE) && 388 while (::PeekMessage(&msg, nullptr, 0, 0, PM_REMOVE) &&
331 msg.message != WM_QUIT) { 389 msg.message != WM_QUIT) {
332 if (!msg.hwnd) { 390 if (!msg.hwnd) {
333 switch (msg.message) { 391 switch (msg.message) {
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after
367 ::TranslateMessage(&msg); 425 ::TranslateMessage(&msg);
368 ::DispatchMessage(&msg); 426 ::DispatchMessage(&msg);
369 } 427 }
370 428
371 if (GetTick() > start + kMaxTaskProcessingTimeMs) 429 if (GetTick() > start + kMaxTaskProcessingTimeMs)
372 break; 430 break;
373 } 431 }
374 return msg.message != WM_QUIT; 432 return msg.message != WM_QUIT;
375 } 433 }
376 434
377 void TaskQueue::ThreadState::RunDueTasks() { 435 void TaskQueue::Impl::ThreadState::RunDueTasks() {
378 RTC_DCHECK(!timer_tasks_.empty()); 436 RTC_DCHECK(!timer_tasks_.empty());
379 auto now = GetTick(); 437 auto now = GetTick();
380 do { 438 do {
381 const auto& top = timer_tasks_.top(); 439 const auto& top = timer_tasks_.top();
382 if (top.due_time() > now) 440 if (top.due_time() > now)
383 break; 441 break;
384 top.Run(); 442 top.Run();
385 timer_tasks_.pop(); 443 timer_tasks_.pop();
386 } while (!timer_tasks_.empty()); 444 } while (!timer_tasks_.empty());
387 } 445 }
388 446
389 void TaskQueue::ThreadState::ScheduleNextTimer() { 447 void TaskQueue::Impl::ThreadState::ScheduleNextTimer() {
390 RTC_DCHECK_EQ(timer_id_, 0); 448 RTC_DCHECK_EQ(timer_id_, 0);
391 if (timer_tasks_.empty()) 449 if (timer_tasks_.empty())
392 return; 450 return;
393 451
394 const auto& next_task = timer_tasks_.top(); 452 const auto& next_task = timer_tasks_.top();
395 int64_t delay_ms = std::max(0ll, next_task.due_time() - GetTick()); 453 int64_t delay_ms = std::max(0ll, next_task.due_time() - GetTick());
396 uint32_t milliseconds = rtc::dchecked_cast<uint32_t>(delay_ms); 454 uint32_t milliseconds = rtc::dchecked_cast<uint32_t>(delay_ms);
397 if (!timer_.StartOneShotTimer(milliseconds)) 455 if (!timer_.StartOneShotTimer(milliseconds))
398 timer_id_ = ::SetTimer(nullptr, 0, milliseconds, nullptr); 456 timer_id_ = ::SetTimer(nullptr, 0, milliseconds, nullptr);
399 } 457 }
400 458
401 void TaskQueue::ThreadState::CancelTimers() { 459 void TaskQueue::Impl::ThreadState::CancelTimers() {
402 timer_.Cancel(); 460 timer_.Cancel();
403 if (timer_id_) { 461 if (timer_id_) {
404 ::KillTimer(nullptr, timer_id_); 462 ::KillTimer(nullptr, timer_id_);
405 timer_id_ = 0; 463 timer_id_ = 0;
406 } 464 }
407 } 465 }
408 466
467 // Boilerplate for the PIMPL pattern.
468 TaskQueue::TaskQueue(const char* queue_name, Priority priority)
469 : impl_(new RefCountedObject<TaskQueue::Impl>(queue_name, this, priority)) {
470 }
471
472 TaskQueue::~TaskQueue() {}
473
474 // static
475 TaskQueue* TaskQueue::Current() {
476 return TaskQueue::Impl::CurrentQueue();
477 }
478
479 // Used for DCHECKing the current queue.
480 bool TaskQueue::IsCurrent() const {
481 return impl_->IsCurrent();
482 }
483
484 void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
485 return TaskQueue::impl_->PostTask(std::move(task));
486 }
487
488 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
489 std::unique_ptr<QueuedTask> reply,
490 TaskQueue* reply_queue) {
491 return TaskQueue::impl_->PostTaskAndReply(std::move(task), std::move(reply),
492 reply_queue->impl_.get());
493 }
494
495 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
496 std::unique_ptr<QueuedTask> reply) {
497 return TaskQueue::impl_->PostTaskAndReply(std::move(task), std::move(reply),
498 impl_.get());
499 }
500
501 void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
502 uint32_t milliseconds) {
503 return TaskQueue::impl_->PostDelayedTask(std::move(task), milliseconds);
504 }
505
409 } // namespace rtc 506 } // namespace rtc
OLDNEW
« no previous file with comments | « webrtc/rtc_base/task_queue.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698