Chromium Code Reviews

Unified Diff: webrtc/base/task_queue_libevent.cc

Issue 1984503002: Reland of New task queueing primitive for async tasks: TaskQueue. (Closed) Base URL: https://chromium.googlesource.com/external/webrtc.git@master
Patch Set: Optional initialization for build_for Created 4 years, 7 months ago
Index: webrtc/base/task_queue_libevent.cc
diff --git a/webrtc/base/task_queue_libevent.cc b/webrtc/base/task_queue_libevent.cc
new file mode 100644
index 0000000000000000000000000000000000000000..a59b450828c7b070717579f1a26b5bbda835cfd4
--- /dev/null
+++ b/webrtc/base/task_queue_libevent.cc
@@ -0,0 +1,318 @@
+/*
+ * Copyright 2016 The WebRTC Project Authors. All rights reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "webrtc/base/task_queue.h"
+
+#include <errno.h>
+#include <fcntl.h>
+#include <string.h>
+#include <time.h>
+#include <unistd.h>
+
+#include "base/third_party/libevent/event.h"
+#include "webrtc/base/checks.h"
+#include "webrtc/base/logging.h"
+#include "webrtc/base/task_queue_posix.h"
+#include "webrtc/base/timeutils.h"
+
+namespace rtc {
+using internal::GetQueuePtrTls;
+using internal::AutoSetCurrentQueuePtr;
+
+namespace {
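+// Single-byte messages written to the wakeup pipe to signal the worker thread.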
+const char kQuit = 1;
+const char kRunTask = 2;
+
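+// Owns a delayed task together with its libevent timer. The destructor
+// removes the timer from the event loop via event_del().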
+struct TimerEvent {
+  explicit TimerEvent(std::unique_ptr<QueuedTask> task)
+      : task(std::move(task)) {}
+  ~TimerEvent() { event_del(&ev); }
+  event ev;
+  std::unique_ptr<QueuedTask> task;
+};
+
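+// Puts |fd| in non-blocking mode so that reads and writes on a full or empty
+// pipe fail with EAGAIN instead of blocking the calling thread.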
+bool SetNonBlocking(int fd) {
+  const int flags = fcntl(fd, F_GETFL);
+  RTC_CHECK(flags != -1);
+  return (flags & O_NONBLOCK) || fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1;
+}
+} // namespace
+
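+// Per-thread state for the worker thread, stored in thread-local storage so
+// that the static libevent callbacks can find the queue they belong to.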
+struct TaskQueue::QueueContext {
+  explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {}
+  TaskQueue* queue;
+  bool is_active;
+  // Holds pending timer events so they can be cleaned up when the loop exits.
+};
+
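+// Wraps a |task| to run on this queue and a |reply| to post to |reply_queue|
+// once the task has run. The lock and OnReplyQueueGone() guard against the
+// reply queue being deleted while the wrapper is still in flight.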
+class TaskQueue::PostAndReplyTask : public QueuedTask {
+ public:
+  PostAndReplyTask(std::unique_ptr<QueuedTask> task,
+                   std::unique_ptr<QueuedTask> reply,
+                   TaskQueue* reply_queue)
+      : task_(std::move(task)),
+        reply_(std::move(reply)),
+        reply_queue_(reply_queue) {
+    reply_queue->PrepareReplyTask(this);
+  }
+
+  ~PostAndReplyTask() override {
+    CritScope lock(&lock_);
+    if (reply_queue_)
+      reply_queue_->ReplyTaskDone(this);
+  }
+
+  void OnReplyQueueGone() {
+    CritScope lock(&lock_);
+    reply_queue_ = nullptr;
+  }
+
+ private:
+  bool Run() override {
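+    // Per the QueuedTask contract, a false return from Run() means the task
+    // has taken back ownership of itself, so release without deleting.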
+    if (!task_->Run())
+      task_.release();
+
+    CritScope lock(&lock_);
+    if (reply_queue_)
+      reply_queue_->PostTask(std::move(reply_));
+    return true;
+  }
+
+  CriticalSection lock_;
+  std::unique_ptr<QueuedTask> task_;
+  std::unique_ptr<QueuedTask> reply_;
+  TaskQueue* reply_queue_ GUARDED_BY(lock_);
+};
+
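+// Posted to the worker thread when PostDelayedTask() is called from another
+// thread; once it runs on the worker thread, it schedules the real task with
+// a delay compensated for the time already spent in the queue.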
+class TaskQueue::SetTimerTask : public QueuedTask {
+ public:
+  SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds)
+      : task_(std::move(task)),
+        milliseconds_(milliseconds),
+        posted_(Time32()) {}
+
+ private:
+  bool Run() override {
+    // Compensate for the time that elapsed between construction (on the
+    // calling thread) and now, when we actually run on the target queue.
+    uint32_t elapsed = Time32() - posted_;
+    TaskQueue::Current()->PostDelayedTask(
+        std::move(task_),
+        elapsed > milliseconds_ ? 0 : milliseconds_ - elapsed);
+    return true;
+  }
+
+  std::unique_ptr<QueuedTask> task_;
+  const uint32_t milliseconds_;
+  const uint32_t posted_;
+};
+
+TaskQueue::TaskQueue(const char* queue_name)
+    : event_base_(event_base_new()),
+      wakeup_event_(new event()),
+      thread_(&TaskQueue::ThreadMain, this, queue_name) {
+  RTC_DCHECK(queue_name);
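+  // Create a self-pipe: other threads write single-byte messages to
+  // wakeup_pipe_in_ while the libevent loop watches wakeup_pipe_out_.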
+  int fds[2];
+  RTC_CHECK(pipe(fds) == 0);
+  SetNonBlocking(fds[0]);
+  SetNonBlocking(fds[1]);
+  wakeup_pipe_out_ = fds[0];
+  wakeup_pipe_in_ = fds[1];
+  event_set(wakeup_event_.get(), wakeup_pipe_out_, EV_READ | EV_PERSIST,
+            OnWakeup, this);
+  event_base_set(event_base_, wakeup_event_.get());
+  event_add(wakeup_event_.get(), nullptr);
+  thread_.Start();
+}
+
+TaskQueue::~TaskQueue() {
+  RTC_DCHECK(!IsCurrent());
+  struct timespec ts;
+  char message = kQuit;
+  while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
+    // The pipe is full, so we have no choice but to wait and retry.
+    RTC_CHECK_EQ(EAGAIN, errno);
+    ts.tv_sec = 0;
+    ts.tv_nsec = 1000000;
+    nanosleep(&ts, nullptr);
+  }
+
+  thread_.Stop();
+
+  event_del(wakeup_event_.get());
+  close(wakeup_pipe_in_);
+  close(wakeup_pipe_out_);
+  wakeup_pipe_in_ = -1;
+  wakeup_pipe_out_ = -1;
+
+  {
+    // Synchronize against any pending reply tasks that might be running on
+    // other queues.
+    CritScope lock(&pending_lock_);
+    for (auto* reply : pending_replies_)
+      reply->OnReplyQueueGone();
+    pending_replies_.clear();
+  }
+
+  event_base_free(event_base_);
+}
+
+// static
+TaskQueue* TaskQueue::Current() {
+  QueueContext* ctx =
+      static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
+  return ctx ? ctx->queue : nullptr;
+}
+
+// static
+bool TaskQueue::IsCurrent(const char* queue_name) {
+  TaskQueue* current = Current();
+  return current && current->thread_.name().compare(queue_name) == 0;
+}
+
+bool TaskQueue::IsCurrent() const {
+  return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef());
+}
+
+void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
+  RTC_DCHECK(task.get());
+  // libevent isn't thread safe. This means that we can't use methods such
+  // as event_base_once to post tasks to the worker thread from a different
+  // thread. However, we can use it when posting from the worker thread itself.
+  if (IsCurrent()) {
+    if (event_base_once(event_base_, -1, EV_TIMEOUT, &TaskQueue::RunTask,
+                        task.get(), nullptr) == 0) {
+      task.release();
+    }
+  } else {
+    QueuedTask* task_id = task.get();  // Only used for comparison.
+    {
+      CritScope lock(&pending_lock_);
+      pending_.push_back(std::move(task));
+    }
+    char message = kRunTask;
+    if (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
+      LOG(WARNING) << "Failed to queue task.";
+      CritScope lock(&pending_lock_);
+      pending_.remove_if([task_id](std::unique_ptr<QueuedTask>& t) {
+        return t.get() == task_id;
+      });
+    }
+  }
+}
+
+void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
+                                uint32_t milliseconds) {
+  if (IsCurrent()) {
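+    // Already on the worker thread, so the non-thread-safe libevent timer
+    // API can be used directly.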
+    TimerEvent* timer = new TimerEvent(std::move(task));
+    evtimer_set(&timer->ev, &TaskQueue::RunTimer, timer);
+    event_base_set(event_base_, &timer->ev);
+    QueueContext* ctx =
+        static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
+    ctx->pending_timers_.push_back(timer);
+    timeval tv = {milliseconds / 1000, (milliseconds % 1000) * 1000};
+    event_add(&timer->ev, &tv);
+  } else {
+    PostTask(std::unique_ptr<QueuedTask>(
+        new SetTimerTask(std::move(task), milliseconds)));
+  }
+}
+
+void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
+                                 std::unique_ptr<QueuedTask> reply,
+                                 TaskQueue* reply_queue) {
+  std::unique_ptr<QueuedTask> wrapper_task(
+      new PostAndReplyTask(std::move(task), std::move(reply), reply_queue));
+  PostTask(std::move(wrapper_task));
+}
+
+void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
+                                 std::unique_ptr<QueuedTask> reply) {
+  return PostTaskAndReply(std::move(task), std::move(reply), Current());
+}
+
+// static
+bool TaskQueue::ThreadMain(void* context) {
+  TaskQueue* me = static_cast<TaskQueue*>(context);
+
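+  // Publish the queue context in TLS so that Current() and the static
+  // libevent callbacks can find this queue while the loop runs.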
+  QueueContext queue_context(me);
+  pthread_setspecific(GetQueuePtrTls(), &queue_context);
+
+  while (queue_context.is_active)
+    event_base_loop(me->event_base_, 0);
+
+  pthread_setspecific(GetQueuePtrTls(), nullptr);
+
+  for (TimerEvent* timer : queue_context.pending_timers_)
+    delete timer;
+
+  return false;
+}
+
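+// Reads one message byte from the wakeup pipe and either breaks out of the
+// event loop (kQuit) or runs the next pending task (kRunTask).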
+// static
+void TaskQueue::OnWakeup(int socket, short flags, void* context) { // NOLINT
+  QueueContext* ctx =
+      static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
+  RTC_DCHECK(ctx->queue->wakeup_pipe_out_ == socket);
+  char buf;
+  RTC_CHECK(sizeof(buf) == read(socket, &buf, sizeof(buf)));
+  switch (buf) {
+    case kQuit:
+      ctx->is_active = false;
+      event_base_loopbreak(ctx->queue->event_base_);
+      break;
+    case kRunTask: {
+      std::unique_ptr<QueuedTask> task;
+      {
+        CritScope lock(&ctx->queue->pending_lock_);
+        RTC_DCHECK(!ctx->queue->pending_.empty());
+        task = std::move(ctx->queue->pending_.front());
+        ctx->queue->pending_.pop_front();
+        RTC_DCHECK(task.get());
+      }
+      if (!task->Run())
+        task.release();
+      break;
+    }
+    default:
+      RTC_NOTREACHED();
+      break;
+  }
+}
+
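+// Invoked by libevent (via event_base_once) for tasks posted from the worker
+// thread itself; see PostTask().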
+// static
+void TaskQueue::RunTask(int fd, short flags, void* context) { // NOLINT
+  auto* task = static_cast<QueuedTask*>(context);
+  if (task->Run())
+    delete task;
+}
+
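+// Invoked by libevent when a delayed task's timer fires; runs the task and
+// unregisters the TimerEvent.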
+// static
+void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT
+  TimerEvent* timer = static_cast<TimerEvent*>(context);
+  if (!timer->task->Run())
+    timer->task.release();
+  QueueContext* ctx =
+      static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
+  ctx->pending_timers_.remove(timer);
+  delete timer;
+}
+
+void TaskQueue::PrepareReplyTask(PostAndReplyTask* reply_task) {
+  RTC_DCHECK(reply_task);
+  CritScope lock(&pending_lock_);
+  pending_replies_.push_back(reply_task);
+}
+
+void TaskQueue::ReplyTaskDone(PostAndReplyTask* reply_task) {
+  CritScope lock(&pending_lock_);
+  pending_replies_.remove(reply_task);
+}
+
+} // namespace rtc
