| Index: webrtc/base/task_queue_win.cc
|
| diff --git a/webrtc/base/task_queue_win.cc b/webrtc/base/task_queue_win.cc
|
| index f7332d76ed8e99a1acc235127d0c92e2e314bb96..a149dd88a6b05e0ee402ce5c78713c4042db2c6c 100644
|
| --- a/webrtc/base/task_queue_win.cc
|
| +++ b/webrtc/base/task_queue_win.cc
|
| @@ -16,6 +16,7 @@
|
| #include <algorithm>
|
| #include <queue>
|
|
|
| +#include "webrtc/base/arraysize.h"
|
| #include "webrtc/base/checks.h"
|
| #include "webrtc/base/logging.h"
|
| #include "webrtc/base/safe_conversions.h"
|
| @@ -117,7 +118,8 @@ class DelayedTaskInfo {
|
|
|
| class MultimediaTimer {
|
| public:
|
| - MultimediaTimer() : event_(::CreateEvent(nullptr, false, false, nullptr)) {}
|
| + // Note: We create an event that requires manual reset.
|
| + MultimediaTimer() : event_(::CreateEvent(nullptr, true, false, nullptr)) {}
|
|
|
| ~MultimediaTimer() {
|
| Cancel();
|
| @@ -134,6 +136,7 @@ class MultimediaTimer {
|
| }
|
|
|
| void Cancel() {
|
| + ::ResetEvent(event_);
|
| if (timer_id_) {
|
| ::timeKillEvent(timer_id_);
|
| timer_id_ = 0;
|
| @@ -153,7 +156,7 @@ class MultimediaTimer {
|
|
|
| class TaskQueue::ThreadState {
|
| public:
|
| - ThreadState() {}
|
| + explicit ThreadState(HANDLE in_queue) : in_queue_(in_queue) {}
|
| ~ThreadState() {}
|
|
|
| void RunThreadMain();
|
| @@ -180,14 +183,17 @@ class TaskQueue::ThreadState {
|
| greater<DelayedTaskInfo>>
|
| timer_tasks_;
|
| UINT_PTR timer_id_ = 0;
|
| + HANDLE in_queue_;
|
| };
|
|
|
| TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/)
|
| : thread_(&TaskQueue::ThreadMain,
|
| this,
|
| queue_name,
|
| - TaskQueuePriorityToThreadPriority(priority)) {
|
| + TaskQueuePriorityToThreadPriority(priority)),
|
| + in_queue_(::CreateEvent(nullptr, true, false, nullptr)) {
|
| RTC_DCHECK(queue_name);
|
| + RTC_DCHECK(in_queue_);
|
| thread_.Start();
|
| Event event(false, false);
|
| ThreadStartupData startup = {&event, this};
|
| @@ -203,6 +209,7 @@ TaskQueue::~TaskQueue() {
|
| Sleep(1);
|
| }
|
| thread_.Stop();
|
| + ::CloseHandle(in_queue_);
|
| }
|
|
|
| // static
|
| @@ -221,10 +228,9 @@ bool TaskQueue::IsCurrent() const {
|
| }
|
|
|
| void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
|
| - if (::PostThreadMessage(thread_.GetThreadRef(), WM_RUN_TASK, 0,
|
| - reinterpret_cast<LPARAM>(task.get()))) {
|
| - task.release();
|
| - }
|
| + rtc::CritScope lock(&pending_lock_);
|
| + pending_.push(std::move(task));
|
| + ::SetEvent(in_queue_);
|
| }
|
|
|
| void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
|
| @@ -268,42 +274,70 @@ void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
|
| return PostTaskAndReply(std::move(task), std::move(reply), Current());
|
| }
|
|
|
| +void TaskQueue::RunPendingTasks() {
|
| + while (true) {
|
| + std::unique_ptr<QueuedTask> task;
|
| + {
|
| + rtc::CritScope lock(&pending_lock_);
|
| + if (pending_.empty())
|
| + break;
|
| + task = std::move(pending_.front());
|
| + pending_.pop();
|
| + }
|
| +
|
| + if (!task->Run())
|
| + task.release();
|
| + }
|
| +}
|
| +
|
| // static
|
| void TaskQueue::ThreadMain(void* context) {
|
| - ThreadState state;
|
| + ThreadState state(static_cast<TaskQueue*>(context)->in_queue_);
|
| state.RunThreadMain();
|
| }
|
|
|
| void TaskQueue::ThreadState::RunThreadMain() {
|
| + HANDLE handles[2] = { *timer_.event_for_wait(), in_queue_ };
|
| while (true) {
|
| // Make sure we do an alertable wait as that's required to allow APCs to run
|
| // (e.g. required for InitializeQueueThread and stopping the thread in
|
| // PlatformThread).
|
| DWORD result = ::MsgWaitForMultipleObjectsEx(
|
| - 1, timer_.event_for_wait(), INFINITE, QS_ALLEVENTS, MWMO_ALERTABLE);
|
| + arraysize(handles), handles, INFINITE, QS_ALLEVENTS, MWMO_ALERTABLE);
|
| RTC_CHECK_NE(WAIT_FAILED, result);
|
| - if (result == (WAIT_OBJECT_0 + 1)) {
|
| + if (result == (WAIT_OBJECT_0 + 2)) {
|
| // There are messages in the message queue that need to be handled.
|
| if (!ProcessQueuedMessages())
|
| break;
|
| - } else if (result == WAIT_OBJECT_0) {
|
| + }
|
| +
|
| + if (result == WAIT_OBJECT_0 || (!timer_tasks_.empty() &&
|
| + ::WaitForSingleObject(*timer_.event_for_wait(), 0) == WAIT_OBJECT_0)) {
|
| // The multimedia timer was signaled.
|
| timer_.Cancel();
|
| - RTC_DCHECK(!timer_tasks_.empty());
|
| RunDueTasks();
|
| ScheduleNextTimer();
|
| - } else {
|
| - RTC_DCHECK_EQ(WAIT_IO_COMPLETION, result);
|
| + }
|
| +
|
| + if (result == (WAIT_OBJECT_0 + 1)) {
|
| + ::ResetEvent(in_queue_);
|
| + TaskQueue::Current()->RunPendingTasks();
|
| }
|
| }
|
| }
|
|
|
| bool TaskQueue::ThreadState::ProcessQueuedMessages() {
|
| MSG msg = {};
|
| + // To protect against overly busy message queues, we limit the time
|
| + // we process tasks to a few milliseconds. If we don't do that, there's
|
| + // a chance that timer tasks won't ever run.
|
| + static const int kMaxTaskProcessingTimeMs = 500;
|
| + auto start = GetTick();
|
| while (::PeekMessage(&msg, nullptr, 0, 0, PM_REMOVE) &&
|
| msg.message != WM_QUIT) {
|
| if (!msg.hwnd) {
|
| switch (msg.message) {
|
| + // TODO(tommi): Stop using this way of queueing tasks.
|
| case WM_RUN_TASK: {
|
| QueuedTask* task = reinterpret_cast<QueuedTask*>(msg.lParam);
|
| if (task->Run())
|
| @@ -339,6 +373,9 @@ bool TaskQueue::ThreadState::ProcessQueuedMessages() {
|
| ::TranslateMessage(&msg);
|
| ::DispatchMessage(&msg);
|
| }
|
| +
|
| + if (GetTick() > start + kMaxTaskProcessingTimeMs)
|
| + break;
|
| }
|
| return msg.message != WM_QUIT;
|
| }
|
|
|