| Index: webrtc/base/task_queue_libevent.cc
 | 
| diff --git a/webrtc/base/task_queue_libevent.cc b/webrtc/base/task_queue_libevent.cc
 | 
| new file mode 100644
 | 
| index 0000000000000000000000000000000000000000..a59b450828c7b070717579f1a26b5bbda835cfd4
 | 
| --- /dev/null
 | 
| +++ b/webrtc/base/task_queue_libevent.cc
 | 
| @@ -0,0 +1,318 @@
 | 
| +/*
 | 
| + *  Copyright 2016 The WebRTC Project Authors. All rights reserved.
 | 
| + *
 | 
| + *  Use of this source code is governed by a BSD-style license
 | 
| + *  that can be found in the LICENSE file in the root of the source
 | 
| + *  tree. An additional intellectual property rights grant can be found
 | 
| + *  in the file PATENTS.  All contributing project authors may
 | 
| + *  be found in the AUTHORS file in the root of the source tree.
 | 
| + */
 | 
| +
 | 
| +#include "webrtc/base/task_queue.h"
 | 
| +
 | 
| +#include <fcntl.h>
 | 
| +#include <string.h>
 | 
| +#include <unistd.h>
 | 
| +
 | 
| +#include "base/third_party/libevent/event.h"
 | 
| +#include "webrtc/base/checks.h"
 | 
| +#include "webrtc/base/logging.h"
 | 
| +#include "webrtc/base/task_queue_posix.h"
 | 
| +#include "webrtc/base/timeutils.h"
 | 
| +
 | 
| +namespace rtc {
 | 
| +using internal::GetQueuePtrTls;
 | 
| +using internal::AutoSetCurrentQueuePtr;
 | 
| +
 | 
| +namespace {
 | 
| +static const char kQuit = 1;
 | 
| +static const char kRunTask = 2;
 | 
| +
 | 
| +struct TimerEvent {
 | 
| +  explicit TimerEvent(std::unique_ptr<QueuedTask> task)
 | 
| +      : task(std::move(task)) {}
 | 
| +  ~TimerEvent() { event_del(&ev); }
 | 
| +  event ev;
 | 
| +  std::unique_ptr<QueuedTask> task;
 | 
| +};
 | 
| +
 | 
| +bool SetNonBlocking(int fd) {
 | 
| +  const int flags = fcntl(fd, F_GETFL);
 | 
| +  RTC_CHECK(flags != -1);
 | 
| +  return (flags & O_NONBLOCK) || fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1;
 | 
| +}
 | 
| +}  // namespace
 | 
| +
 | 
| +struct TaskQueue::QueueContext {
 | 
| +  explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {}
 | 
| +  TaskQueue* queue;
 | 
| +  bool is_active;
 | 
| +  // Holds a list of events pending timers for cleanup when the loop exits.
 | 
| +  std::list<TimerEvent*> pending_timers_;
 | 
| +};
 | 
| +
 | 
| +class TaskQueue::PostAndReplyTask : public QueuedTask {
 | 
| + public:
 | 
| +  PostAndReplyTask(std::unique_ptr<QueuedTask> task,
 | 
| +                   std::unique_ptr<QueuedTask> reply,
 | 
| +                   TaskQueue* reply_queue)
 | 
| +      : task_(std::move(task)),
 | 
| +        reply_(std::move(reply)),
 | 
| +        reply_queue_(reply_queue) {
 | 
| +    reply_queue->PrepareReplyTask(this);
 | 
| +  }
 | 
| +
 | 
| +  ~PostAndReplyTask() override {
 | 
| +    CritScope lock(&lock_);
 | 
| +    if (reply_queue_)
 | 
| +      reply_queue_->ReplyTaskDone(this);
 | 
| +  }
 | 
| +
 | 
| +  void OnReplyQueueGone() {
 | 
| +    CritScope lock(&lock_);
 | 
| +    reply_queue_ = nullptr;
 | 
| +  }
 | 
| +
 | 
| + private:
 | 
| +  bool Run() override {
 | 
| +    if (!task_->Run())
 | 
| +      task_.release();
 | 
| +
 | 
| +    CritScope lock(&lock_);
 | 
| +    if (reply_queue_)
 | 
| +      reply_queue_->PostTask(std::move(reply_));
 | 
| +    return true;
 | 
| +  }
 | 
| +
 | 
| +  CriticalSection lock_;
 | 
| +  std::unique_ptr<QueuedTask> task_;
 | 
| +  std::unique_ptr<QueuedTask> reply_;
 | 
| +  TaskQueue* reply_queue_ GUARDED_BY(lock_);
 | 
| +};
 | 
| +
 | 
| +class TaskQueue::SetTimerTask : public QueuedTask {
 | 
| + public:
 | 
| +  SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds)
 | 
| +      : task_(std::move(task)),
 | 
| +        milliseconds_(milliseconds),
 | 
| +        posted_(Time32()) {}
 | 
| +
 | 
| + private:
 | 
| +  bool Run() override {
 | 
| +    // Compensate for the time that has passed since construction
 | 
| +    // and until we got here.
 | 
| +    uint32_t post_time = Time32() - posted_;
 | 
| +    TaskQueue::Current()->PostDelayedTask(
 | 
| +        std::move(task_),
 | 
| +        post_time > milliseconds_ ? 0 : milliseconds_ - post_time);
 | 
| +    return true;
 | 
| +  }
 | 
| +
 | 
| +  std::unique_ptr<QueuedTask> task_;
 | 
| +  const uint32_t milliseconds_;
 | 
| +  const uint32_t posted_;
 | 
| +};
 | 
