Index: webrtc/base/task_queue_win.cc |
diff --git a/webrtc/base/task_queue_win.cc b/webrtc/base/task_queue_win.cc |
index f7332d76ed8e99a1acc235127d0c92e2e314bb96..a149dd88a6b05e0ee402ce5c78713c4042db2c6c 100644 |
--- a/webrtc/base/task_queue_win.cc |
+++ b/webrtc/base/task_queue_win.cc |
@@ -16,6 +16,7 @@ |
#include <algorithm> |
#include <queue> |
+#include "webrtc/base/arraysize.h" |
#include "webrtc/base/checks.h" |
#include "webrtc/base/logging.h" |
#include "webrtc/base/safe_conversions.h" |
@@ -117,7 +118,8 @@ class DelayedTaskInfo { |
class MultimediaTimer { |
public: |
- MultimediaTimer() : event_(::CreateEvent(nullptr, false, false, nullptr)) {} |
+ // Note: We create an event that requires manual reset. |
+ MultimediaTimer() : event_(::CreateEvent(nullptr, true, false, nullptr)) {} |
~MultimediaTimer() { |
Cancel(); |
@@ -134,6 +136,7 @@ class MultimediaTimer { |
} |
void Cancel() { |
+ ::ResetEvent(event_); |
if (timer_id_) { |
::timeKillEvent(timer_id_); |
timer_id_ = 0; |
@@ -153,7 +156,7 @@ class MultimediaTimer { |
class TaskQueue::ThreadState { |
public: |
- ThreadState() {} |
+ explicit ThreadState(HANDLE in_queue) : in_queue_(in_queue) {} |
~ThreadState() {} |
void RunThreadMain(); |
@@ -180,14 +183,17 @@ class TaskQueue::ThreadState { |
greater<DelayedTaskInfo>> |
timer_tasks_; |
UINT_PTR timer_id_ = 0; |
+ HANDLE in_queue_; |
}; |
TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/) |
: thread_(&TaskQueue::ThreadMain, |
this, |
queue_name, |
- TaskQueuePriorityToThreadPriority(priority)) { |
+ TaskQueuePriorityToThreadPriority(priority)), |
+ in_queue_(::CreateEvent(nullptr, true, false, nullptr)) { |
RTC_DCHECK(queue_name); |
+ RTC_DCHECK(in_queue_); |
thread_.Start(); |
Event event(false, false); |
ThreadStartupData startup = {&event, this}; |
@@ -203,6 +209,7 @@ TaskQueue::~TaskQueue() { |
Sleep(1); |
} |
thread_.Stop(); |
+ ::CloseHandle(in_queue_); |
} |
// static |
@@ -221,10 +228,9 @@ bool TaskQueue::IsCurrent() const { |
} |
void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) { |
- if (::PostThreadMessage(thread_.GetThreadRef(), WM_RUN_TASK, 0, |
- reinterpret_cast<LPARAM>(task.get()))) { |
- task.release(); |
- } |
+ rtc::CritScope lock(&pending_lock_); |
+ pending_.push(std::move(task)); |
+ ::SetEvent(in_queue_); |
} |
void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task, |
@@ -268,42 +274,70 @@ void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
return PostTaskAndReply(std::move(task), std::move(reply), Current()); |
} |
+void TaskQueue::RunPendingTasks() { |
+ while (true) { |
+ std::unique_ptr<QueuedTask> task; |
+ { |
+ rtc::CritScope lock(&pending_lock_); |
+ if (pending_.empty()) |
+ break; |
+ task = std::move(pending_.front()); |
+ pending_.pop(); |
+ } |
+ |
+ if (!task->Run()) |
+ task.release(); |
+ } |
+} |
+ |
// static |
void TaskQueue::ThreadMain(void* context) { |
- ThreadState state; |
+ ThreadState state(static_cast<TaskQueue*>(context)->in_queue_); |
state.RunThreadMain(); |
} |
void TaskQueue::ThreadState::RunThreadMain() { |
+ HANDLE handles[2] = { *timer_.event_for_wait(), in_queue_ }; |
while (true) { |
// Make sure we do an alertable wait as that's required to allow APCs to run |
// (e.g. required for InitializeQueueThread and stopping the thread in |
// PlatformThread). |
DWORD result = ::MsgWaitForMultipleObjectsEx( |
- 1, timer_.event_for_wait(), INFINITE, QS_ALLEVENTS, MWMO_ALERTABLE); |
+ arraysize(handles), handles, INFINITE, QS_ALLEVENTS, MWMO_ALERTABLE); |
RTC_CHECK_NE(WAIT_FAILED, result); |
- if (result == (WAIT_OBJECT_0 + 1)) { |
+ if (result == (WAIT_OBJECT_0 + 2)) { |
// There are messages in the message queue that need to be handled. |
if (!ProcessQueuedMessages()) |
break; |
- } else if (result == WAIT_OBJECT_0) { |
+ } |
+ |
+ if (result == WAIT_OBJECT_0 || (!timer_tasks_.empty() && |
+ ::WaitForSingleObject(*timer_.event_for_wait(), 0) == WAIT_OBJECT_0)) { |
// The multimedia timer was signaled. |
timer_.Cancel(); |
- RTC_DCHECK(!timer_tasks_.empty()); |
RunDueTasks(); |
ScheduleNextTimer(); |
- } else { |
- RTC_DCHECK_EQ(WAIT_IO_COMPLETION, result); |
+ } |
+ |
+ if (result == (WAIT_OBJECT_0 + 1)) { |
+ ::ResetEvent(in_queue_); |
+ TaskQueue::Current()->RunPendingTasks(); |
} |
} |
} |
bool TaskQueue::ThreadState::ProcessQueuedMessages() { |
MSG msg = {}; |
+ // To protect against overly busy message queues, we limit the time |
+ // we process tasks to a few milliseconds. If we don't do that, there's |
+ // a chance that timer tasks won't ever run. |
+ static const int kMaxTaskProcessingTimeMs = 500; |
+ auto start = GetTick(); |
while (::PeekMessage(&msg, nullptr, 0, 0, PM_REMOVE) && |
msg.message != WM_QUIT) { |
if (!msg.hwnd) { |
switch (msg.message) { |
+ // TODO(tommi): Stop using this way of queueing tasks. |
case WM_RUN_TASK: { |
QueuedTask* task = reinterpret_cast<QueuedTask*>(msg.lParam); |
if (task->Run()) |
@@ -339,6 +373,9 @@ bool TaskQueue::ThreadState::ProcessQueuedMessages() { |
::TranslateMessage(&msg); |
::DispatchMessage(&msg); |
} |
+ |
+ if (GetTick() > start + kMaxTaskProcessingTimeMs) |
+ break; |
} |
return msg.message != WM_QUIT; |
} |