Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(88)

Unified Diff: webrtc/base/task_queue_win.cc

Issue 2750853002: TaskQueue[Win] DOS handling (Closed)
Patch Set: Cleanup Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « webrtc/base/task_queue.h ('k') | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: webrtc/base/task_queue_win.cc
diff --git a/webrtc/base/task_queue_win.cc b/webrtc/base/task_queue_win.cc
index f7332d76ed8e99a1acc235127d0c92e2e314bb96..a149dd88a6b05e0ee402ce5c78713c4042db2c6c 100644
--- a/webrtc/base/task_queue_win.cc
+++ b/webrtc/base/task_queue_win.cc
@@ -16,6 +16,7 @@
#include <algorithm>
#include <queue>
+#include "webrtc/base/arraysize.h"
#include "webrtc/base/checks.h"
#include "webrtc/base/logging.h"
#include "webrtc/base/safe_conversions.h"
@@ -117,7 +118,8 @@ class DelayedTaskInfo {
class MultimediaTimer {
public:
- MultimediaTimer() : event_(::CreateEvent(nullptr, false, false, nullptr)) {}
+ // Note: We create an event that requires manual reset.
+ MultimediaTimer() : event_(::CreateEvent(nullptr, true, false, nullptr)) {}
~MultimediaTimer() {
Cancel();
@@ -134,6 +136,7 @@ class MultimediaTimer {
}
void Cancel() {
+ ::ResetEvent(event_);
if (timer_id_) {
::timeKillEvent(timer_id_);
timer_id_ = 0;
@@ -153,7 +156,7 @@ class MultimediaTimer {
class TaskQueue::ThreadState {
public:
- ThreadState() {}
+ explicit ThreadState(HANDLE in_queue) : in_queue_(in_queue) {}
~ThreadState() {}
void RunThreadMain();
@@ -180,14 +183,17 @@ class TaskQueue::ThreadState {
greater<DelayedTaskInfo>>
timer_tasks_;
UINT_PTR timer_id_ = 0;
+ HANDLE in_queue_;
};
TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/)
: thread_(&TaskQueue::ThreadMain,
this,
queue_name,
- TaskQueuePriorityToThreadPriority(priority)) {
+ TaskQueuePriorityToThreadPriority(priority)),
+ in_queue_(::CreateEvent(nullptr, true, false, nullptr)) {
RTC_DCHECK(queue_name);
+ RTC_DCHECK(in_queue_);
thread_.Start();
Event event(false, false);
ThreadStartupData startup = {&event, this};
@@ -203,6 +209,7 @@ TaskQueue::~TaskQueue() {
Sleep(1);
}
thread_.Stop();
+ ::CloseHandle(in_queue_);
}
// static
@@ -221,10 +228,9 @@ bool TaskQueue::IsCurrent() const {
}
void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
- if (::PostThreadMessage(thread_.GetThreadRef(), WM_RUN_TASK, 0,
- reinterpret_cast<LPARAM>(task.get()))) {
- task.release();
- }
+ rtc::CritScope lock(&pending_lock_);
+ pending_.push(std::move(task));
+ ::SetEvent(in_queue_);
}
void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
@@ -268,42 +274,70 @@ void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
return PostTaskAndReply(std::move(task), std::move(reply), Current());
}
+void TaskQueue::RunPendingTasks() {
+ while (true) {
+ std::unique_ptr<QueuedTask> task;
+ {
+ rtc::CritScope lock(&pending_lock_);
+ if (pending_.empty())
+ break;
+ task = std::move(pending_.front());
+ pending_.pop();
+ }
+
+ if (!task->Run())
+ task.release();
+ }
+}
+
// static
void TaskQueue::ThreadMain(void* context) {
- ThreadState state;
+ ThreadState state(static_cast<TaskQueue*>(context)->in_queue_);
state.RunThreadMain();
}
void TaskQueue::ThreadState::RunThreadMain() {
+ HANDLE handles[2] = { *timer_.event_for_wait(), in_queue_ };
while (true) {
// Make sure we do an alertable wait as that's required to allow APCs to run
// (e.g. required for InitializeQueueThread and stopping the thread in
// PlatformThread).
DWORD result = ::MsgWaitForMultipleObjectsEx(
- 1, timer_.event_for_wait(), INFINITE, QS_ALLEVENTS, MWMO_ALERTABLE);
+ arraysize(handles), handles, INFINITE, QS_ALLEVENTS, MWMO_ALERTABLE);
RTC_CHECK_NE(WAIT_FAILED, result);
- if (result == (WAIT_OBJECT_0 + 1)) {
+ if (result == (WAIT_OBJECT_0 + 2)) {
// There are messages in the message queue that need to be handled.
if (!ProcessQueuedMessages())
break;
- } else if (result == WAIT_OBJECT_0) {
+ }
+
+ if (result == WAIT_OBJECT_0 || (!timer_tasks_.empty() &&
+ ::WaitForSingleObject(*timer_.event_for_wait(), 0) == WAIT_OBJECT_0)) {
// The multimedia timer was signaled.
timer_.Cancel();
- RTC_DCHECK(!timer_tasks_.empty());
RunDueTasks();
ScheduleNextTimer();
- } else {
- RTC_DCHECK_EQ(WAIT_IO_COMPLETION, result);
+ }
+
+ if (result == (WAIT_OBJECT_0 + 1)) {
+ ::ResetEvent(in_queue_);
+ TaskQueue::Current()->RunPendingTasks();
}
}
}
bool TaskQueue::ThreadState::ProcessQueuedMessages() {
MSG msg = {};
+ // To protect against overly busy message queues, we limit the time
+ // we process tasks to a few milliseconds. If we don't do that, there's
+ // a chance that timer tasks won't ever run.
+ static const int kMaxTaskProcessingTimeMs = 500;
+ auto start = GetTick();
while (::PeekMessage(&msg, nullptr, 0, 0, PM_REMOVE) &&
msg.message != WM_QUIT) {
if (!msg.hwnd) {
switch (msg.message) {
+ // TODO(tommi): Stop using this way of queueing tasks.
case WM_RUN_TASK: {
QueuedTask* task = reinterpret_cast<QueuedTask*>(msg.lParam);
if (task->Run())
@@ -339,6 +373,9 @@ bool TaskQueue::ThreadState::ProcessQueuedMessages() {
::TranslateMessage(&msg);
::DispatchMessage(&msg);
}
+
+ if (GetTick() > start + kMaxTaskProcessingTimeMs)
+ break;
}
return msg.message != WM_QUIT;
}
« no previous file with comments | « webrtc/base/task_queue.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698