| OLD | NEW |
| 1 /* | 1 /* |
| 2 * Copyright 2016 The WebRTC Project Authors. All rights reserved. | 2 * Copyright 2016 The WebRTC Project Authors. All rights reserved. |
| 3 * | 3 * |
| 4 * Use of this source code is governed by a BSD-style license | 4 * Use of this source code is governed by a BSD-style license |
| 5 * that can be found in the LICENSE file in the root of the source | 5 * that can be found in the LICENSE file in the root of the source |
| 6 * tree. An additional intellectual property rights grant can be found | 6 * tree. An additional intellectual property rights grant can be found |
| 7 * in the file PATENTS. All contributing project authors may | 7 * in the file PATENTS. All contributing project authors may |
| 8 * be found in the AUTHORS file in the root of the source tree. | 8 * be found in the AUTHORS file in the root of the source tree. |
| 9 */ | 9 */ |
| 10 | 10 |
| 11 #include "webrtc/base/task_queue.h" | 11 #include "webrtc/base/task_queue.h" |
| 12 | 12 |
| 13 #include <mmsystem.h> | 13 #include <mmsystem.h> |
| 14 #include <string.h> | 14 #include <string.h> |
| 15 | 15 |
| 16 #include <algorithm> | 16 #include <algorithm> |
| 17 #include <queue> | 17 #include <queue> |
| 18 | 18 |
| 19 #include "webrtc/base/arraysize.h" |
| 19 #include "webrtc/base/checks.h" | 20 #include "webrtc/base/checks.h" |
| 20 #include "webrtc/base/logging.h" | 21 #include "webrtc/base/logging.h" |
| 21 #include "webrtc/base/safe_conversions.h" | 22 #include "webrtc/base/safe_conversions.h" |
| 22 #include "webrtc/base/timeutils.h" | 23 #include "webrtc/base/timeutils.h" |
| 23 | 24 |
| 24 namespace rtc { | 25 namespace rtc { |
| 25 namespace { | 26 namespace { |
| 26 #define WM_RUN_TASK WM_USER + 1 | 27 #define WM_RUN_TASK WM_USER + 1 |
| 27 #define WM_QUEUE_DELAYED_TASK WM_USER + 2 | 28 #define WM_QUEUE_DELAYED_TASK WM_USER + 2 |
| 28 | 29 |
| (...skipping 81 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 110 // There are two basic workarounds, one using const_cast, which would also | 111 // There are two basic workarounds, one using const_cast, which would also |
| 111 // make the key (|due_time|), non-const and the other is to make the non-key | 112 // make the key (|due_time|), non-const and the other is to make the non-key |
| 112 // (|task|), mutable. | 113 // (|task|), mutable. |
| 113 // Because of this, the |task| variable is made private and can only be | 114 // Because of this, the |task| variable is made private and can only be |
| 114 // mutated by calling the |Run()| method. | 115 // mutated by calling the |Run()| method. |
| 115 mutable std::unique_ptr<QueuedTask> task_; | 116 mutable std::unique_ptr<QueuedTask> task_; |
| 116 }; | 117 }; |
| 117 | 118 |
| 118 class MultimediaTimer { | 119 class MultimediaTimer { |
| 119 public: | 120 public: |
| 120 MultimediaTimer() : event_(::CreateEvent(nullptr, false, false, nullptr)) {} | 121 // Note: We create an event that requires manual reset. |
| 122 MultimediaTimer() : event_(::CreateEvent(nullptr, true, false, nullptr)) {} |
| 121 | 123 |
| 122 ~MultimediaTimer() { | 124 ~MultimediaTimer() { |
| 123 Cancel(); | 125 Cancel(); |
| 124 ::CloseHandle(event_); | 126 ::CloseHandle(event_); |
| 125 } | 127 } |
| 126 | 128 |
| 127 bool StartOneShotTimer(UINT delay_ms) { | 129 bool StartOneShotTimer(UINT delay_ms) { |
| 128 RTC_DCHECK_EQ(0, timer_id_); | 130 RTC_DCHECK_EQ(0, timer_id_); |
| 129 RTC_DCHECK(event_ != nullptr); | 131 RTC_DCHECK(event_ != nullptr); |
| 130 timer_id_ = | 132 timer_id_ = |
| 131 ::timeSetEvent(delay_ms, 0, reinterpret_cast<LPTIMECALLBACK>(event_), 0, | 133 ::timeSetEvent(delay_ms, 0, reinterpret_cast<LPTIMECALLBACK>(event_), 0, |
| 132 TIME_ONESHOT | TIME_CALLBACK_EVENT_SET); | 134 TIME_ONESHOT | TIME_CALLBACK_EVENT_SET); |
| 133 return timer_id_ != 0; | 135 return timer_id_ != 0; |
| 134 } | 136 } |
| 135 | 137 |
| 136 void Cancel() { | 138 void Cancel() { |
| 139 ::ResetEvent(event_); |
| 137 if (timer_id_) { | 140 if (timer_id_) { |
| 138 ::timeKillEvent(timer_id_); | 141 ::timeKillEvent(timer_id_); |
| 139 timer_id_ = 0; | 142 timer_id_ = 0; |
| 140 } | 143 } |
| 141 } | 144 } |
| 142 | 145 |
| 143 HANDLE* event_for_wait() { return &event_; } | 146 HANDLE* event_for_wait() { return &event_; } |
| 144 | 147 |
| 145 private: | 148 private: |
| 146 HANDLE event_ = nullptr; | 149 HANDLE event_ = nullptr; |
| 147 MMRESULT timer_id_ = 0; | 150 MMRESULT timer_id_ = 0; |
| 148 | 151 |
| 149 RTC_DISALLOW_COPY_AND_ASSIGN(MultimediaTimer); | 152 RTC_DISALLOW_COPY_AND_ASSIGN(MultimediaTimer); |
| 150 }; | 153 }; |
| 151 | 154 |
| 152 } // namespace | 155 } // namespace |
| 153 | 156 |
| 154 class TaskQueue::ThreadState { | 157 class TaskQueue::ThreadState { |
| 155 public: | 158 public: |
| 156 ThreadState() {} | 159 explicit ThreadState(HANDLE in_queue) : in_queue_(in_queue) {} |
| 157 ~ThreadState() {} | 160 ~ThreadState() {} |
| 158 | 161 |
| 159 void RunThreadMain(); | 162 void RunThreadMain(); |
| 160 | 163 |
| 161 private: | 164 private: |
| 162 bool ProcessQueuedMessages(); | 165 bool ProcessQueuedMessages(); |
| 163 void RunDueTasks(); | 166 void RunDueTasks(); |
| 164 void ScheduleNextTimer(); | 167 void ScheduleNextTimer(); |
| 165 void CancelTimers(); | 168 void CancelTimers(); |
| 166 | 169 |
| 167 // Since priority_queue<> by defult orders items in terms of | 170 // Since priority_queue<> by defult orders items in terms of |
| 168 // largest->smallest, using std::less<>, and we want smallest->largest, | 171 // largest->smallest, using std::less<>, and we want smallest->largest, |
| 169 // we would like to use std::greater<> here. Alas it's only available in | 172 // we would like to use std::greater<> here. Alas it's only available in |
| 170 // C++14 and later, so we roll our own compare template that that relies on | 173 // C++14 and later, so we roll our own compare template that that relies on |
| 171 // operator<(). | 174 // operator<(). |
| 172 template <typename T> | 175 template <typename T> |
| 173 struct greater { | 176 struct greater { |
| 174 bool operator()(const T& l, const T& r) { return l > r; } | 177 bool operator()(const T& l, const T& r) { return l > r; } |
| 175 }; | 178 }; |
| 176 | 179 |
| 177 MultimediaTimer timer_; | 180 MultimediaTimer timer_; |
| 178 std::priority_queue<DelayedTaskInfo, | 181 std::priority_queue<DelayedTaskInfo, |
| 179 std::vector<DelayedTaskInfo>, | 182 std::vector<DelayedTaskInfo>, |
| 180 greater<DelayedTaskInfo>> | 183 greater<DelayedTaskInfo>> |
| 181 timer_tasks_; | 184 timer_tasks_; |
| 182 UINT_PTR timer_id_ = 0; | 185 UINT_PTR timer_id_ = 0; |
| 186 HANDLE in_queue_; |
| 183 }; | 187 }; |
| 184 | 188 |
| 185 TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/) | 189 TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/) |
| 186 : thread_(&TaskQueue::ThreadMain, | 190 : thread_(&TaskQueue::ThreadMain, |
| 187 this, | 191 this, |
| 188 queue_name, | 192 queue_name, |
| 189 TaskQueuePriorityToThreadPriority(priority)) { | 193 TaskQueuePriorityToThreadPriority(priority)), |
| 194 in_queue_(::CreateEvent(nullptr, true, false, nullptr)) { |
| 190 RTC_DCHECK(queue_name); | 195 RTC_DCHECK(queue_name); |
| 196 RTC_DCHECK(in_queue_); |
| 191 thread_.Start(); | 197 thread_.Start(); |
| 192 Event event(false, false); | 198 Event event(false, false); |
| 193 ThreadStartupData startup = {&event, this}; | 199 ThreadStartupData startup = {&event, this}; |
| 194 RTC_CHECK(thread_.QueueAPC(&InitializeQueueThread, | 200 RTC_CHECK(thread_.QueueAPC(&InitializeQueueThread, |
| 195 reinterpret_cast<ULONG_PTR>(&startup))); | 201 reinterpret_cast<ULONG_PTR>(&startup))); |
| 196 event.Wait(Event::kForever); | 202 event.Wait(Event::kForever); |
| 197 } | 203 } |
| 198 | 204 |
| 199 TaskQueue::~TaskQueue() { | 205 TaskQueue::~TaskQueue() { |
| 200 RTC_DCHECK(!IsCurrent()); | 206 RTC_DCHECK(!IsCurrent()); |
| 201 while (!::PostThreadMessage(thread_.GetThreadRef(), WM_QUIT, 0, 0)) { | 207 while (!::PostThreadMessage(thread_.GetThreadRef(), WM_QUIT, 0, 0)) { |
| 202 RTC_CHECK_EQ(ERROR_NOT_ENOUGH_QUOTA, ::GetLastError()); | 208 RTC_CHECK_EQ(ERROR_NOT_ENOUGH_QUOTA, ::GetLastError()); |
| 203 Sleep(1); | 209 Sleep(1); |
| 204 } | 210 } |
| 205 thread_.Stop(); | 211 thread_.Stop(); |
| 212 ::CloseHandle(in_queue_); |
| 206 } | 213 } |
| 207 | 214 |
| 208 // static | 215 // static |
| 209 TaskQueue* TaskQueue::Current() { | 216 TaskQueue* TaskQueue::Current() { |
| 210 return static_cast<TaskQueue*>(::TlsGetValue(GetQueuePtrTls())); | 217 return static_cast<TaskQueue*>(::TlsGetValue(GetQueuePtrTls())); |
| 211 } | 218 } |
| 212 | 219 |
| 213 // static | 220 // static |
| 214 bool TaskQueue::IsCurrent(const char* queue_name) { | 221 bool TaskQueue::IsCurrent(const char* queue_name) { |
| 215 TaskQueue* current = Current(); | 222 TaskQueue* current = Current(); |
| 216 return current && current->thread_.name().compare(queue_name) == 0; | 223 return current && current->thread_.name().compare(queue_name) == 0; |
| 217 } | 224 } |
| 218 | 225 |
| 219 bool TaskQueue::IsCurrent() const { | 226 bool TaskQueue::IsCurrent() const { |
| 220 return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef()); | 227 return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef()); |
| 221 } | 228 } |
| 222 | 229 |
| 223 void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) { | 230 void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) { |
| 224 if (::PostThreadMessage(thread_.GetThreadRef(), WM_RUN_TASK, 0, | 231 rtc::CritScope lock(&pending_lock_); |
| 225 reinterpret_cast<LPARAM>(task.get()))) { | 232 pending_.push(std::move(task)); |
| 226 task.release(); | 233 ::SetEvent(in_queue_); |
| 227 } | |
| 228 } | 234 } |
| 229 | 235 |
| 230 void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task, | 236 void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task, |
| 231 uint32_t milliseconds) { | 237 uint32_t milliseconds) { |
| 232 if (!milliseconds) { | 238 if (!milliseconds) { |
| 233 PostTask(std::move(task)); | 239 PostTask(std::move(task)); |
| 234 return; | 240 return; |
| 235 } | 241 } |
| 236 | 242 |
| 237 // TODO(tommi): Avoid this allocation. It is currently here since | 243 // TODO(tommi): Avoid this allocation. It is currently here since |
| (...skipping 23 matching lines...) Expand all Loading... |
| 261 delete reply_task_ptr; | 267 delete reply_task_ptr; |
| 262 } | 268 } |
| 263 }); | 269 }); |
| 264 } | 270 } |
| 265 | 271 |
| 266 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, | 272 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
| 267 std::unique_ptr<QueuedTask> reply) { | 273 std::unique_ptr<QueuedTask> reply) { |
| 268 return PostTaskAndReply(std::move(task), std::move(reply), Current()); | 274 return PostTaskAndReply(std::move(task), std::move(reply), Current()); |
| 269 } | 275 } |
| 270 | 276 |
| 277 void TaskQueue::RunPendingTasks() { |
| 278 while (true) { |
| 279 std::unique_ptr<QueuedTask> task; |
| 280 { |
| 281 rtc::CritScope lock(&pending_lock_); |
| 282 if (pending_.empty()) |
| 283 break; |
| 284 task = std::move(pending_.front()); |
| 285 pending_.pop(); |
| 286 } |
| 287 |
| 288 if (!task->Run()) |
| 289 task.release(); |
| 290 } |
| 291 } |
| 292 |
| 271 // static | 293 // static |
| 272 void TaskQueue::ThreadMain(void* context) { | 294 void TaskQueue::ThreadMain(void* context) { |
| 273 ThreadState state; | 295 ThreadState state(static_cast<TaskQueue*>(context)->in_queue_); |
| 274 state.RunThreadMain(); | 296 state.RunThreadMain(); |
| 275 } | 297 } |
| 276 | 298 |
| 277 void TaskQueue::ThreadState::RunThreadMain() { | 299 void TaskQueue::ThreadState::RunThreadMain() { |
| 300 HANDLE handles[2] = { *timer_.event_for_wait(), in_queue_ }; |
| 278 while (true) { | 301 while (true) { |
| 279 // Make sure we do an alertable wait as that's required to allow APCs to run | 302 // Make sure we do an alertable wait as that's required to allow APCs to run |
| 280 // (e.g. required for InitializeQueueThread and stopping the thread in | 303 // (e.g. required for InitializeQueueThread and stopping the thread in |
| 281 // PlatformThread). | 304 // PlatformThread). |
| 282 DWORD result = ::MsgWaitForMultipleObjectsEx( | 305 DWORD result = ::MsgWaitForMultipleObjectsEx( |
| 283 1, timer_.event_for_wait(), INFINITE, QS_ALLEVENTS, MWMO_ALERTABLE); | 306 arraysize(handles), handles, INFINITE, QS_ALLEVENTS, MWMO_ALERTABLE); |
| 284 RTC_CHECK_NE(WAIT_FAILED, result); | 307 RTC_CHECK_NE(WAIT_FAILED, result); |
| 285 if (result == (WAIT_OBJECT_0 + 1)) { | 308 if (result == (WAIT_OBJECT_0 + 2)) { |
| 286 // There are messages in the message queue that need to be handled. | 309 // There are messages in the message queue that need to be handled. |
| 287 if (!ProcessQueuedMessages()) | 310 if (!ProcessQueuedMessages()) |
| 288 break; | 311 break; |
| 289 } else if (result == WAIT_OBJECT_0) { | 312 } |
| 313 |
| 314 if (result == WAIT_OBJECT_0 || (!timer_tasks_.empty() && |
| 315 ::WaitForSingleObject(*timer_.event_for_wait(), 0) == WAIT_OBJECT_0)) { |
| 290 // The multimedia timer was signaled. | 316 // The multimedia timer was signaled. |
| 291 timer_.Cancel(); | 317 timer_.Cancel(); |
| 292 RTC_DCHECK(!timer_tasks_.empty()); | |
| 293 RunDueTasks(); | 318 RunDueTasks(); |
| 294 ScheduleNextTimer(); | 319 ScheduleNextTimer(); |
| 295 } else { | 320 } |
| 296 RTC_DCHECK_EQ(WAIT_IO_COMPLETION, result); | 321 |
| 322 if (result == (WAIT_OBJECT_0 + 1)) { |
| 323 ::ResetEvent(in_queue_); |
| 324 TaskQueue::Current()->RunPendingTasks(); |
| 297 } | 325 } |
| 298 } | 326 } |
| 299 } | 327 } |
| 300 | 328 |
| 301 bool TaskQueue::ThreadState::ProcessQueuedMessages() { | 329 bool TaskQueue::ThreadState::ProcessQueuedMessages() { |
| 302 MSG msg = {}; | 330 MSG msg = {}; |
| 331 // To protect against overly busy message queues, we limit the time |
| 332 // we process tasks to a few milliseconds. If we don't do that, there's |
| 333 // a chance that timer tasks won't ever run. |
| 334 static const int kMaxTaskProcessingTimeMs = 500; |
| 335 auto start = GetTick(); |
| 303 while (::PeekMessage(&msg, nullptr, 0, 0, PM_REMOVE) && | 336 while (::PeekMessage(&msg, nullptr, 0, 0, PM_REMOVE) && |
| 304 msg.message != WM_QUIT) { | 337 msg.message != WM_QUIT) { |
| 305 if (!msg.hwnd) { | 338 if (!msg.hwnd) { |
| 306 switch (msg.message) { | 339 switch (msg.message) { |
| 340 // TODO(tommi): Stop using this way of queueing tasks. |
| 307 case WM_RUN_TASK: { | 341 case WM_RUN_TASK: { |
| 308 QueuedTask* task = reinterpret_cast<QueuedTask*>(msg.lParam); | 342 QueuedTask* task = reinterpret_cast<QueuedTask*>(msg.lParam); |
| 309 if (task->Run()) | 343 if (task->Run()) |
| 310 delete task; | 344 delete task; |
| 311 break; | 345 break; |
| 312 } | 346 } |
| 313 case WM_QUEUE_DELAYED_TASK: { | 347 case WM_QUEUE_DELAYED_TASK: { |
| 314 std::unique_ptr<DelayedTaskInfo> info( | 348 std::unique_ptr<DelayedTaskInfo> info( |
| 315 reinterpret_cast<DelayedTaskInfo*>(msg.lParam)); | 349 reinterpret_cast<DelayedTaskInfo*>(msg.lParam)); |
| 316 bool need_to_schedule_timers = | 350 bool need_to_schedule_timers = |
| (...skipping 15 matching lines...) Expand all Loading... |
| 332 break; | 366 break; |
| 333 } | 367 } |
| 334 default: | 368 default: |
| 335 RTC_NOTREACHED(); | 369 RTC_NOTREACHED(); |
| 336 break; | 370 break; |
| 337 } | 371 } |
| 338 } else { | 372 } else { |
| 339 ::TranslateMessage(&msg); | 373 ::TranslateMessage(&msg); |
| 340 ::DispatchMessage(&msg); | 374 ::DispatchMessage(&msg); |
| 341 } | 375 } |
| 376 |
| 377 if (GetTick() > start + kMaxTaskProcessingTimeMs) |
| 378 break; |
| 342 } | 379 } |
| 343 return msg.message != WM_QUIT; | 380 return msg.message != WM_QUIT; |
| 344 } | 381 } |
| 345 | 382 |
| 346 void TaskQueue::ThreadState::RunDueTasks() { | 383 void TaskQueue::ThreadState::RunDueTasks() { |
| 347 RTC_DCHECK(!timer_tasks_.empty()); | 384 RTC_DCHECK(!timer_tasks_.empty()); |
| 348 auto now = GetTick(); | 385 auto now = GetTick(); |
| 349 do { | 386 do { |
| 350 const auto& top = timer_tasks_.top(); | 387 const auto& top = timer_tasks_.top(); |
| 351 if (top.due_time() > now) | 388 if (top.due_time() > now) |
| (...skipping 17 matching lines...) Expand all Loading... |
| 369 | 406 |
| 370 void TaskQueue::ThreadState::CancelTimers() { | 407 void TaskQueue::ThreadState::CancelTimers() { |
| 371 timer_.Cancel(); | 408 timer_.Cancel(); |
| 372 if (timer_id_) { | 409 if (timer_id_) { |
| 373 ::KillTimer(nullptr, timer_id_); | 410 ::KillTimer(nullptr, timer_id_); |
| 374 timer_id_ = 0; | 411 timer_id_ = 0; |
| 375 } | 412 } |
| 376 } | 413 } |
| 377 | 414 |
| 378 } // namespace rtc | 415 } // namespace rtc |
| OLD | NEW |