| +
 | 
| +TaskQueue::TaskQueue(const char* queue_name)
 | 
| +    : event_base_(event_base_new()),
 | 
| +      wakeup_event_(new event()),
 | 
| +      thread_(&TaskQueue::ThreadMain, this, queue_name) {
 | 
| +  RTC_DCHECK(queue_name);
 | 
| +  int fds[2];
 | 
| +  RTC_CHECK(pipe(fds) == 0);
 | 
| +  SetNonBlocking(fds[0]);
 | 
| +  SetNonBlocking(fds[1]);
 | 
| +  wakeup_pipe_out_ = fds[0];
 | 
| +  wakeup_pipe_in_ = fds[1];
 | 
| +  event_set(wakeup_event_.get(), wakeup_pipe_out_, EV_READ | EV_PERSIST,
 | 
| +            OnWakeup, this);
 | 
| +  event_base_set(event_base_, wakeup_event_.get());
 | 
| +  event_add(wakeup_event_.get(), 0);
 | 
| +  thread_.Start();
 | 
| +}
 | 
| +
 | 
| +TaskQueue::~TaskQueue() {
 | 
| +  RTC_DCHECK(!IsCurrent());
 | 
| +  struct timespec ts;
 | 
| +  char message = kQuit;
 | 
| +  while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
 | 
| +    // The queue is full, so we have no choice but to wait and retry.
 | 
| +    RTC_CHECK_EQ(EAGAIN, errno);
 | 
| +    ts.tv_sec = 0;
 | 
| +    ts.tv_nsec = 1000000;
 | 
| +    nanosleep(&ts, nullptr);
 | 
| +  }
 | 
| +
 | 
| +  thread_.Stop();
 | 
| +
 | 
| +  event_del(wakeup_event_.get());
 | 
| +  close(wakeup_pipe_in_);
 | 
| +  close(wakeup_pipe_out_);
 | 
| +  wakeup_pipe_in_ = -1;
 | 
| +  wakeup_pipe_out_ = -1;
 | 
| +
 | 
| +  {
 | 
| +    // Synchronize against any pending reply tasks that might be running on
 | 
| +    // other queues.
 | 
| +    CritScope lock(&pending_lock_);
 | 
| +    for (auto* reply : pending_replies_)
 | 
| +      reply->OnReplyQueueGone();
 | 
| +    pending_replies_.clear();
 | 
| +  }
 | 
| +
 | 
| +  event_base_free(event_base_);
 | 
| +}
 | 
| +
 | 
| +// static
 | 
| +TaskQueue* TaskQueue::Current() {
 | 
| +  QueueContext* ctx =
 | 
| +      static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
 | 
| +  return ctx ? ctx->queue : nullptr;
 | 
| +}
 | 
| +
 | 
| +// static
 | 
| +bool TaskQueue::IsCurrent(const char* queue_name) {
 | 
| +  TaskQueue* current = Current();
 | 
| +  return current && current->thread_.name().compare(queue_name) == 0;
 | 
| +}
 | 
| +
 | 
| +bool TaskQueue::IsCurrent() const {
 | 
| +  return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef());
 | 
| +}
 | 
| +
 | 
| +void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
 | 
| +  RTC_DCHECK(task.get());
 | 
| +  // libevent isn't thread safe.  This means that we can't use methods such
 | 
| +  // as event_base_once to post tasks to the worker thread from a different
 | 
| +  // thread.  However, we can use it when posting from the worker thread itself.
 | 
| +  if (IsCurrent()) {
 | 
| +    if (event_base_once(event_base_, -1, EV_TIMEOUT, &TaskQueue::RunTask,
 | 
| +                        task.get(), nullptr) == 0) {
 | 
| +      task.release();
 | 
| +    }
 | 
| +  } else {
 | 
| +    QueuedTask* task_id = task.get();  // Only used for comparison.
 | 
| +    {
 | 
| +      CritScope lock(&pending_lock_);
 | 
| +      pending_.push_back(std::move(task));
 | 
| +    }
 | 
| +    char message = kRunTask;
 | 
| +    if (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
 | 
| +      LOG(WARNING) << "Failed to queue task.";
 | 
| +      CritScope lock(&pending_lock_);
 | 
| +      pending_.remove_if([task_id](std::unique_ptr<QueuedTask>& t) {
 | 
| +        return t.get() == task_id;
 | 
| +      });
 | 
| +    }
 | 
| +  }
 | 
| +}
 | 
| +
 | 
| +void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
 | 
