Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(268)

Side by Side Diff: webrtc/base/task_queue_libevent.cc

Issue 1919733002: New task queueing primitive for async tasks: TaskQueue. (Closed) Base URL: https://chromium.googlesource.com/external/webrtc.git@master
Patch Set: Fix variable destruction order in PostALot Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 /*
2 * Copyright 2016 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "webrtc/base/task_queue.h"
12
13 #include <fcntl.h>
14 #include <string.h>
15 #include <unistd.h>
16
17 #include "base/third_party/libevent/event.h"
18 #include "webrtc/base/checks.h"
19 #include "webrtc/base/logging.h"
20 #include "webrtc/base/task_queue_posix.h"
21 #include "webrtc/base/timeutils.h"
22
23 namespace rtc {
24 using internal::GetQueuePtrTls;
25 using internal::AutoSetCurrentQueuePtr;
26
27 namespace {
28 static const char kQuit = 1;
29 static const char kRunTask = 2;
30
31 struct TimerEvent {
32 explicit TimerEvent(std::unique_ptr<QueuedTask> task)
33 : task(std::move(task)) {}
34 ~TimerEvent() { event_del(&ev); }
35 event ev;
36 std::unique_ptr<QueuedTask> task;
37 };
38
39 bool SetNonBlocking(int fd) {
40 const int flags = fcntl(fd, F_GETFL);
41 RTC_CHECK(flags != -1);
42 return (flags & O_NONBLOCK) || fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1;
43 }
44 } // namespace
45
46 struct TaskQueue::QueueContext {
47 explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {}
48 TaskQueue* queue;
49 bool is_active;
50 // Holds a list of events pending timers for cleanup when the loop exits.
51 std::list<TimerEvent*> pending_timers_;
52 };
53
54 class TaskQueue::PostAndReplyTask : public QueuedTask {
55 public:
56 PostAndReplyTask(std::unique_ptr<QueuedTask> task,
57 std::unique_ptr<QueuedTask> reply,
58 TaskQueue* reply_queue)
59 : task_(std::move(task)),
60 reply_(std::move(reply)),
61 reply_queue_(reply_queue) {
62 reply_queue->PrepareReplyTask(this);
63 }
64
65 ~PostAndReplyTask() override {
66 CritScope lock(&lock_);
67 if (reply_queue_)
68 reply_queue_->ReplyTaskDone(this);
69 }
70
71 void OnReplyQueueGone() {
72 CritScope lock(&lock_);
73 reply_queue_ = nullptr;
74 }
75
76 private:
77 bool Run() override {
78 if (!task_->Run())
79 task_.release();
80
81 CritScope lock(&lock_);
82 if (reply_queue_)
83 reply_queue_->PostTask(std::move(reply_));
84 return true;
85 }
86
87 CriticalSection lock_;
88 std::unique_ptr<QueuedTask> task_, reply_;
perkj_webrtc 2016/04/26 14:30:38 reply_ on separate line please
tommi 2016/04/28 12:04:02 Done.
89 TaskQueue* reply_queue_ GUARDED_BY(lock_);
90 };
91
92 class TaskQueue::SetTimerTask : public QueuedTask {
93 public:
94 SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds)
95 : task_(std::move(task)),
96 milliseconds_(milliseconds),
97 posted_(Time32()) {}
98
99 private:
100 bool Run() override {
101 // Compensate for the time that has passed since construction
102 // and until we got here.
103 uint32_t post_time = Time32() - posted_;
perkj_webrtc 2016/04/26 14:30:38 Please check with nisse@ that we use the same cloc
tommi 2016/04/28 12:04:02 Yes, I'm following those changes. I might switch
104 TaskQueue::Current()->PostDelayedTask(
105 std::move(task_),
106 post_time > milliseconds_ ? 0 : milliseconds_ - post_time);
107 return true;
108 }
109
110 std::unique_ptr<QueuedTask> task_;
111 const uint32_t milliseconds_;
112 const uint32_t posted_;
113 };
114
115 TaskQueue::TaskQueue(const char* queue_name)
116 : event_base_(event_base_new()),
117 wakeup_event_(new event()),
118 thread_(&TaskQueue::ThreadMain, this, queue_name) {
119 RTC_DCHECK(queue_name);
120 int fds[2];
121 RTC_CHECK(pipe(fds) == 0);
122 SetNonBlocking(fds[0]);
123 SetNonBlocking(fds[1]);
124 wakeup_pipe_out_ = fds[0];
125 wakeup_pipe_in_ = fds[1];
126 event_set(wakeup_event_.get(), wakeup_pipe_out_, EV_READ | EV_PERSIST,
127 OnWakeup, this);
128 event_base_set(event_base_, wakeup_event_.get());
129 event_add(wakeup_event_.get(), 0);
130 thread_.Start();
131 }
132
133 TaskQueue::~TaskQueue() {
134 RTC_DCHECK(!IsCurrent());
135 struct timespec ts;
136 char message = kQuit;
137 while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
138 // The queue is full, so we have no choice but to wait and retry.
139 RTC_CHECK_EQ(EAGAIN, errno);
140 ts.tv_sec = 0;
141 ts.tv_nsec = 1000000;
142 nanosleep(&ts, nullptr);
143 }
144
145 thread_.Stop();
146
147 event_del(wakeup_event_.get());
148 close(wakeup_pipe_in_);
149 close(wakeup_pipe_out_);
150 wakeup_pipe_in_ = -1;
151 wakeup_pipe_out_ = -1;
152
153 for (auto* reply : pending_replies_)
154 reply->OnReplyQueueGone();
155 pending_replies_.clear();
perkj_webrtc 2016/04/26 14:30:38 Who owns the PostAndReplyTasks?
tommi 2016/04/28 12:04:02 They are stored in |pending_| or, if they can't be
156
157 event_base_free(event_base_);
158 }
159
160 // static
161 TaskQueue* TaskQueue::Current() {
162 QueueContext* ctx =
163 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
164 return ctx ? ctx->queue : nullptr;
165 }
166
167 // static
168 bool TaskQueue::IsCurrent(const char* queue_name) {
169 TaskQueue* current = Current();
170 return current && current->thread_.name().compare(queue_name) == 0;
171 }
172
173 bool TaskQueue::IsCurrent() const {
174 return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef());
175 }
176
177 void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
178 RTC_DCHECK(task.get());
179 // libevent isn't thread safe. This means that we can't use methods such
perkj_webrtc 2016/04/26 14:30:38 ? Didn't you say you fixed that in libevent?
tommi 2016/04/28 12:04:02 I didn't make libevent thread safe but I did remov
180 // as event_base_once to post tasks to the worker thread from a different
181 // thread. However, we can use it when posting from the worker thread itself.
182 if (IsCurrent()) {
183 if (event_base_once(event_base_, -1, EV_TIMEOUT, &TaskQueue::RunTask,
184 task.get(), nullptr) == 0) {
185 task.release();
186 }
187 } else {
188 QueuedTask* task_id = task.get(); // Only used for comparison.
189 {
190 CritScope lock(&pending_lock_);
191 pending_.push_back(std::move(task));
192 }
193 char message = kRunTask;
194 if (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
195 LOG(WARNING) << "Failed to queue task.";
196 CritScope lock(&pending_lock_);
197 pending_.remove_if([task_id](std::unique_ptr<QueuedTask>& t) {
198 return t.get() == task_id;
199 });
200 }
201 }
202 }
203
204 void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
205 uint32_t milliseconds) {
206 if (IsCurrent()) {
207 TimerEvent* timer = new TimerEvent(std::move(task));
208 evtimer_set(&timer->ev, &TaskQueue::RunTimer, timer);
209 event_base_set(event_base_, &timer->ev);
210 QueueContext* ctx =
211 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
212 ctx->pending_timers_.push_back(timer);
213 timeval tv = {milliseconds / 1000, (milliseconds % 1000) * 1000};
214 event_add(&timer->ev, &tv);
215 } else {
216 PostTask(std::unique_ptr<QueuedTask>(
217 new SetTimerTask(std::move(task), milliseconds)));
218 }
219 }
220
221 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
222 std::unique_ptr<QueuedTask> reply,
223 TaskQueue* reply_queue) {
224 std::unique_ptr<QueuedTask> wrapper_task(
225 new PostAndReplyTask(std::move(task), std::move(reply), reply_queue));
226 PostTask(std::move(wrapper_task));
tommi 2016/04/28 12:04:02 Here is where ownership of PostAndReplyTasks is ha
227 }
228
229 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
230 std::unique_ptr<QueuedTask> reply) {
231 return PostTaskAndReply(std::move(task), std::move(reply), Current());
232 }
233
234 // static
235 bool TaskQueue::ThreadMain(void* context) {
236 TaskQueue* me = static_cast<TaskQueue*>(context);
237
238 QueueContext queue_context(me);
239 pthread_setspecific(GetQueuePtrTls(), &queue_context);
240
241 while (queue_context.is_active)
242 event_base_loop(me->event_base_, 0);
243
244 pthread_setspecific(GetQueuePtrTls(), nullptr);
245
246 for (TimerEvent* timer : queue_context.pending_timers_)
247 delete timer;
248
249 return false;
250 }
251
252 // static
253 void TaskQueue::OnWakeup(int socket, short flags, void* context) { // NOLINT
254 QueueContext* ctx =
255 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
256 RTC_DCHECK(ctx->queue->wakeup_pipe_out_ == socket);
257 char buf;
258 RTC_CHECK(sizeof(buf) == read(socket, &buf, sizeof(buf)));
259 switch (buf) {
260 case kQuit:
261 ctx->is_active = false;
262 event_base_loopbreak(ctx->queue->event_base_);
263 break;
264 case kRunTask: {
265 std::unique_ptr<QueuedTask> task;
266 {
267 CritScope lock(&ctx->queue->pending_lock_);
268 RTC_DCHECK(!ctx->queue->pending_.empty());
269 task = std::move(ctx->queue->pending_.front());
270 ctx->queue->pending_.pop_front();
271 RTC_DCHECK(task.get());
272 }
273 if (!task->Run())
274 task.release();
275 break;
276 }
277 default:
278 RTC_NOTREACHED();
279 break;
280 }
281 }
282
283 // static
284 void TaskQueue::RunTask(int fd, short flags, void* context) { // NOLINT
285 auto* task = static_cast<QueuedTask*>(context);
286 if (task->Run())
287 delete task;
288 }
289
290 // static
291 void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT
292 TimerEvent* timer = static_cast<TimerEvent*>(context);
293 if (!timer->task->Run())
294 timer->task.release();
295 QueueContext* ctx =
296 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
297 ctx->pending_timers_.remove(timer);
298 delete timer;
299 }
300
301 void TaskQueue::PrepareReplyTask(PostAndReplyTask* reply_task) {
302 RTC_DCHECK(reply_task);
303 CritScope lock(&pending_lock_);
304 pending_replies_.push_back(reply_task);
305 }
306
307 void TaskQueue::ReplyTaskDone(PostAndReplyTask* reply_task) {
308 CritScope lock(&pending_lock_);
309 pending_replies_.remove(reply_task);
310 }
311
312 } // namespace rtc
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698