OLD | NEW |
| (Empty) |
1 /* | |
2 * Copyright 2016 The WebRTC Project Authors. All rights reserved. | |
3 * | |
4 * Use of this source code is governed by a BSD-style license | |
5 * that can be found in the LICENSE file in the root of the source | |
6 * tree. An additional intellectual property rights grant can be found | |
7 * in the file PATENTS. All contributing project authors may | |
8 * be found in the AUTHORS file in the root of the source tree. | |
9 */ | |
10 | |
11 #ifndef WEBRTC_BASE_TASK_QUEUE_H_ | |
12 #define WEBRTC_BASE_TASK_QUEUE_H_ | |
13 | |
14 #include <list> | |
15 #include <memory> | |
16 | |
17 #if defined(WEBRTC_MAC) && !defined(WEBRTC_BUILD_LIBEVENT) | |
18 #include <dispatch/dispatch.h> | |
19 #endif | |
20 | |
21 #include "webrtc/base/constructormagic.h" | |
22 #include "webrtc/base/criticalsection.h" | |
23 | |
24 #if defined(WEBRTC_WIN) || defined(WEBRTC_BUILD_LIBEVENT) | |
25 #include "webrtc/base/platform_thread.h" | |
26 #endif | |
27 | |
28 #if defined(WEBRTC_BUILD_LIBEVENT) | |
29 struct event_base; | |
30 struct event; | |
31 #endif | |
32 | |
33 namespace rtc { | |
34 | |
35 // Base interface for asynchronously executed tasks. | |
36 // The interface basically consists of a single function, Run(), that executes | |
37 // on the target queue. For more details see the Run() method and TaskQueue. | |
38 class QueuedTask { | |
39 public: | |
40 QueuedTask() {} | |
41 virtual ~QueuedTask() {} | |
42 | |
43 // Main routine that will run when the task is executed on the desired queue. | |
44 // The task should return |true| to indicate that it should be deleted or | |
45 // |false| to indicate that the queue should consider ownership of the task | |
46 // having been transferred. Returning |false| can be useful if a task has | |
47 // re-posted itself to a different queue or is otherwise being re-used. | |
48 virtual bool Run() = 0; | |
49 | |
50 private: | |
51 RTC_DISALLOW_COPY_AND_ASSIGN(QueuedTask); | |
52 }; | |
53 | |
54 // Simple implementation of QueuedTask for use with rtc::Bind and lambdas. | |
55 template <class Closure> | |
56 class ClosureTask : public QueuedTask { | |
57 public: | |
58 explicit ClosureTask(const Closure& closure) : closure_(closure) {} | |
59 | |
60 private: | |
61 bool Run() override { | |
62 closure_(); | |
63 return true; | |
64 } | |
65 | |
66 Closure closure_; | |
67 }; | |
68 | |
69 // Extends ClosureTask to also allow specifying cleanup code. | |
70 // This is useful when using lambdas if guaranteeing cleanup, even if a task | |
71 // was dropped (queue is too full), is required. | |
72 template <class Closure, class Cleanup> | |
73 class ClosureTaskWithCleanup : public ClosureTask<Closure> { | |
74 public: | |
75 ClosureTaskWithCleanup(const Closure& closure, Cleanup cleanup) | |
76 : ClosureTask<Closure>(closure), cleanup_(cleanup) {} | |
77 ~ClosureTaskWithCleanup() { cleanup_(); } | |
78 | |
79 private: | |
80 Cleanup cleanup_; | |
81 }; | |
82 | |
83 // Convenience function to construct closures that can be passed directly | |
84 // to methods that support std::unique_ptr<QueuedTask> but not template | |
85 // based parameters. | |
86 template <class Closure> | |
87 static std::unique_ptr<QueuedTask> NewClosure(const Closure& closure) { | |
88 return std::unique_ptr<QueuedTask>(new ClosureTask<Closure>(closure)); | |
89 } | |
90 | |
91 template <class Closure, class Cleanup> | |
92 static std::unique_ptr<QueuedTask> NewClosure(const Closure& closure, | |
93 const Cleanup& cleanup) { | |
94 return std::unique_ptr<QueuedTask>( | |
95 new ClosureTaskWithCleanup<Closure, Cleanup>(closure, cleanup)); | |
96 } | |
97 | |
98 // Implements a task queue that asynchronously executes tasks in a way that | |
99 // guarantees that they're executed in FIFO order and that tasks never overlap. | |
100 // Tasks may always execute on the same worker thread and they may not. | |
101 // To DCHECK that tasks are executing on a known task queue, use IsCurrent(). | |
102 // | |
103 // Here are some usage examples: | |
104 // | |
105 // 1) Asynchronously running a lambda: | |
106 // | |
107 // class MyClass { | |
108 // ... | |
109 // TaskQueue queue_("MyQueue"); | |
110 // }; | |
111 // | |
112 // void MyClass::StartWork() { | |
113 // queue_.PostTask([]() { Work(); }); | |
114 // ... | |
115 // | |
116 // 2) Doing work asynchronously on a worker queue and providing a notification | |
117 // callback on the current queue, when the work has been done: | |
118 // | |
119 // void MyClass::StartWorkAndLetMeKnowWhenDone( | |
120 // std::unique_ptr<QueuedTask> callback) { | |
121 // DCHECK(TaskQueue::Current()) << "Need to be running on a queue"; | |
122 // queue_.PostTaskAndReply([]() { Work(); }, std::move(callback)); | |
123 // } | |
124 // ... | |
125 // my_class->StartWorkAndLetMeKnowWhenDone( | |
126 // NewClosure([]() { LOG(INFO) << "The work is done!";})); | |
127 // | |
128 // 3) Posting a custom task on a timer. The task posts itself again after | |
129 // every running: | |
130 // | |
131 // class TimerTask : public QueuedTask { | |
132 // public: | |
133 // TimerTask() {} | |
134 // private: | |
135 // bool Run() override { | |
136 // ++count_; | |
137 // TaskQueue::Current()->PostDelayedTask( | |
138 // std::unique_ptr<QueuedTask>(this), 1000); | |
139 // // Ownership has been transferred to the next occurance, | |
140 // // so return false to prevent from being deleted now. | |
141 // return false; | |
142 // } | |
143 // int count_ = 0; | |
144 // }; | |
145 // ... | |
146 // queue_.PostDelayedTask( | |
147 // std::unique_ptr<QueuedTask>(new TimerTask()), 1000); | |
148 // | |
149 // For more examples, see task_queue_unittests.cc. | |
150 // | |
151 // A note on destruction: | |
152 // | |
153 // When a TaskQueue is deleted, pending tasks will not be executed but they will | |
154 // be deleted. The deletion of tasks may happen asynchronously after the | |
155 // TaskQueue itself has been deleted or it may happen synchronously while the | |
156 // TaskQueue instance is being deleted. This may vary from one OS to the next | |
157 // so assumptions about lifetimes of pending tasks should not be made. | |
158 class TaskQueue { | |
159 public: | |
160 explicit TaskQueue(const char* queue_name); | |
161 // TODO(tommi): Implement move semantics? | |
162 ~TaskQueue(); | |
163 | |
164 static TaskQueue* Current(); | |
165 | |
166 // Used for DCHECKing the current queue. | |
167 static bool IsCurrent(const char* queue_name); | |
168 bool IsCurrent() const; | |
169 | |
170 // TODO(tommi): For better debuggability, implement FROM_HERE. | |
171 | |
172 // Ownership of the task is passed to PostTask. | |
173 void PostTask(std::unique_ptr<QueuedTask> task); | |
174 void PostTaskAndReply(std::unique_ptr<QueuedTask> task, | |
175 std::unique_ptr<QueuedTask> reply, | |
176 TaskQueue* reply_queue); | |
177 void PostTaskAndReply(std::unique_ptr<QueuedTask> task, | |
178 std::unique_ptr<QueuedTask> reply); | |
179 | |
180 void PostDelayedTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds); | |
181 | |
182 template <class Closure> | |
183 void PostTask(const Closure& closure) { | |
184 PostTask(std::unique_ptr<QueuedTask>(new ClosureTask<Closure>(closure))); | |
185 } | |
186 | |
187 template <class Closure> | |
188 void PostDelayedTask(const Closure& closure, uint32_t milliseconds) { | |
189 PostDelayedTask( | |
190 std::unique_ptr<QueuedTask>(new ClosureTask<Closure>(closure)), | |
191 milliseconds); | |
192 } | |
193 | |
194 template <class Closure1, class Closure2> | |
195 void PostTaskAndReply(const Closure1& task, | |
196 const Closure2& reply, | |
197 TaskQueue* reply_queue) { | |
198 PostTaskAndReply( | |
199 std::unique_ptr<QueuedTask>(new ClosureTask<Closure1>(task)), | |
200 std::unique_ptr<QueuedTask>(new ClosureTask<Closure2>(reply)), | |
201 reply_queue); | |
202 } | |
203 | |
204 template <class Closure> | |
205 void PostTaskAndReply(std::unique_ptr<QueuedTask> task, | |
206 const Closure& reply) { | |
207 PostTaskAndReply(std::move(task), std::unique_ptr<QueuedTask>( | |
208 new ClosureTask<Closure>(reply))); | |
209 } | |
210 | |
211 template <class Closure> | |
212 void PostTaskAndReply(const Closure& task, | |
213 std::unique_ptr<QueuedTask> reply) { | |
214 PostTaskAndReply( | |
215 std::unique_ptr<QueuedTask>(new ClosureTask<Closure>(task)), | |
216 std::move(reply)); | |
217 } | |
218 | |
219 template <class Closure1, class Closure2> | |
220 void PostTaskAndReply(const Closure1& task, const Closure2& reply) { | |
221 PostTaskAndReply( | |
222 std::unique_ptr<QueuedTask>(new ClosureTask<Closure1>(task)), | |
223 std::unique_ptr<QueuedTask>(new ClosureTask<Closure2>(reply))); | |
224 } | |
225 | |
226 private: | |
227 #if defined(WEBRTC_BUILD_LIBEVENT) | |
228 static bool ThreadMain(void* context); | |
229 static void OnWakeup(int socket, short flags, void* context); // NOLINT | |
230 static void RunTask(int fd, short flags, void* context); // NOLINT | |
231 static void RunTimer(int fd, short flags, void* context); // NOLINT | |
232 | |
233 class PostAndReplyTask; | |
234 class SetTimerTask; | |
235 | |
236 void PrepareReplyTask(PostAndReplyTask* reply_task); | |
237 void ReplyTaskDone(PostAndReplyTask* reply_task); | |
238 | |
239 struct QueueContext; | |
240 | |
241 int wakeup_pipe_in_ = -1; | |
242 int wakeup_pipe_out_ = -1; | |
243 event_base* event_base_; | |
244 std::unique_ptr<event> wakeup_event_; | |
245 PlatformThread thread_; | |
246 rtc::CriticalSection pending_lock_; | |
247 std::list<std::unique_ptr<QueuedTask>> pending_ GUARDED_BY(pending_lock_); | |
248 std::list<PostAndReplyTask*> pending_replies_ GUARDED_BY(pending_lock_); | |
249 #elif defined(WEBRTC_MAC) | |
250 struct QueueContext; | |
251 struct TaskContext; | |
252 struct PostTaskAndReplyContext; | |
253 dispatch_queue_t queue_; | |
254 QueueContext* const context_; | |
255 #elif defined(WEBRTC_WIN) | |
256 static bool ThreadMain(void* context); | |
257 | |
258 class WorkerThread : public PlatformThread { | |
259 public: | |
260 WorkerThread(ThreadRunFunction func, void* obj, const char* thread_name) | |
261 : PlatformThread(func, obj, thread_name) {} | |
262 | |
263 bool QueueAPC(PAPCFUNC apc_function, ULONG_PTR data) { | |
264 return PlatformThread::QueueAPC(apc_function, data); | |
265 } | |
266 }; | |
267 WorkerThread thread_; | |
268 #else | |
269 #error not supported. | |
270 #endif | |
271 | |
272 RTC_DISALLOW_COPY_AND_ASSIGN(TaskQueue); | |
273 }; | |
274 | |
275 } // namespace rtc | |
276 | |
277 #endif // WEBRTC_BASE_TASK_QUEUE_H_ | |
OLD | NEW |