| +                                uint32_t milliseconds) {
 | 
| +  if (IsCurrent()) {
 | 
| +    TimerEvent* timer = new TimerEvent(std::move(task));
 | 
| +    evtimer_set(&timer->ev, &TaskQueue::RunTimer, timer);
 | 
| +    event_base_set(event_base_, &timer->ev);
 | 
| +    QueueContext* ctx =
 | 
| +        static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
 | 
| +    ctx->pending_timers_.push_back(timer);
 | 
| +    timeval tv = {milliseconds / 1000, (milliseconds % 1000) * 1000};
 | 
| +    event_add(&timer->ev, &tv);
 | 
| +  } else {
 | 
| +    PostTask(std::unique_ptr<QueuedTask>(
 | 
| +        new SetTimerTask(std::move(task), milliseconds)));
 | 
| +  }
 | 
| +}
 | 
| +
 | 
| +void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
 | 
| +                                 std::unique_ptr<QueuedTask> reply,
 | 
| +                                 TaskQueue* reply_queue) {
 | 
| +  std::unique_ptr<QueuedTask> wrapper_task(
 | 
| +      new PostAndReplyTask(std::move(task), std::move(reply), reply_queue));
 | 
| +  PostTask(std::move(wrapper_task));
 | 
| +}
 | 
| +
 | 
| +void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
 | 
| +                                 std::unique_ptr<QueuedTask> reply) {
 | 
| +  return PostTaskAndReply(std::move(task), std::move(reply), Current());
 | 
| +}
 | 
| +
 | 
| +// static
 | 
| +bool TaskQueue::ThreadMain(void* context) {
 | 
| +  TaskQueue* me = static_cast<TaskQueue*>(context);
 | 
| +
 | 
| +  QueueContext queue_context(me);
 | 
| +  pthread_setspecific(GetQueuePtrTls(), &queue_context);
 | 
| +
 | 
| +  while (queue_context.is_active)
 | 
| +    event_base_loop(me->event_base_, 0);
 | 
| +
 | 
| +  pthread_setspecific(GetQueuePtrTls(), nullptr);
 | 
| +
 | 
| +  for (TimerEvent* timer : queue_context.pending_timers_)
 | 
| +    delete timer;
 | 
| +
 | 
| +  return false;
 | 
| +}
 | 
| +
 | 
| +// static
 | 
| +void TaskQueue::OnWakeup(int socket, short flags, void* context) {  // NOLINT
 | 
| +  QueueContext* ctx =
 | 
| +      static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
 | 
| +  RTC_DCHECK(ctx->queue->wakeup_pipe_out_ == socket);
 | 
| +  char buf;
 | 
| +  RTC_CHECK(sizeof(buf) == read(socket, &buf, sizeof(buf)));
 | 
| +  switch (buf) {
 | 
| +    case kQuit:
 | 
| +      ctx->is_active = false;
 | 
| +      event_base_loopbreak(ctx->queue->event_base_);
 | 
| +      break;
 | 
| +    case kRunTask: {
 | 
| +      std::unique_ptr<QueuedTask> task;
 | 
| +      {
 | 
| +        CritScope lock(&ctx->queue->pending_lock_);
 | 
| +        RTC_DCHECK(!ctx->queue->pending_.empty());
 | 
| +        task = std::move(ctx->queue->pending_.front());
 | 
| +        ctx->queue->pending_.pop_front();
 | 
| +        RTC_DCHECK(task.get());
 | 
| +      }
 | 
| +      if (!task->Run())
 | 
| +        task.release();
 | 
| +      break;
 | 
| +    }
 | 
| +    default:
 | 
| +      RTC_NOTREACHED();
 | 
| +      break;
 | 
| +  }
 | 
| +}
 | 
| +
 | 
| +// static
 | 
| +void TaskQueue::RunTask(int fd, short flags, void* context) {  // NOLINT
 | 
| +  auto* task = static_cast<QueuedTask*>(context);
 | 
| +  if (task->Run())
 | 
| +    delete task;
 | 
| +}
 | 
| +
 | 
| +// static
 | 
| +void TaskQueue::RunTimer(int fd, short flags, void* context) {  // NOLINT
 | 
| +  TimerEvent* timer = static_cast<TimerEvent*>(context);
 | 
| +  if (!timer->task->Run())
 | 
| +    timer->task.release();
 | 
| +  QueueContext* ctx =
 | 
| +      static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
 | 
| +  ctx->pending_timers_.remove(timer);
 | 
| +  delete timer;
 | 
| +}
 | 
| +
 | 
| +void TaskQueue::PrepareReplyTask(PostAndReplyTask* reply_task) {
 | 
| +  RTC_DCHECK(reply_task);
 | 
| +  CritScope lock(&pending_lock_);
 | 
| +  pending_replies_.push_back(reply_task);
 | 
| +}
 | 
| +
 | 
| +void TaskQueue::ReplyTaskDone(PostAndReplyTask* reply_task) {
 | 
| +  CritScope lock(&pending_lock_);
 | 
| +  pending_replies_.remove(reply_task);
 | 
| +}
 | 
| +
 | 
| +}  // namespace rtc
 | 
| 
 |