Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(84)

Side by Side Diff: webrtc/base/task_queue_win.cc

Issue 1984503002: Reland of New task queueing primitive for async tasks: TaskQueue. (Closed) Base URL: https://chromium.googlesource.com/external/webrtc.git@master
Patch Set: Optional initialization for build_for Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 /*
2 * Copyright 2016 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "webrtc/base/task_queue.h"
12
13 #include <string.h>
14 #include <unordered_map>
15
16 #include "webrtc/base/checks.h"
17 #include "webrtc/base/logging.h"
18
19 namespace rtc {
20 namespace {
21 #define WM_RUN_TASK WM_USER + 1
22 #define WM_QUEUE_DELAYED_TASK WM_USER + 2
23
24 DWORD g_queue_ptr_tls = 0;
25
26 BOOL CALLBACK InitializeTls(PINIT_ONCE init_once, void* param, void** context) {
27 g_queue_ptr_tls = TlsAlloc();
28 return TRUE;
29 }
30
31 DWORD GetQueuePtrTls() {
32 static INIT_ONCE init_once = INIT_ONCE_STATIC_INIT;
33 InitOnceExecuteOnce(&init_once, InitializeTls, nullptr, nullptr);
34 return g_queue_ptr_tls;
35 }
36
37 struct ThreadStartupData {
38 Event* started;
39 void* thread_context;
40 };
41
42 void CALLBACK InitializeQueueThread(ULONG_PTR param) {
43 MSG msg;
44 PeekMessage(&msg, NULL, WM_USER, WM_USER, PM_NOREMOVE);
45 ThreadStartupData* data = reinterpret_cast<ThreadStartupData*>(param);
46 TlsSetValue(GetQueuePtrTls(), data->thread_context);
47 data->started->Set();
48 }
49 } // namespace
50
51 TaskQueue::TaskQueue(const char* queue_name)
52 : thread_(&TaskQueue::ThreadMain, this, queue_name) {
53 RTC_DCHECK(queue_name);
54 thread_.Start();
55 Event event(false, false);
56 ThreadStartupData startup = {&event, this};
57 RTC_CHECK(thread_.QueueAPC(&InitializeQueueThread,
58 reinterpret_cast<ULONG_PTR>(&startup)));
59 event.Wait(Event::kForever);
60 }
61
62 TaskQueue::~TaskQueue() {
63 RTC_DCHECK(!IsCurrent());
64 while (!PostThreadMessage(thread_.GetThreadRef(), WM_QUIT, 0, 0)) {
65 RTC_CHECK(ERROR_NOT_ENOUGH_QUOTA == ::GetLastError());
66 Sleep(1);
67 }
68 thread_.Stop();
69 }
70
71 // static
72 TaskQueue* TaskQueue::Current() {
73 return static_cast<TaskQueue*>(TlsGetValue(GetQueuePtrTls()));
74 }
75
76 // static
77 bool TaskQueue::IsCurrent(const char* queue_name) {
78 TaskQueue* current = Current();
79 return current && current->thread_.name().compare(queue_name) == 0;
80 }
81
82 bool TaskQueue::IsCurrent() const {
83 return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef());
84 }
85
86 void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
87 if (PostThreadMessage(thread_.GetThreadRef(), WM_RUN_TASK, 0,
88 reinterpret_cast<LPARAM>(task.get()))) {
89 task.release();
90 }
91 }
92
93 void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
94 uint32_t milliseconds) {
95 WPARAM wparam;
96 #if defined(_WIN64)
97 // GetTickCount() returns a fairly coarse tick count (resolution or about 8ms)
98 // so this compensation isn't that accurate, but since we have unused 32 bits
99 // on Win64, we might as well use them.
100 wparam = (static_cast<WPARAM>(::GetTickCount()) << 32) | milliseconds;
101 #else
102 wparam = milliseconds;
103 #endif
104 if (PostThreadMessage(thread_.GetThreadRef(), WM_QUEUE_DELAYED_TASK, wparam,
105 reinterpret_cast<LPARAM>(task.get()))) {
106 task.release();
107 }
108 }
109
110 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
111 std::unique_ptr<QueuedTask> reply,
112 TaskQueue* reply_queue) {
113 QueuedTask* task_ptr = task.release();
114 QueuedTask* reply_task_ptr = reply.release();
115 DWORD reply_thread_id = reply_queue->thread_.GetThreadRef();
116 PostTask([task_ptr, reply_task_ptr, reply_thread_id]() {
117 if (task_ptr->Run())
118 delete task_ptr;
119 // If the thread's message queue is full, we can't queue the task and will
120 // have to drop it (i.e. delete).
121 if (!PostThreadMessage(reply_thread_id, WM_RUN_TASK, 0,
122 reinterpret_cast<LPARAM>(reply_task_ptr))) {
123 delete reply_task_ptr;
124 }
125 });
126 }
127
128 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
129 std::unique_ptr<QueuedTask> reply) {
130 return PostTaskAndReply(std::move(task), std::move(reply), Current());
131 }
132
133 // static
134 bool TaskQueue::ThreadMain(void* context) {
135 std::unordered_map<UINT_PTR, std::unique_ptr<QueuedTask>> delayed_tasks;
136
137 BOOL ret;
138 MSG msg;
139
140 while ((ret = GetMessage(&msg, nullptr, 0, 0)) != 0 && ret != -1) {
141 if (!msg.hwnd) {
142 switch (msg.message) {
143 case WM_RUN_TASK: {
144 QueuedTask* task = reinterpret_cast<QueuedTask*>(msg.lParam);
145 if (task->Run())
146 delete task;
147 break;
148 }
149 case WM_QUEUE_DELAYED_TASK: {
150 QueuedTask* task = reinterpret_cast<QueuedTask*>(msg.lParam);
151 uint32_t milliseconds = msg.wParam & 0xFFFFFFFF;
152 #if defined(_WIN64)
153 // Subtract the time it took to queue the timer.
154 const DWORD now = GetTickCount();
155 DWORD post_time = now - (msg.wParam >> 32);
156 milliseconds =
157 post_time > milliseconds ? 0 : milliseconds - post_time;
158 #endif
159 UINT_PTR timer_id = SetTimer(nullptr, 0, milliseconds, nullptr);
160 delayed_tasks.insert(std::make_pair(timer_id, task));
161 break;
162 }
163 case WM_TIMER: {
164 KillTimer(nullptr, msg.wParam);
165 auto found = delayed_tasks.find(msg.wParam);
166 RTC_DCHECK(found != delayed_tasks.end());
167 if (!found->second->Run())
168 found->second.release();
169 delayed_tasks.erase(found);
170 break;
171 }
172 default:
173 RTC_NOTREACHED();
174 break;
175 }
176 } else {
177 TranslateMessage(&msg);
178 DispatchMessage(&msg);
179 }
180 }
181
182 return false;
183 }
184 } // namespace rtc
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698