Chromium Code Reviews| Index: webrtc/rtc_base/task_queue_win.cc |
| diff --git a/webrtc/rtc_base/task_queue_win.cc b/webrtc/rtc_base/task_queue_win.cc |
| index 9b4d1a2308cebf8348881f7c27f2a33252fa3c72..de4eab27e792fa6c22b01d376a8e80753e41b0ed 100644 |
| --- a/webrtc/rtc_base/task_queue_win.cc |
| +++ b/webrtc/rtc_base/task_queue_win.cc |
| @@ -18,7 +18,11 @@ |
| #include "webrtc/rtc_base/arraysize.h" |
| #include "webrtc/rtc_base/checks.h" |
| +#include "webrtc/rtc_base/event.h" |
| #include "webrtc/rtc_base/logging.h" |
| +#include "webrtc/rtc_base/platform_thread.h" |
| +#include "webrtc/rtc_base/refcount.h" |
| +#include "webrtc/rtc_base/refcountedobject.h" |
| #include "webrtc/rtc_base/safe_conversions.h" |
| #include "webrtc/rtc_base/timeutils.h" |
| @@ -154,44 +158,96 @@ class MultimediaTimer { |
| } // namespace |
| -class TaskQueue::ThreadState { |
| +class TaskQueue::Impl : public RefCountInterface { |
| public: |
| - explicit ThreadState(HANDLE in_queue) : in_queue_(in_queue) {} |
| - ~ThreadState() {} |
| + Impl(const char* queue_name, TaskQueue* queue, Priority priority); |
| + ~Impl() override; |
| - void RunThreadMain(); |
| + static TaskQueue::Impl* Current(); |
| + static TaskQueue* CurrentQueue(); |
| - private: |
| - bool ProcessQueuedMessages(); |
| - void RunDueTasks(); |
| - void ScheduleNextTimer(); |
| - void CancelTimers(); |
| - |
| - // Since priority_queue<> by defult orders items in terms of |
| - // largest->smallest, using std::less<>, and we want smallest->largest, |
| - // we would like to use std::greater<> here. Alas it's only available in |
| - // C++14 and later, so we roll our own compare template that that relies on |
| - // operator<(). |
| - template <typename T> |
| - struct greater { |
| - bool operator()(const T& l, const T& r) { return l > r; } |
| + // Used for DCHECKing the current queue. |
| + bool IsCurrent() const; |
| + |
| + template <class Closure, |
| + typename std::enable_if< |
| + std::is_copy_constructible<Closure>::value>::type* = nullptr> |
| + void PostTask(const Closure& closure) { |
|
perkj_webrtc
2017/09/05 13:30:18
This method is not used and not needed.
nisse-webrtc
2017/09/05 14:39:33
It's used in TaskQueue::Impl::PostTaskAndReply, fo
perkj_webrtc
2017/09/05 15:14:04
ok
|
| + PostTask(std::unique_ptr<QueuedTask>(new ClosureTask<Closure>(closure))); |
| + } |
| + |
| + void PostTask(std::unique_ptr<QueuedTask> task); |
| + void PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
| + std::unique_ptr<QueuedTask> reply, |
| + TaskQueue::Impl* reply_queue); |
| + |
| + void PostDelayedTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds); |
| + |
| + void RunPendingTasks(); |
|
perkj_webrtc
2017/09/05 13:30:18
Everything from here can be made private.
nisse-webrtc
2017/09/05 14:39:33
Done.
|
| + static void ThreadMain(void* context); |
| + |
| + class WorkerThread : public PlatformThread { |
| + public: |
| + WorkerThread(ThreadRunFunction func, |
| + void* obj, |
| + const char* thread_name, |
| + ThreadPriority priority) |
| + : PlatformThread(func, obj, thread_name, priority) {} |
| + |
| + bool QueueAPC(PAPCFUNC apc_function, ULONG_PTR data) { |
| + return PlatformThread::QueueAPC(apc_function, data); |
| + } |
| + }; |
| + |
| + class ThreadState { |
| + public: |
| + explicit ThreadState(HANDLE in_queue) : in_queue_(in_queue) {} |
| + ~ThreadState() {} |
| + |
| + void RunThreadMain(); |
| + |
| + private: |
| + bool ProcessQueuedMessages(); |
| + void RunDueTasks(); |
| + void ScheduleNextTimer(); |
| + void CancelTimers(); |
| + |
| + // Since priority_queue<> by defult orders items in terms of |
| + // largest->smallest, using std::less<>, and we want smallest->largest, |
| + // we would like to use std::greater<> here. Alas it's only available in |
| + // C++14 and later, so we roll our own compare template that that relies on |
| + // operator<(). |
| + template <typename T> |
| + struct greater { |
| + bool operator()(const T& l, const T& r) { return l > r; } |
| + }; |
| + |
| + MultimediaTimer timer_; |
| + std::priority_queue<DelayedTaskInfo, |
| + std::vector<DelayedTaskInfo>, |
| + greater<DelayedTaskInfo>> |
| + timer_tasks_; |
| + UINT_PTR timer_id_ = 0; |
| + HANDLE in_queue_; |
| }; |
| - MultimediaTimer timer_; |
| - std::priority_queue<DelayedTaskInfo, |
| - std::vector<DelayedTaskInfo>, |
| - greater<DelayedTaskInfo>> |
| - timer_tasks_; |
| - UINT_PTR timer_id_ = 0; |
| + private: |
| + TaskQueue* const queue_; |
| + WorkerThread thread_; |
| + rtc::CriticalSection pending_lock_; |
| + std::queue<std::unique_ptr<QueuedTask>> pending_ GUARDED_BY(pending_lock_); |
| HANDLE in_queue_; |
| }; |
| -TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/) |
| - : thread_(&TaskQueue::ThreadMain, |
| +TaskQueue::Impl::Impl(const char* queue_name, |
| + TaskQueue* queue, |
| + Priority priority) |
| + : queue_(queue), |
| + thread_(&TaskQueue::Impl::ThreadMain, |
| this, |
| queue_name, |
| TaskQueuePriorityToThreadPriority(priority)), |
| - in_queue_(::CreateEvent(nullptr, true, false, nullptr)) { |
| + in_queue_(::CreateEvent(nullptr, true, false, nullptr)) { |
| RTC_DCHECK(queue_name); |
| RTC_DCHECK(in_queue_); |
| thread_.Start(); |
| @@ -202,7 +258,7 @@ TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/) |
| event.Wait(Event::kForever); |
| } |
| -TaskQueue::~TaskQueue() { |
| +TaskQueue::Impl::~Impl() { |
| RTC_DCHECK(!IsCurrent()); |
| while (!::PostThreadMessage(thread_.GetThreadRef(), WM_QUIT, 0, 0)) { |
| RTC_CHECK_EQ(ERROR_NOT_ENOUGH_QUOTA, ::GetLastError()); |
| @@ -213,22 +269,28 @@ TaskQueue::~TaskQueue() { |
| } |
| // static |
| -TaskQueue* TaskQueue::Current() { |
| - return static_cast<TaskQueue*>(::TlsGetValue(GetQueuePtrTls())); |
| +TaskQueue::Impl* TaskQueue::Impl::Current() { |
| + return static_cast<TaskQueue::Impl*>(::TlsGetValue(GetQueuePtrTls())); |
| } |
| -bool TaskQueue::IsCurrent() const { |
| +// static |
| +TaskQueue* TaskQueue::Impl::CurrentQueue() { |
| + TaskQueue::Impl* current = Current(); |
| + return current ? current->queue_ : nullptr; |
| +} |
| + |
| +bool TaskQueue::Impl::IsCurrent() const { |
| return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef()); |
| } |
| -void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) { |
| +void TaskQueue::Impl::PostTask(std::unique_ptr<QueuedTask> task) { |
| rtc::CritScope lock(&pending_lock_); |
| pending_.push(std::move(task)); |
| ::SetEvent(in_queue_); |
| } |
| -void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task, |
| - uint32_t milliseconds) { |
| +void TaskQueue::Impl::PostDelayedTask(std::unique_ptr<QueuedTask> task, |
| + uint32_t milliseconds) { |
| if (!milliseconds) { |
| PostTask(std::move(task)); |
| return; |
| @@ -245,9 +307,9 @@ void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task, |
| } |
| } |
| -void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
| - std::unique_ptr<QueuedTask> reply, |
| - TaskQueue* reply_queue) { |
| +void TaskQueue::Impl::PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
| + std::unique_ptr<QueuedTask> reply, |
| + TaskQueue::Impl* reply_queue) { |
| QueuedTask* task_ptr = task.release(); |
| QueuedTask* reply_task_ptr = reply.release(); |
| DWORD reply_thread_id = reply_queue->thread_.GetThreadRef(); |
| @@ -263,12 +325,7 @@ void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
| }); |
| } |
| -void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
| - std::unique_ptr<QueuedTask> reply) { |
| - return PostTaskAndReply(std::move(task), std::move(reply), Current()); |
| -} |
| - |
| -void TaskQueue::RunPendingTasks() { |
| +void TaskQueue::Impl::RunPendingTasks() { |
| while (true) { |
| std::unique_ptr<QueuedTask> task; |
| { |
| @@ -285,12 +342,12 @@ void TaskQueue::RunPendingTasks() { |
| } |
| // static |
| -void TaskQueue::ThreadMain(void* context) { |
| - ThreadState state(static_cast<TaskQueue*>(context)->in_queue_); |
| +void TaskQueue::Impl::ThreadMain(void* context) { |
| + ThreadState state(static_cast<TaskQueue::Impl*>(context)->in_queue_); |
| state.RunThreadMain(); |
| } |
| -void TaskQueue::ThreadState::RunThreadMain() { |
| +void TaskQueue::Impl::ThreadState::RunThreadMain() { |
| HANDLE handles[2] = { *timer_.event_for_wait(), in_queue_ }; |
| while (true) { |
| // Make sure we do an alertable wait as that's required to allow APCs to run |
| @@ -315,12 +372,12 @@ void TaskQueue::ThreadState::RunThreadMain() { |
| if (result == (WAIT_OBJECT_0 + 1)) { |
| ::ResetEvent(in_queue_); |
| - TaskQueue::Current()->RunPendingTasks(); |
| + TaskQueue::Impl::Current()->RunPendingTasks(); |
| } |
| } |
| } |
| -bool TaskQueue::ThreadState::ProcessQueuedMessages() { |
| +bool TaskQueue::Impl::ThreadState::ProcessQueuedMessages() { |
| MSG msg = {}; |
| // To protect against overly busy message queues, we limit the time |
| // we process tasks to a few milliseconds. If we don't do that, there's |
| @@ -374,7 +431,7 @@ bool TaskQueue::ThreadState::ProcessQueuedMessages() { |
| return msg.message != WM_QUIT; |
| } |
| -void TaskQueue::ThreadState::RunDueTasks() { |
| +void TaskQueue::Impl::ThreadState::RunDueTasks() { |
| RTC_DCHECK(!timer_tasks_.empty()); |
| auto now = GetTick(); |
| do { |
| @@ -386,7 +443,7 @@ void TaskQueue::ThreadState::RunDueTasks() { |
| } while (!timer_tasks_.empty()); |
| } |
| -void TaskQueue::ThreadState::ScheduleNextTimer() { |
| +void TaskQueue::Impl::ThreadState::ScheduleNextTimer() { |
| RTC_DCHECK_EQ(timer_id_, 0); |
| if (timer_tasks_.empty()) |
| return; |
| @@ -398,7 +455,7 @@ void TaskQueue::ThreadState::ScheduleNextTimer() { |
| timer_id_ = ::SetTimer(nullptr, 0, milliseconds, nullptr); |
| } |
| -void TaskQueue::ThreadState::CancelTimers() { |
| +void TaskQueue::Impl::ThreadState::CancelTimers() { |
| timer_.Cancel(); |
| if (timer_id_) { |
| ::KillTimer(nullptr, timer_id_); |
| @@ -406,4 +463,43 @@ void TaskQueue::ThreadState::CancelTimers() { |
| } |
| } |
| +// Boilerplate for the PIMPL pattern. |
| +TaskQueue::TaskQueue(const char* queue_name, Priority priority) |
| + : impl_(new RefCountedObject<TaskQueue::Impl>(queue_name, this, priority)) { |
| +} |
| + |
| +TaskQueue::~TaskQueue() {} |
| + |
| +// static |
| +TaskQueue* TaskQueue::Current() { |
| + return TaskQueue::Impl::CurrentQueue(); |
| +} |
| + |
| +// Used for DCHECKing the current queue. |
| +bool TaskQueue::IsCurrent() const { |
| + return impl_->IsCurrent(); |
| +} |
| + |
| +void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) { |
| + return TaskQueue::impl_->PostTask(std::move(task)); |
| +} |
| + |
| +void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
| + std::unique_ptr<QueuedTask> reply, |
| + TaskQueue* reply_queue) { |
| + return TaskQueue::impl_->PostTaskAndReply(std::move(task), std::move(reply), |
| + reply_queue->impl_.get()); |
| +} |
| + |
| +void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
| + std::unique_ptr<QueuedTask> reply) { |
| + return TaskQueue::impl_->PostTaskAndReply(std::move(task), std::move(reply), |
| + impl_.get()); |
| +} |
| + |
| +void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task, |
| + uint32_t milliseconds) { |
| + return TaskQueue::impl_->PostDelayedTask(std::move(task), milliseconds); |
| +} |
| + |
| } // namespace rtc |