Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(527)

Side by Side Diff: webrtc/base/task_queue_libevent.cc

Issue 2877023002: Move webrtc/{base => rtc_base} (Closed)
Patch Set: update presubmit.py and DEPS include rules Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « webrtc/base/task_queue_gcd.cc ('k') | webrtc/base/task_queue_posix.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 /*
2 * Copyright 2016 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "webrtc/base/task_queue.h"
12
13 #include <fcntl.h>
14 #include <signal.h>
15 #include <string.h>
16 #include <unistd.h>
17
18 #include "base/third_party/libevent/event.h"
19 #include "webrtc/base/checks.h"
20 #include "webrtc/base/logging.h"
21 #include "webrtc/base/task_queue_posix.h"
22 #include "webrtc/base/timeutils.h"
23
24 namespace rtc {
25 using internal::GetQueuePtrTls;
26 using internal::AutoSetCurrentQueuePtr;
27
28 namespace {
29 static const char kQuit = 1;
30 static const char kRunTask = 2;
31 static const char kRunReplyTask = 3;
32
33 using Priority = TaskQueue::Priority;
34
35 // This ignores the SIGPIPE signal on the calling thread.
36 // This signal can be fired when trying to write() to a pipe that's being
37 // closed or while closing a pipe that's being written to.
38 // We can run into that situation (e.g. reply tasks that don't get a chance to
39 // run because the task queue is being deleted) so we ignore this signal and
40 // continue as normal.
41 // As a side note for this implementation, it would be great if we could safely
42 // restore the sigmask, but unfortunately the operation of restoring it, can
43 // itself actually cause SIGPIPE to be signaled :-| (e.g. on MacOS)
44 // The SIGPIPE signal by default causes the process to be terminated, so we
45 // don't want to risk that.
46 // An alternative to this approach is to ignore the signal for the whole
47 // process:
48 // signal(SIGPIPE, SIG_IGN);
49 void IgnoreSigPipeSignalOnCurrentThread() {
50 sigset_t sigpipe_mask;
51 sigemptyset(&sigpipe_mask);
52 sigaddset(&sigpipe_mask, SIGPIPE);
53 pthread_sigmask(SIG_BLOCK, &sigpipe_mask, nullptr);
54 }
55
56 struct TimerEvent {
57 explicit TimerEvent(std::unique_ptr<QueuedTask> task)
58 : task(std::move(task)) {}
59 ~TimerEvent() { event_del(&ev); }
60 event ev;
61 std::unique_ptr<QueuedTask> task;
62 };
63
64 bool SetNonBlocking(int fd) {
65 const int flags = fcntl(fd, F_GETFL);
66 RTC_CHECK(flags != -1);
67 return (flags & O_NONBLOCK) || fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1;
68 }
69
70 // TODO(tommi): This is a hack to support two versions of libevent that we're
71 // compatible with. The method we really want to call is event_assign(),
72 // since event_set() has been marked as deprecated (and doesn't accept
73 // passing event_base__ as a parameter). However, the version of libevent
74 // that we have in Chromium, doesn't have event_assign(), so we need to call
75 // event_set() there.
76 void EventAssign(struct event* ev,
77 struct event_base* base,
78 int fd,
79 short events,
80 void (*callback)(int, short, void*),
81 void* arg) {
82 #if defined(_EVENT2_EVENT_H_)
83 RTC_CHECK_EQ(0, event_assign(ev, base, fd, events, callback, arg));
84 #else
85 event_set(ev, fd, events, callback, arg);
86 RTC_CHECK_EQ(0, event_base_set(base, ev));
87 #endif
88 }
89
90 ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) {
91 switch (priority) {
92 case Priority::HIGH:
93 return kRealtimePriority;
94 case Priority::LOW:
95 return kLowPriority;
96 case Priority::NORMAL:
97 return kNormalPriority;
98 default:
99 RTC_NOTREACHED();
100 break;
101 }
102 return kNormalPriority;
103 }
104 } // namespace
105
106 struct TaskQueue::QueueContext {
107 explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {}
108 TaskQueue* queue;
109 bool is_active;
110 // Holds a list of events pending timers for cleanup when the loop exits.
111 std::list<TimerEvent*> pending_timers_;
112 };
113
114 // Posting a reply task is tricky business. This class owns the reply task
115 // and a reference to it is held by both the reply queue and the first task.
116 // Here's an outline of what happens when dealing with a reply task.
117 // * The ReplyTaskOwner owns the |reply_| task.
118 // * One ref owned by PostAndReplyTask
119 // * One ref owned by the reply TaskQueue
120 // * ReplyTaskOwner has a flag |run_task_| initially set to false.
121 // * ReplyTaskOwner has a method: HasOneRef() (provided by RefCountedObject).
122 // * After successfully running the original |task_|, PostAndReplyTask() calls
123 // set_should_run_task(). This sets |run_task_| to true.
124 // * In PostAndReplyTask's dtor:
125 // * It releases its reference to ReplyTaskOwner (important to do this first).
126 // * Sends (write()) a kRunReplyTask message to the reply queue's pipe.
127 // * PostAndReplyTask doesn't care if write() fails, but when it does:
128 // * The reply queue is gone.
129 // * ReplyTaskOwner has already been deleted and the reply task too.
130 // * If write() succeeds:
131 // * ReplyQueue receives the kRunReplyTask message
132 // * Goes through all pending tasks, finding the first that HasOneRef()
133 // * Calls ReplyTaskOwner::Run()
134 // * if set_should_run_task() was called, the reply task will be run
135 // * Release the reference to ReplyTaskOwner
136 // * ReplyTaskOwner and associated |reply_| are deleted.
137 class TaskQueue::ReplyTaskOwner {
138 public:
139 ReplyTaskOwner(std::unique_ptr<QueuedTask> reply)
140 : reply_(std::move(reply)) {}
141
142 void Run() {
143 RTC_DCHECK(reply_);
144 if (run_task_) {
145 if (!reply_->Run())
146 reply_.release();
147 }
148 reply_.reset();
149 }
150
151 void set_should_run_task() {
152 RTC_DCHECK(!run_task_);
153 run_task_ = true;
154 }
155
156 private:
157 std::unique_ptr<QueuedTask> reply_;
158 bool run_task_ = false;
159 };
160
161 class TaskQueue::PostAndReplyTask : public QueuedTask {
162 public:
163 PostAndReplyTask(std::unique_ptr<QueuedTask> task,
164 std::unique_ptr<QueuedTask> reply,
165 TaskQueue* reply_queue,
166 int reply_pipe)
167 : task_(std::move(task)),
168 reply_pipe_(reply_pipe),
169 reply_task_owner_(
170 new RefCountedObject<ReplyTaskOwner>(std::move(reply))) {
171 reply_queue->PrepareReplyTask(reply_task_owner_);
172 }
173
174 ~PostAndReplyTask() override {
175 reply_task_owner_ = nullptr;
176 IgnoreSigPipeSignalOnCurrentThread();
177 // Send a signal to the reply queue that the reply task can run now.
178 // Depending on whether |set_should_run_task()| was called by the
179 // PostAndReplyTask(), the reply task may or may not actually run.
180 // In either case, it will be deleted.
181 char message = kRunReplyTask;
182 write(reply_pipe_, &message, sizeof(message));
183 }
184
185 private:
186 bool Run() override {
187 if (!task_->Run())
188 task_.release();
189 reply_task_owner_->set_should_run_task();
190 return true;
191 }
192
193 std::unique_ptr<QueuedTask> task_;
194 int reply_pipe_;
195 scoped_refptr<RefCountedObject<ReplyTaskOwner>> reply_task_owner_;
196 };
197
198 class TaskQueue::SetTimerTask : public QueuedTask {
199 public:
200 SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds)
201 : task_(std::move(task)),
202 milliseconds_(milliseconds),
203 posted_(Time32()) {}
204
205 private:
206 bool Run() override {
207 // Compensate for the time that has passed since construction
208 // and until we got here.
209 uint32_t post_time = Time32() - posted_;
210 TaskQueue::Current()->PostDelayedTask(
211 std::move(task_),
212 post_time > milliseconds_ ? 0 : milliseconds_ - post_time);
213 return true;
214 }
215
216 std::unique_ptr<QueuedTask> task_;
217 const uint32_t milliseconds_;
218 const uint32_t posted_;
219 };
220
221 TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/)
222 : event_base_(event_base_new()),
223 wakeup_event_(new event()),
224 thread_(&TaskQueue::ThreadMain,
225 this,
226 queue_name,
227 TaskQueuePriorityToThreadPriority(priority)) {
228 RTC_DCHECK(queue_name);
229 int fds[2];
230 RTC_CHECK(pipe(fds) == 0);
231 SetNonBlocking(fds[0]);
232 SetNonBlocking(fds[1]);
233 wakeup_pipe_out_ = fds[0];
234 wakeup_pipe_in_ = fds[1];
235
236 EventAssign(wakeup_event_.get(), event_base_, wakeup_pipe_out_,
237 EV_READ | EV_PERSIST, OnWakeup, this);
238 event_add(wakeup_event_.get(), 0);
239 thread_.Start();
240 }
241
242 TaskQueue::~TaskQueue() {
243 RTC_DCHECK(!IsCurrent());
244 struct timespec ts;
245 char message = kQuit;
246 while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
247 // The queue is full, so we have no choice but to wait and retry.
248 RTC_CHECK_EQ(EAGAIN, errno);
249 ts.tv_sec = 0;
250 ts.tv_nsec = 1000000;
251 nanosleep(&ts, nullptr);
252 }
253
254 thread_.Stop();
255
256 event_del(wakeup_event_.get());
257
258 IgnoreSigPipeSignalOnCurrentThread();
259
260 close(wakeup_pipe_in_);
261 close(wakeup_pipe_out_);
262 wakeup_pipe_in_ = -1;
263 wakeup_pipe_out_ = -1;
264
265 event_base_free(event_base_);
266 }
267
268 // static
269 TaskQueue* TaskQueue::Current() {
270 QueueContext* ctx =
271 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
272 return ctx ? ctx->queue : nullptr;
273 }
274
275 // static
276 bool TaskQueue::IsCurrent(const char* queue_name) {
277 TaskQueue* current = Current();
278 return current && current->thread_.name().compare(queue_name) == 0;
279 }
280
281 bool TaskQueue::IsCurrent() const {
282 return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef());
283 }
284
285 void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
286 RTC_DCHECK(task.get());
287 // libevent isn't thread safe. This means that we can't use methods such
288 // as event_base_once to post tasks to the worker thread from a different
289 // thread. However, we can use it when posting from the worker thread itself.
290 if (IsCurrent()) {
291 if (event_base_once(event_base_, -1, EV_TIMEOUT, &TaskQueue::RunTask,
292 task.get(), nullptr) == 0) {
293 task.release();
294 }
295 } else {
296 QueuedTask* task_id = task.get(); // Only used for comparison.
297 {
298 CritScope lock(&pending_lock_);
299 pending_.push_back(std::move(task));
300 }
301 char message = kRunTask;
302 if (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
303 LOG(WARNING) << "Failed to queue task.";
304 CritScope lock(&pending_lock_);
305 pending_.remove_if([task_id](std::unique_ptr<QueuedTask>& t) {
306 return t.get() == task_id;
307 });
308 }
309 }
310 }
311
312 void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
313 uint32_t milliseconds) {
314 if (IsCurrent()) {
315 TimerEvent* timer = new TimerEvent(std::move(task));
316 EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueue::RunTimer, timer);
317 QueueContext* ctx =
318 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
319 ctx->pending_timers_.push_back(timer);
320 timeval tv = {milliseconds / 1000, (milliseconds % 1000) * 1000};
321 event_add(&timer->ev, &tv);
322 } else {
323 PostTask(std::unique_ptr<QueuedTask>(
324 new SetTimerTask(std::move(task), milliseconds)));
325 }
326 }
327
328 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
329 std::unique_ptr<QueuedTask> reply,
330 TaskQueue* reply_queue) {
331 std::unique_ptr<QueuedTask> wrapper_task(
332 new PostAndReplyTask(std::move(task), std::move(reply), reply_queue,
333 reply_queue->wakeup_pipe_in_));
334 PostTask(std::move(wrapper_task));
335 }
336
337 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
338 std::unique_ptr<QueuedTask> reply) {
339 return PostTaskAndReply(std::move(task), std::move(reply), Current());
340 }
341
342 // static
343 void TaskQueue::ThreadMain(void* context) {
344 TaskQueue* me = static_cast<TaskQueue*>(context);
345
346 QueueContext queue_context(me);
347 pthread_setspecific(GetQueuePtrTls(), &queue_context);
348
349 while (queue_context.is_active)
350 event_base_loop(me->event_base_, 0);
351
352 pthread_setspecific(GetQueuePtrTls(), nullptr);
353
354 for (TimerEvent* timer : queue_context.pending_timers_)
355 delete timer;
356 }
357
358 // static
359 void TaskQueue::OnWakeup(int socket, short flags, void* context) { // NOLINT
360 QueueContext* ctx =
361 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
362 RTC_DCHECK(ctx->queue->wakeup_pipe_out_ == socket);
363 char buf;
364 RTC_CHECK(sizeof(buf) == read(socket, &buf, sizeof(buf)));
365 switch (buf) {
366 case kQuit:
367 ctx->is_active = false;
368 event_base_loopbreak(ctx->queue->event_base_);
369 break;
370 case kRunTask: {
371 std::unique_ptr<QueuedTask> task;
372 {
373 CritScope lock(&ctx->queue->pending_lock_);
374 RTC_DCHECK(!ctx->queue->pending_.empty());
375 task = std::move(ctx->queue->pending_.front());
376 ctx->queue->pending_.pop_front();
377 RTC_DCHECK(task.get());
378 }
379 if (!task->Run())
380 task.release();
381 break;
382 }
383 case kRunReplyTask: {
384 scoped_refptr<ReplyTaskOwnerRef> reply_task;
385 {
386 CritScope lock(&ctx->queue->pending_lock_);
387 for (auto it = ctx->queue->pending_replies_.begin();
388 it != ctx->queue->pending_replies_.end(); ++it) {
389 if ((*it)->HasOneRef()) {
390 reply_task = std::move(*it);
391 ctx->queue->pending_replies_.erase(it);
392 break;
393 }
394 }
395 }
396 reply_task->Run();
397 break;
398 }
399 default:
400 RTC_NOTREACHED();
401 break;
402 }
403 }
404
405 // static
406 void TaskQueue::RunTask(int fd, short flags, void* context) { // NOLINT
407 auto* task = static_cast<QueuedTask*>(context);
408 if (task->Run())
409 delete task;
410 }
411
412 // static
413 void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT
414 TimerEvent* timer = static_cast<TimerEvent*>(context);
415 if (!timer->task->Run())
416 timer->task.release();
417 QueueContext* ctx =
418 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
419 ctx->pending_timers_.remove(timer);
420 delete timer;
421 }
422
423 void TaskQueue::PrepareReplyTask(scoped_refptr<ReplyTaskOwnerRef> reply_task) {
424 RTC_DCHECK(reply_task);
425 CritScope lock(&pending_lock_);
426 pending_replies_.push_back(std::move(reply_task));
427 }
428
429 } // namespace rtc
OLDNEW
« no previous file with comments | « webrtc/base/task_queue_gcd.cc ('k') | webrtc/base/task_queue_posix.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698