Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(207)

Side by Side Diff: webrtc/base/task_queue.h

Issue 1919733002: New task queueing primitive for async tasks: TaskQueue. (Closed) Base URL: https://chromium.googlesource.com/external/webrtc.git@master
Patch Set: Fix variable destruction order in PostALot Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 /*
2 * Copyright 2016 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #ifndef WEBRTC_BASE_TASK_QUEUE_H_
12 #define WEBRTC_BASE_TASK_QUEUE_H_
13
14 #if defined(WEBRTC_POSIX) && !defined(WEBRTC_MAC)
15 #define LIBEVENT_TASK_QUEUE
16 #endif
17
18 #include <list>
19 #include <memory>
20
21 #if defined(WEBRTC_MAC)
22 #include <dispatch/dispatch.h>
23 #endif
24
25 #include "webrtc/base/constructormagic.h"
26 #include "webrtc/base/criticalsection.h"
27
28 #if !defined(WEBRTC_MAC)
29 #include "webrtc/base/platform_thread.h"
30 #endif
31
32 #if defined(LIBEVENT_TASK_QUEUE)
33 struct event_base;
34 struct event;
35 #endif
36
37 namespace rtc {
38
39 class QueuedTask {
40 public:
41 QueuedTask() {}
42 virtual ~QueuedTask() {}
43
44 // Main routine that will run when the task is executed on the desired queue.
45 // The task should return |true| to indicate that it should be deleted or
46 // |false| to indicate that the queue should consider ownership of the task
47 // having been transferred. Returning |false| can be useful if a task has
48 // re-posted itself to a different queue or is otherwise being re-used.
49 virtual bool Run() = 0;
50
51 private:
52 RTC_DISALLOW_COPY_AND_ASSIGN(QueuedTask);
53 };
54
55 // Simple implementation of QueuedTask for use with rtc::Bind and lambdas.
56 template <class Closure>
57 class ClosureTask : public QueuedTask {
58 public:
59 explicit ClosureTask(const Closure& closure) : closure_(closure) {}
60
61 private:
62 bool Run() override {
63 closure_();
64 return true;
65 }
66
67 Closure closure_;
68 };
69
70 // Implements a task queue that asynchronously executes tasks in a way that
71 // guarantees that they're executed in FIFO order and tasks never overlap.
72 // Tasks may always execute on the same worker thread and they may not.
73 // To DCHECK that tasks are executing on a known task queue, use IsCurrent().
74 class TaskQueue {
perkj_webrtc 2016/04/26 14:30:37 please add usage exampled in the description.
tommi 2016/04/28 12:04:01 Done.
75 public:
76 explicit TaskQueue(const char* queue_name);
77 // TODO(tommi): Implement move semantics?
78 ~TaskQueue();
79
80 static TaskQueue* Current();
81
82 // Used for DCHECKing the current queue.
83 static bool IsCurrent(const char* queue_name);
84 bool IsCurrent() const;
85
86 // TODO(tommi): For better debuggability, implement FROM_HERE.
87
88 // Ownership of the task is passed to PostTask.
89 void PostTask(std::unique_ptr<QueuedTask> task);
90 // TODO(tommi): Should we expose this variant publicly
91 // (or only the other one)?
92 void PostTaskAndReply(std::unique_ptr<QueuedTask> task,
perkj_webrtc 2016/04/26 14:30:37 I vote for only the simple one until it is needed.
tommi 2016/04/28 12:04:01 Done.
93 std::unique_ptr<QueuedTask> reply,
94 TaskQueue* reply_queue);
95 void PostTaskAndReply(std::unique_ptr<QueuedTask> task,
96 std::unique_ptr<QueuedTask> reply);
97
98 void PostDelayedTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds);
99
100 template <class Closure>
101 void PostTask(const Closure& closure) {
102 PostTask(std::unique_ptr<QueuedTask>(new ClosureTask<Closure>(closure)));
103 }
104
105 template <class Closure>
106 void PostDelayedTask(const Closure& closure, uint32_t milliseconds) {
107 PostDelayedTask(
108 std::unique_ptr<QueuedTask>(new ClosureTask<Closure>(closure)),
109 milliseconds);
110 }
111
112 template <class Closure1, class Closure2>
113 void PostTaskAndReply(const Closure1& task,
114 const Closure2& reply,
115 TaskQueue* reply_queue) {
116 PostTaskAndReply(
117 std::unique_ptr<QueuedTask>(new ClosureTask<Closure1>(task)),
118 std::unique_ptr<QueuedTask>(new ClosureTask<Closure2>(reply)),
119 reply_queue);
120 }
121
122 template <class Closure1, class Closure2>
123 void PostTaskAndReply(const Closure1& task, const Closure2& reply) {
124 PostTaskAndReply(
125 std::unique_ptr<QueuedTask>(new ClosureTask<Closure1>(task)),
126 std::unique_ptr<QueuedTask>(new ClosureTask<Closure2>(reply)));
127 }
128
129 private:
130 #if defined(LIBEVENT_TASK_QUEUE)
131 static bool ThreadMain(void* context);
132 static void OnWakeup(int socket, short flags, void* context); // NOLINT
133 static void RunTask(int fd, short flags, void* context); // NOLINT
134 static void RunTimer(int fd, short flags, void* context); // NOLINT
135
136 class PostAndReplyTask;
137 class SetTimerTask;
138
139 void PrepareReplyTask(PostAndReplyTask* reply_task);
140 void ReplyTaskDone(PostAndReplyTask* reply_task);
141
142 struct QueueContext;
143
144 int wakeup_pipe_in_ = -1;
145 int wakeup_pipe_out_ = -1;
146 event_base* event_base_;
147 std::unique_ptr<event> wakeup_event_;
148 PlatformThread thread_;
149 rtc::CriticalSection pending_lock_;
150 std::list<std::unique_ptr<QueuedTask>> pending_ GUARDED_BY(pending_lock_);
151 std::list<PostAndReplyTask*> pending_replies_ GUARDED_BY(pending_lock_);
152 #elif defined(WEBRTC_MAC)
153 struct QueueContext;
154 dispatch_queue_t queue_;
155 QueueContext* const context_;
156 #elif defined(WEBRTC_WIN)
157 static bool ThreadMain(void* context);
158
159 class WorkerThread : public PlatformThread {
160 public:
161 WorkerThread(ThreadRunFunction func, void* obj, const char* thread_name)
162 : PlatformThread(func, obj, thread_name) {}
163
164 bool QueueAPC(PAPCFUNC apc_function, ULONG_PTR data) {
165 return PlatformThread::QueueAPC(apc_function, data);
166 }
167 };
168 WorkerThread thread_;
169 #else
170 #error not supported.
171 #endif
172
173 RTC_DISALLOW_COPY_AND_ASSIGN(TaskQueue);
174 };
175
176 } // namespace rtc
177
178 #endif // WEBRTC_BASE_TASK_QUEUE_H_
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698