Index: webrtc/rtc_base/task_queue_win.cc |
diff --git a/webrtc/rtc_base/task_queue_win.cc b/webrtc/rtc_base/task_queue_win.cc |
index 9b4d1a2308cebf8348881f7c27f2a33252fa3c72..00a8c79827573b7fde17d894879763430bce5c3c 100644 |
--- a/webrtc/rtc_base/task_queue_win.cc |
+++ b/webrtc/rtc_base/task_queue_win.cc |
@@ -18,7 +18,11 @@ |
#include "webrtc/rtc_base/arraysize.h" |
#include "webrtc/rtc_base/checks.h" |
+#include "webrtc/rtc_base/event.h" |
#include "webrtc/rtc_base/logging.h" |
+#include "webrtc/rtc_base/platform_thread.h" |
+#include "webrtc/rtc_base/refcount.h" |
+#include "webrtc/rtc_base/refcountedobject.h" |
#include "webrtc/rtc_base/safe_conversions.h" |
#include "webrtc/rtc_base/timeutils.h" |
@@ -154,44 +158,97 @@ class MultimediaTimer { |
} // namespace |
-class TaskQueue::ThreadState { |
+class TaskQueue::Impl : public RefCountInterface { |
public: |
- explicit ThreadState(HANDLE in_queue) : in_queue_(in_queue) {} |
- ~ThreadState() {} |
+ Impl(const char* queue_name, TaskQueue* queue, Priority priority); |
+ ~Impl() override; |
- void RunThreadMain(); |
+ static TaskQueue::Impl* Current(); |
+ static TaskQueue* CurrentQueue(); |
+ |
+ // Used for DCHECKing the current queue. |
+ bool IsCurrent() const; |
+ |
+ template <class Closure, |
+ typename std::enable_if< |
+ std::is_copy_constructible<Closure>::value>::type* = nullptr> |
+ void PostTask(const Closure& closure) { |
+ PostTask(std::unique_ptr<QueuedTask>(new ClosureTask<Closure>(closure))); |
+ } |
+ |
+ void PostTask(std::unique_ptr<QueuedTask> task); |
+ void PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
+ std::unique_ptr<QueuedTask> reply, |
+ TaskQueue::Impl* reply_queue); |
+ |
+ void PostDelayedTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds); |
+ |
+ void RunPendingTasks(); |
private: |
- bool ProcessQueuedMessages(); |
- void RunDueTasks(); |
- void ScheduleNextTimer(); |
- void CancelTimers(); |
- |
- // Since priority_queue<> by defult orders items in terms of |
- // largest->smallest, using std::less<>, and we want smallest->largest, |
- // we would like to use std::greater<> here. Alas it's only available in |
- // C++14 and later, so we roll our own compare template that that relies on |
- // operator<(). |
- template <typename T> |
- struct greater { |
- bool operator()(const T& l, const T& r) { return l > r; } |
+ static void ThreadMain(void* context); |
+ |
+ class WorkerThread : public PlatformThread { |
+ public: |
+ WorkerThread(ThreadRunFunction func, |
+ void* obj, |
+ const char* thread_name, |
+ ThreadPriority priority) |
+ : PlatformThread(func, obj, thread_name, priority) {} |
+ |
+ bool QueueAPC(PAPCFUNC apc_function, ULONG_PTR data) { |
+ return PlatformThread::QueueAPC(apc_function, data); |
+ } |
}; |
- MultimediaTimer timer_; |
- std::priority_queue<DelayedTaskInfo, |
- std::vector<DelayedTaskInfo>, |
- greater<DelayedTaskInfo>> |
- timer_tasks_; |
- UINT_PTR timer_id_ = 0; |
+ class ThreadState { |
+ public: |
+ explicit ThreadState(HANDLE in_queue) : in_queue_(in_queue) {} |
+ ~ThreadState() {} |
+ |
+ void RunThreadMain(); |
+ |
+ private: |
+ bool ProcessQueuedMessages(); |
+ void RunDueTasks(); |
+ void ScheduleNextTimer(); |
+ void CancelTimers(); |
+ |
+ // Since priority_queue<> by defult orders items in terms of |
+ // largest->smallest, using std::less<>, and we want smallest->largest, |
+ // we would like to use std::greater<> here. Alas it's only available in |
+ // C++14 and later, so we roll our own compare template that that relies on |
+ // operator<(). |
+ template <typename T> |
+ struct greater { |
+ bool operator()(const T& l, const T& r) { return l > r; } |
+ }; |
+ |
+ MultimediaTimer timer_; |
+ std::priority_queue<DelayedTaskInfo, |
+ std::vector<DelayedTaskInfo>, |
+ greater<DelayedTaskInfo>> |
+ timer_tasks_; |
+ UINT_PTR timer_id_ = 0; |
+ HANDLE in_queue_; |
+ }; |
+ |
+ TaskQueue* const queue_; |
+ WorkerThread thread_; |
+ rtc::CriticalSection pending_lock_; |
+ std::queue<std::unique_ptr<QueuedTask>> pending_ GUARDED_BY(pending_lock_); |
HANDLE in_queue_; |
}; |
-TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/) |
- : thread_(&TaskQueue::ThreadMain, |
+TaskQueue::Impl::Impl(const char* queue_name, |
+ TaskQueue* queue, |
+ Priority priority) |
+ : queue_(queue), |
+ thread_(&TaskQueue::Impl::ThreadMain, |
this, |
queue_name, |
TaskQueuePriorityToThreadPriority(priority)), |
- in_queue_(::CreateEvent(nullptr, true, false, nullptr)) { |
+ in_queue_(::CreateEvent(nullptr, true, false, nullptr)) { |
RTC_DCHECK(queue_name); |
RTC_DCHECK(in_queue_); |
thread_.Start(); |
@@ -202,7 +259,7 @@ TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/) |
event.Wait(Event::kForever); |
} |
-TaskQueue::~TaskQueue() { |
+TaskQueue::Impl::~Impl() { |
RTC_DCHECK(!IsCurrent()); |
while (!::PostThreadMessage(thread_.GetThreadRef(), WM_QUIT, 0, 0)) { |
RTC_CHECK_EQ(ERROR_NOT_ENOUGH_QUOTA, ::GetLastError()); |
@@ -213,22 +270,28 @@ TaskQueue::~TaskQueue() { |
} |
// static |
-TaskQueue* TaskQueue::Current() { |
- return static_cast<TaskQueue*>(::TlsGetValue(GetQueuePtrTls())); |
+TaskQueue::Impl* TaskQueue::Impl::Current() { |
+ return static_cast<TaskQueue::Impl*>(::TlsGetValue(GetQueuePtrTls())); |
} |
-bool TaskQueue::IsCurrent() const { |
+// static |
+TaskQueue* TaskQueue::Impl::CurrentQueue() { |
+ TaskQueue::Impl* current = Current(); |
+ return current ? current->queue_ : nullptr; |
+} |
+ |
+bool TaskQueue::Impl::IsCurrent() const { |
return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef()); |
} |
-void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) { |
+void TaskQueue::Impl::PostTask(std::unique_ptr<QueuedTask> task) { |
rtc::CritScope lock(&pending_lock_); |
pending_.push(std::move(task)); |
::SetEvent(in_queue_); |
} |
-void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task, |
- uint32_t milliseconds) { |
+void TaskQueue::Impl::PostDelayedTask(std::unique_ptr<QueuedTask> task, |
+ uint32_t milliseconds) { |
if (!milliseconds) { |
PostTask(std::move(task)); |
return; |
@@ -245,9 +308,9 @@ void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task, |
} |
} |
-void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
- std::unique_ptr<QueuedTask> reply, |
- TaskQueue* reply_queue) { |
+void TaskQueue::Impl::PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
+ std::unique_ptr<QueuedTask> reply, |
+ TaskQueue::Impl* reply_queue) { |
QueuedTask* task_ptr = task.release(); |
QueuedTask* reply_task_ptr = reply.release(); |
DWORD reply_thread_id = reply_queue->thread_.GetThreadRef(); |
@@ -263,12 +326,7 @@ void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
}); |
} |
-void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
- std::unique_ptr<QueuedTask> reply) { |
- return PostTaskAndReply(std::move(task), std::move(reply), Current()); |
-} |
- |
-void TaskQueue::RunPendingTasks() { |
+void TaskQueue::Impl::RunPendingTasks() { |
while (true) { |
std::unique_ptr<QueuedTask> task; |
{ |
@@ -285,12 +343,12 @@ void TaskQueue::RunPendingTasks() { |
} |
// static |
-void TaskQueue::ThreadMain(void* context) { |
- ThreadState state(static_cast<TaskQueue*>(context)->in_queue_); |
+void TaskQueue::Impl::ThreadMain(void* context) { |
+ ThreadState state(static_cast<TaskQueue::Impl*>(context)->in_queue_); |
state.RunThreadMain(); |
} |
-void TaskQueue::ThreadState::RunThreadMain() { |
+void TaskQueue::Impl::ThreadState::RunThreadMain() { |
HANDLE handles[2] = { *timer_.event_for_wait(), in_queue_ }; |
while (true) { |
// Make sure we do an alertable wait as that's required to allow APCs to run |
@@ -315,12 +373,12 @@ void TaskQueue::ThreadState::RunThreadMain() { |
if (result == (WAIT_OBJECT_0 + 1)) { |
::ResetEvent(in_queue_); |
- TaskQueue::Current()->RunPendingTasks(); |
+ TaskQueue::Impl::Current()->RunPendingTasks(); |
} |
} |
} |
-bool TaskQueue::ThreadState::ProcessQueuedMessages() { |
+bool TaskQueue::Impl::ThreadState::ProcessQueuedMessages() { |
MSG msg = {}; |
// To protect against overly busy message queues, we limit the time |
// we process tasks to a few milliseconds. If we don't do that, there's |
@@ -374,7 +432,7 @@ bool TaskQueue::ThreadState::ProcessQueuedMessages() { |
return msg.message != WM_QUIT; |
} |
-void TaskQueue::ThreadState::RunDueTasks() { |
+void TaskQueue::Impl::ThreadState::RunDueTasks() { |
RTC_DCHECK(!timer_tasks_.empty()); |
auto now = GetTick(); |
do { |
@@ -386,7 +444,7 @@ void TaskQueue::ThreadState::RunDueTasks() { |
} while (!timer_tasks_.empty()); |
} |
-void TaskQueue::ThreadState::ScheduleNextTimer() { |
+void TaskQueue::Impl::ThreadState::ScheduleNextTimer() { |
RTC_DCHECK_EQ(timer_id_, 0); |
if (timer_tasks_.empty()) |
return; |
@@ -398,7 +456,7 @@ void TaskQueue::ThreadState::ScheduleNextTimer() { |
timer_id_ = ::SetTimer(nullptr, 0, milliseconds, nullptr); |
} |
-void TaskQueue::ThreadState::CancelTimers() { |
+void TaskQueue::Impl::ThreadState::CancelTimers() { |
timer_.Cancel(); |
if (timer_id_) { |
::KillTimer(nullptr, timer_id_); |
@@ -406,4 +464,43 @@ void TaskQueue::ThreadState::CancelTimers() { |
} |
} |
+// Boilerplate for the PIMPL pattern. |
+TaskQueue::TaskQueue(const char* queue_name, Priority priority) |
+ : impl_(new RefCountedObject<TaskQueue::Impl>(queue_name, this, priority)) { |
+} |
+ |
+TaskQueue::~TaskQueue() {} |
+ |
+// static |
+TaskQueue* TaskQueue::Current() { |
+ return TaskQueue::Impl::CurrentQueue(); |
+} |
+ |
+// Used for DCHECKing the current queue. |
+bool TaskQueue::IsCurrent() const { |
+ return impl_->IsCurrent(); |
+} |
+ |
+void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) { |
+ return TaskQueue::impl_->PostTask(std::move(task)); |
+} |
+ |
+void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
+ std::unique_ptr<QueuedTask> reply, |
+ TaskQueue* reply_queue) { |
+ return TaskQueue::impl_->PostTaskAndReply(std::move(task), std::move(reply), |
+ reply_queue->impl_.get()); |
+} |
+ |
+void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
+ std::unique_ptr<QueuedTask> reply) { |
+ return TaskQueue::impl_->PostTaskAndReply(std::move(task), std::move(reply), |
+ impl_.get()); |
+} |
+ |
+void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task, |
+ uint32_t milliseconds) { |
+ return TaskQueue::impl_->PostDelayedTask(std::move(task), milliseconds); |
+} |
+ |
} // namespace rtc |