Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(78)

Unified Diff: webrtc/base/task_queue_win.cc

Issue 2733723002: Refactor Windows TaskQueue code to only need a single high res timer. (Closed)
Patch Set: use greater<> and not greater_or_equal Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « webrtc/base/task_queue_unittest.cc ('k') | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: webrtc/base/task_queue_win.cc
diff --git a/webrtc/base/task_queue_win.cc b/webrtc/base/task_queue_win.cc
index bbaf7b9f44b4383feff79fd08697fea4e08c7e6b..f7332d76ed8e99a1acc235127d0c92e2e314bb96 100644
--- a/webrtc/base/task_queue_win.cc
+++ b/webrtc/base/task_queue_win.cc
@@ -14,10 +14,12 @@
#include <string.h>
#include <algorithm>
+#include <queue>
-#include "webrtc/base/arraysize.h"
#include "webrtc/base/checks.h"
#include "webrtc/base/logging.h"
+#include "webrtc/base/safe_conversions.h"
+#include "webrtc/base/timeutils.h"
namespace rtc {
namespace {
@@ -67,112 +69,119 @@ ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) {
return kNormalPriority;
}
-#if defined(_WIN64)
-DWORD GetTick() {
+int64_t GetTick() {
static const UINT kPeriod = 1;
bool high_res = (timeBeginPeriod(kPeriod) == TIMERR_NOERROR);
- DWORD ret = timeGetTime();
+ int64_t ret = TimeMillis();
if (high_res)
timeEndPeriod(kPeriod);
return ret;
}
-#endif
-} // namespace
-class TaskQueue::MultimediaTimer {
+class DelayedTaskInfo {
public:
- // kMaxTimers defines the limit of how many MultimediaTimer instances should
- // be created.
- // Background: The maximum number of supported handles for Wait functions, is
- // MAXIMUM_WAIT_OBJECTS - 1 (63).
- // There are some ways to work around the limitation but as it turns out, the
- // limit of concurrently active multimedia timers per process, is much lower,
- // or 16. So there isn't much value in going to the lenghts required to
- // overcome the Wait limitations.
- // kMaxTimers is larger than 16 though since it is possible that 'complete' or
- // signaled timers that haven't been handled, are counted as part of
- // kMaxTimers and thus a multimedia timer can actually be queued even though
- // as far as we're concerned, there are more than 16 that are pending.
- static const int kMaxTimers = MAXIMUM_WAIT_OBJECTS - 1;
-
- // Controls how many MultimediaTimer instances a queue can hold before
- // attempting to garbage collect (GC) timers that aren't in use.
- static const int kInstanceThresholdGC = 8;
+ // Default ctor needed to support priority_queue::pop().
+ DelayedTaskInfo() {}
+ DelayedTaskInfo(uint32_t milliseconds, std::unique_ptr<QueuedTask> task)
+ : due_time_(GetTick() + milliseconds), task_(std::move(task)) {}
+ DelayedTaskInfo(DelayedTaskInfo&&) = default;
+
+ // Implement for priority_queue.
+ bool operator>(const DelayedTaskInfo& other) const {
+ return due_time_ > other.due_time_;
+ }
- MultimediaTimer() : event_(::CreateEvent(nullptr, false, false, nullptr)) {}
+ // Required by priority_queue::pop().
+ DelayedTaskInfo& operator=(DelayedTaskInfo&& other) = default;
- MultimediaTimer(MultimediaTimer&& timer)
- : event_(timer.event_),
- timer_id_(timer.timer_id_),
- task_(std::move(timer.task_)) {
- RTC_DCHECK(event_);
- timer.event_ = nullptr;
- timer.timer_id_ = 0;
+ // See below for why this method is const.
+ void Run() const {
+ RTC_DCHECK(due_time_);
+ task_->Run() ? task_.reset() : static_cast<void>(task_.release());
}
- ~MultimediaTimer() { Close(); }
-
- // Implementing this operator is required because of the way
- // some stl algorithms work, such as std::rotate().
- MultimediaTimer& operator=(MultimediaTimer&& timer) {
- if (this != &timer) {
- Close();
- event_ = timer.event_;
- timer.event_ = nullptr;
- task_ = std::move(timer.task_);
- timer_id_ = timer.timer_id_;
- timer.timer_id_ = 0;
- }
- return *this;
+ int64_t due_time() const { return due_time_; }
+
+ private:
+ int64_t due_time_ = 0; // Absolute timestamp in milliseconds.
+
+ // |task| needs to be mutable because std::priority_queue::top() returns
+ // a const reference and a key in an ordered queue must not be changed.
+ // There are two basic workarounds, one using const_cast, which would also
+ // make the key (|due_time|), non-const and the other is to make the non-key
+ // (|task|), mutable.
+ // Because of this, the |task| variable is made private and can only be
+ // mutated by calling the |Run()| method.
+ mutable std::unique_ptr<QueuedTask> task_;
+};
+
+class MultimediaTimer {
+ public:
+ MultimediaTimer() : event_(::CreateEvent(nullptr, false, false, nullptr)) {}
+
+ ~MultimediaTimer() {
+ Cancel();
+ ::CloseHandle(event_);
}
- bool StartOneShotTimer(std::unique_ptr<QueuedTask> task, UINT delay_ms) {
+ bool StartOneShotTimer(UINT delay_ms) {
RTC_DCHECK_EQ(0, timer_id_);
RTC_DCHECK(event_ != nullptr);
- RTC_DCHECK(!task_.get());
- RTC_DCHECK(task.get());
- task_ = std::move(task);
timer_id_ =
::timeSetEvent(delay_ms, 0, reinterpret_cast<LPTIMECALLBACK>(event_), 0,
TIME_ONESHOT | TIME_CALLBACK_EVENT_SET);
return timer_id_ != 0;
}
- std::unique_ptr<QueuedTask> Cancel() {
+ void Cancel() {
if (timer_id_) {
::timeKillEvent(timer_id_);
timer_id_ = 0;
}
- return std::move(task_);
- }
-
- void OnEventSignaled() {
- RTC_DCHECK_NE(0, timer_id_);
- timer_id_ = 0;
- task_->Run() ? task_.reset() : static_cast<void>(task_.release());
}
- HANDLE event() const { return event_; }
-
- bool is_active() const { return timer_id_ != 0; }
+ HANDLE* event_for_wait() { return &event_; }
private:
- void Close() {
- Cancel();
-
- if (event_) {
- ::CloseHandle(event_);
- event_ = nullptr;
- }
- }
-
HANDLE event_ = nullptr;
MMRESULT timer_id_ = 0;
- std::unique_ptr<QueuedTask> task_;
RTC_DISALLOW_COPY_AND_ASSIGN(MultimediaTimer);
};
+} // namespace
+
+class TaskQueue::ThreadState {
+ public:
+ ThreadState() {}
+ ~ThreadState() {}
+
+ void RunThreadMain();
+
+ private:
+ bool ProcessQueuedMessages();
+ void RunDueTasks();
+ void ScheduleNextTimer();
+ void CancelTimers();
+
+ // Since priority_queue<> by defult orders items in terms of
+ // largest->smallest, using std::less<>, and we want smallest->largest,
+ // we would like to use std::greater<> here. Alas it's only available in
+ // C++14 and later, so we roll our own compare template that that relies on
+ // operator<().
+ template <typename T>
+ struct greater {
+ bool operator()(const T& l, const T& r) { return l > r; }
+ };
+
+ MultimediaTimer timer_;
+ std::priority_queue<DelayedTaskInfo,
+ std::vector<DelayedTaskInfo>,
+ greater<DelayedTaskInfo>>
+ timer_tasks_;
+ UINT_PTR timer_id_ = 0;
+};
+
TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/)
: thread_(&TaskQueue::ThreadMain,
this,
@@ -220,18 +229,19 @@ void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
uint32_t milliseconds) {
- WPARAM wparam;
-#if defined(_WIN64)
- // GetTickCount() returns a fairly coarse tick count (resolution or about 8ms)
- // so this compensation isn't that accurate, but since we have unused 32 bits
- // on Win64, we might as well use them.
- wparam = (static_cast<WPARAM>(GetTick()) << 32) | milliseconds;
-#else
- wparam = milliseconds;
-#endif
- if (::PostThreadMessage(thread_.GetThreadRef(), WM_QUEUE_DELAYED_TASK, wparam,
- reinterpret_cast<LPARAM>(task.get()))) {
- task.release();
+ if (!milliseconds) {
+ PostTask(std::move(task));
+ return;
+ }
+
+ // TODO(tommi): Avoid this allocation. It is currently here since
+ // the timestamp stored in the task info object, is a 64bit timestamp
+ // and WPARAM is 32bits in 32bit builds. Otherwise, we could pass the
+ // task pointer and timestamp as LPARAM and WPARAM.
+ auto* task_info = new DelayedTaskInfo(milliseconds, std::move(task));
+ if (!::PostThreadMessage(thread_.GetThreadRef(), WM_QUEUE_DELAYED_TASK, 0,
+ reinterpret_cast<LPARAM>(task_info))) {
+ delete task_info;
}
}
@@ -260,65 +270,35 @@ void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
// static
void TaskQueue::ThreadMain(void* context) {
- HANDLE timer_handles[MultimediaTimer::kMaxTimers];
- // Active multimedia timers.
- std::vector<MultimediaTimer> mm_timers;
- // Tasks that have been queued by using SetTimer/WM_TIMER.
- DelayedTasks delayed_tasks;
+ ThreadState state;
+ state.RunThreadMain();
+}
+void TaskQueue::ThreadState::RunThreadMain() {
while (true) {
- RTC_DCHECK(mm_timers.size() <= arraysize(timer_handles));
- DWORD count = 0;
- for (const auto& t : mm_timers) {
- if (!t.is_active())
- break;
- timer_handles[count++] = t.event();
- }
// Make sure we do an alertable wait as that's required to allow APCs to run
// (e.g. required for InitializeQueueThread and stopping the thread in
// PlatformThread).
- DWORD result = ::MsgWaitForMultipleObjectsEx(count, timer_handles, INFINITE,
- QS_ALLEVENTS, MWMO_ALERTABLE);
+ DWORD result = ::MsgWaitForMultipleObjectsEx(
+ 1, timer_.event_for_wait(), INFINITE, QS_ALLEVENTS, MWMO_ALERTABLE);
RTC_CHECK_NE(WAIT_FAILED, result);
- // If we're not waiting for any timers, then count will be equal to
- // WAIT_OBJECT_0. If we're waiting for timers, then |count| represents
- // "One more than the number of timers", which means that there's a
- // message in the queue that needs to be handled.
- // If |result| is less than |count|, then its value will be the index of the
- // timer that has been signaled.
- if (result == (WAIT_OBJECT_0 + count)) {
- if (!ProcessQueuedMessages(&delayed_tasks, &mm_timers))
+ if (result == (WAIT_OBJECT_0 + 1)) {
+ // There are messages in the message queue that need to be handled.
+ if (!ProcessQueuedMessages())
break;
- } else if (result < (WAIT_OBJECT_0 + count)) {
- mm_timers[result].OnEventSignaled();
- RTC_DCHECK(!mm_timers[result].is_active());
- // Reuse timer events by moving inactive timers to the back of the vector.
- // When new delayed tasks are queued, they'll get reused.
- if (mm_timers.size() > 1) {
- auto it = mm_timers.begin() + result;
- std::rotate(it, it + 1, mm_timers.end());
- }
-
- // Collect some garbage.
- if (mm_timers.size() > MultimediaTimer::kInstanceThresholdGC) {
- const auto inactive = std::find_if(
- mm_timers.begin(), mm_timers.end(),
- [](const MultimediaTimer& t) { return !t.is_active(); });
- if (inactive != mm_timers.end()) {
- // Since inactive timers are always moved to the back, we can
- // safely delete all timers following the first inactive one.
- mm_timers.erase(inactive, mm_timers.end());
- }
- }
+ } else if (result == WAIT_OBJECT_0) {
+ // The multimedia timer was signaled.
+ timer_.Cancel();
+ RTC_DCHECK(!timer_tasks_.empty());
+ RunDueTasks();
+ ScheduleNextTimer();
} else {
RTC_DCHECK_EQ(WAIT_IO_COMPLETION, result);
}
}
}
-// static
-bool TaskQueue::ProcessQueuedMessages(DelayedTasks* delayed_tasks,
- std::vector<MultimediaTimer>* timers) {
+bool TaskQueue::ThreadState::ProcessQueuedMessages() {
MSG msg = {};
while (::PeekMessage(&msg, nullptr, 0, 0, PM_REMOVE) &&
msg.message != WM_QUIT) {
@@ -331,53 +311,24 @@ bool TaskQueue::ProcessQueuedMessages(DelayedTasks* delayed_tasks,
break;
}
case WM_QUEUE_DELAYED_TASK: {
- std::unique_ptr<QueuedTask> task(
- reinterpret_cast<QueuedTask*>(msg.lParam));
- uint32_t milliseconds = msg.wParam & 0xFFFFFFFF;
-#if defined(_WIN64)
- // Subtract the time it took to queue the timer.
- const DWORD now = GetTick();
- DWORD post_time = now - (msg.wParam >> 32);
- milliseconds =
- post_time > milliseconds ? 0 : milliseconds - post_time;
-#endif
- bool timer_queued = false;
- if (timers->size() < MultimediaTimer::kMaxTimers) {
- MultimediaTimer* timer = nullptr;
- auto available = std::find_if(
- timers->begin(), timers->end(),
- [](const MultimediaTimer& t) { return !t.is_active(); });
- if (available != timers->end()) {
- timer = &(*available);
- } else {
- timers->emplace_back();
- timer = &timers->back();
- }
-
- timer_queued =
- timer->StartOneShotTimer(std::move(task), milliseconds);
- if (!timer_queued) {
- // No more multimedia timers can be queued.
- // Detach the task and fall back on SetTimer.
- task = timer->Cancel();
- }
- }
-
- // When we fail to use multimedia timers, we fall back on the more
- // coarse SetTimer/WM_TIMER approach.
- if (!timer_queued) {
- UINT_PTR timer_id = ::SetTimer(nullptr, 0, milliseconds, nullptr);
- delayed_tasks->insert(std::make_pair(timer_id, task.release()));
+ std::unique_ptr<DelayedTaskInfo> info(
+ reinterpret_cast<DelayedTaskInfo*>(msg.lParam));
+ bool need_to_schedule_timers =
+ timer_tasks_.empty() ||
+ timer_tasks_.top().due_time() > info->due_time();
+ timer_tasks_.emplace(std::move(*info.get()));
+ if (need_to_schedule_timers) {
+ CancelTimers();
+ ScheduleNextTimer();
}
break;
}
case WM_TIMER: {
+ RTC_DCHECK_EQ(timer_id_, msg.wParam);
::KillTimer(nullptr, msg.wParam);
- auto found = delayed_tasks->find(msg.wParam);
- RTC_DCHECK(found != delayed_tasks->end());
- if (!found->second->Run())
- found->second.release();
- delayed_tasks->erase(found);
+ timer_id_ = 0;
+ RunDueTasks();
+ ScheduleNextTimer();
break;
}
default:
@@ -392,4 +343,36 @@ bool TaskQueue::ProcessQueuedMessages(DelayedTasks* delayed_tasks,
return msg.message != WM_QUIT;
}
+void TaskQueue::ThreadState::RunDueTasks() {
+ RTC_DCHECK(!timer_tasks_.empty());
+ auto now = GetTick();
+ do {
+ const auto& top = timer_tasks_.top();
+ if (top.due_time() > now)
+ break;
+ top.Run();
+ timer_tasks_.pop();
+ } while (!timer_tasks_.empty());
+}
+
+void TaskQueue::ThreadState::ScheduleNextTimer() {
+ RTC_DCHECK_EQ(timer_id_, 0);
+ if (timer_tasks_.empty())
+ return;
+
+ const auto& next_task = timer_tasks_.top();
+ int64_t delay_ms = std::max(0ll, next_task.due_time() - GetTick());
+ uint32_t milliseconds = rtc::dchecked_cast<uint32_t>(delay_ms);
+ if (!timer_.StartOneShotTimer(milliseconds))
+ timer_id_ = ::SetTimer(nullptr, 0, milliseconds, nullptr);
+}
+
+void TaskQueue::ThreadState::CancelTimers() {
+ timer_.Cancel();
+ if (timer_id_) {
+ ::KillTimer(nullptr, timer_id_);
+ timer_id_ = 0;
+ }
+}
+
} // namespace rtc
« no previous file with comments | « webrtc/base/task_queue_unittest.cc ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698