| Index: webrtc/rtc_base/task_queue_libevent.cc
|
| diff --git a/webrtc/rtc_base/task_queue_libevent.cc b/webrtc/rtc_base/task_queue_libevent.cc
|
| index db267dc80be893aa7715f5dea97ed2a57a93f47c..99b88df363c939c1deace1b142dedb3d9c3b7476 100644
|
| --- a/webrtc/rtc_base/task_queue_libevent.cc
|
| +++ b/webrtc/rtc_base/task_queue_libevent.cc
|
| @@ -18,7 +18,11 @@
|
| #include "base/third_party/libevent/event.h"
|
| #include "webrtc/rtc_base/checks.h"
|
| #include "webrtc/rtc_base/logging.h"
|
| +#include "webrtc/rtc_base/platform_thread.h"
|
| +#include "webrtc/rtc_base/refcount.h"
|
| +#include "webrtc/rtc_base/refcountedobject.h"
|
| #include "webrtc/rtc_base/safe_conversions.h"
|
| +#include "webrtc/rtc_base/task_queue.h"
|
| #include "webrtc/rtc_base/task_queue_posix.h"
|
| #include "webrtc/rtc_base/timeutils.h"
|
|
|
| @@ -104,9 +108,57 @@ ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) {
|
| }
|
| } // namespace
|
|
|
| -struct TaskQueue::QueueContext {
|
| - explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {}
|
| - TaskQueue* queue;
|
| +class TaskQueue::Impl : public RefCountInterface {
|
| + public:
|
| + explicit Impl(const char* queue_name,
|
| + TaskQueue* queue,
|
| + Priority priority = Priority::NORMAL);
|
| + ~Impl() override;
|
| +
|
| + static TaskQueue::Impl* Current();
|
| + static TaskQueue* CurrentQueue();
|
| +
|
| + // Used for DCHECKing the current queue.
|
| + static bool IsCurrent(const char* queue_name);
|
| + bool IsCurrent() const;
|
| +
|
| + void PostTask(std::unique_ptr<QueuedTask> task);
|
| + void PostTaskAndReply(std::unique_ptr<QueuedTask> task,
|
| + std::unique_ptr<QueuedTask> reply,
|
| + TaskQueue::Impl* reply_queue);
|
| +
|
| + void PostDelayedTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds);
|
| +
|
| + private:
|
| + static void ThreadMain(void* context);
|
| + static void OnWakeup(int socket, short flags, void* context); // NOLINT
|
| + static void RunTask(int fd, short flags, void* context); // NOLINT
|
| + static void RunTimer(int fd, short flags, void* context); // NOLINT
|
| +
|
| + class ReplyTaskOwner;
|
| + class PostAndReplyTask;
|
| + class SetTimerTask;
|
| +
|
| + typedef RefCountedObject<ReplyTaskOwner> ReplyTaskOwnerRef;
|
| +
|
| + void PrepareReplyTask(scoped_refptr<ReplyTaskOwnerRef> reply_task);
|
| +
|
| + struct QueueContext;
|
| + TaskQueue* const queue_;
|
| + int wakeup_pipe_in_ = -1;
|
| + int wakeup_pipe_out_ = -1;
|
| + event_base* event_base_;
|
| + std::unique_ptr<event> wakeup_event_;
|
| + PlatformThread thread_;
|
| + rtc::CriticalSection pending_lock_;
|
| + std::list<std::unique_ptr<QueuedTask>> pending_ GUARDED_BY(pending_lock_);
|
| + std::list<scoped_refptr<ReplyTaskOwnerRef>> pending_replies_
|
| + GUARDED_BY(pending_lock_);
|
| +};
|
| +
|
| +struct TaskQueue::Impl::QueueContext {
|
| + explicit QueueContext(TaskQueue::Impl* q) : queue(q), is_active(true) {}
|
| + TaskQueue::Impl* queue;
|
| bool is_active;
|
| // Holds a list of events pending timers for cleanup when the loop exits.
|
| std::list<TimerEvent*> pending_timers_;
|
| @@ -135,7 +187,7 @@ struct TaskQueue::QueueContext {
|
| // * if set_should_run_task() was called, the reply task will be run
|
| // * Release the reference to ReplyTaskOwner
|
| // * ReplyTaskOwner and associated |reply_| are deleted.
|
| -class TaskQueue::ReplyTaskOwner {
|
| +class TaskQueue::Impl::ReplyTaskOwner {
|
| public:
|
| ReplyTaskOwner(std::unique_ptr<QueuedTask> reply)
|
| : reply_(std::move(reply)) {}
|
| @@ -159,11 +211,11 @@ class TaskQueue::ReplyTaskOwner {
|
| bool run_task_ = false;
|
| };
|
|
|
| -class TaskQueue::PostAndReplyTask : public QueuedTask {
|
| +class TaskQueue::Impl::PostAndReplyTask : public QueuedTask {
|
| public:
|
| PostAndReplyTask(std::unique_ptr<QueuedTask> task,
|
| std::unique_ptr<QueuedTask> reply,
|
| - TaskQueue* reply_queue,
|
| + TaskQueue::Impl* reply_queue,
|
| int reply_pipe)
|
| : task_(std::move(task)),
|
| reply_pipe_(reply_pipe),
|
| @@ -196,7 +248,7 @@ class TaskQueue::PostAndReplyTask : public QueuedTask {
|
| scoped_refptr<RefCountedObject<ReplyTaskOwner>> reply_task_owner_;
|
| };
|
|
|
| -class TaskQueue::SetTimerTask : public QueuedTask {
|
| +class TaskQueue::Impl::SetTimerTask : public QueuedTask {
|
| public:
|
| SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds)
|
| : task_(std::move(task)),
|
| @@ -208,7 +260,7 @@ class TaskQueue::SetTimerTask : public QueuedTask {
|
| // Compensate for the time that has passed since construction
|
| // and until we got here.
|
| uint32_t post_time = Time32() - posted_;
|
| - TaskQueue::Current()->PostDelayedTask(
|
| + TaskQueue::Impl::Current()->PostDelayedTask(
|
| std::move(task_),
|
| post_time > milliseconds_ ? 0 : milliseconds_ - post_time);
|
| return true;
|
| @@ -219,10 +271,13 @@ class TaskQueue::SetTimerTask : public QueuedTask {
|
| const uint32_t posted_;
|
| };
|
|
|
| -TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/)
|
| - : event_base_(event_base_new()),
|
| +TaskQueue::Impl::Impl(const char* queue_name,
|
| + TaskQueue* queue,
|
| + Priority priority /*= NORMAL*/)
|
| + : queue_(queue),
|
| + event_base_(event_base_new()),
|
| wakeup_event_(new event()),
|
| - thread_(&TaskQueue::ThreadMain,
|
| + thread_(&TaskQueue::Impl::ThreadMain,
|
| this,
|
| queue_name,
|
| TaskQueuePriorityToThreadPriority(priority)) {
|
| @@ -240,7 +295,7 @@ TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/)
|
| thread_.Start();
|
| }
|
|
|
| -TaskQueue::~TaskQueue() {
|
| +TaskQueue::Impl::~Impl() {
|
| RTC_DCHECK(!IsCurrent());
|
| struct timespec ts;
|
| char message = kQuit;
|
| @@ -267,29 +322,38 @@ TaskQueue::~TaskQueue() {
|
| }
|
|
|
| // static
|
| -TaskQueue* TaskQueue::Current() {
|
| +TaskQueue::Impl* TaskQueue::Impl::Current() {
|
| QueueContext* ctx =
|
| static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
|
| return ctx ? ctx->queue : nullptr;
|
| }
|
|
|
| // static
|
| -bool TaskQueue::IsCurrent(const char* queue_name) {
|
| - TaskQueue* current = Current();
|
| +TaskQueue* TaskQueue::Impl::CurrentQueue() {
|
| + TaskQueue::Impl* current = Current();
|
| + if (current) {
|
| + return current->queue_;
|
| + }
|
| + return nullptr;
|
| +}
|
| +
|
| +// static
|
| +bool TaskQueue::Impl::IsCurrent(const char* queue_name) {
|
| + TaskQueue::Impl* current = Current();
|
| return current && current->thread_.name().compare(queue_name) == 0;
|
| }
|
|
|
| -bool TaskQueue::IsCurrent() const {
|
| +bool TaskQueue::Impl::IsCurrent() const {
|
| return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef());
|
| }
|
|
|
| -void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
|
| +void TaskQueue::Impl::PostTask(std::unique_ptr<QueuedTask> task) {
|
| RTC_DCHECK(task.get());
|
| // libevent isn't thread safe. This means that we can't use methods such
|
| // as event_base_once to post tasks to the worker thread from a different
|
| // thread. However, we can use it when posting from the worker thread itself.
|
| if (IsCurrent()) {
|
| - if (event_base_once(event_base_, -1, EV_TIMEOUT, &TaskQueue::RunTask,
|
| + if (event_base_once(event_base_, -1, EV_TIMEOUT, &TaskQueue::Impl::RunTask,
|
| task.get(), nullptr) == 0) {
|
| task.release();
|
| }
|
| @@ -310,11 +374,12 @@ void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
|
| }
|
| }
|
|
|
| -void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
|
| - uint32_t milliseconds) {
|
| +void TaskQueue::Impl::PostDelayedTask(std::unique_ptr<QueuedTask> task,
|
| + uint32_t milliseconds) {
|
| if (IsCurrent()) {
|
| TimerEvent* timer = new TimerEvent(std::move(task));
|
| - EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueue::RunTimer, timer);
|
| + EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueue::Impl::RunTimer,
|
| + timer);
|
| QueueContext* ctx =
|
| static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
|
| ctx->pending_timers_.push_back(timer);
|
| @@ -327,23 +392,18 @@ void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
|
| }
|
| }
|
|
|
| -void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
|
| - std::unique_ptr<QueuedTask> reply,
|
| - TaskQueue* reply_queue) {
|
| +void TaskQueue::Impl::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
|
| + std::unique_ptr<QueuedTask> reply,
|
| + TaskQueue::Impl* reply_queue) {
|
| std::unique_ptr<QueuedTask> wrapper_task(
|
| new PostAndReplyTask(std::move(task), std::move(reply), reply_queue,
|
| reply_queue->wakeup_pipe_in_));
|
| PostTask(std::move(wrapper_task));
|
| }
|
|
|
| -void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
|
| - std::unique_ptr<QueuedTask> reply) {
|
| - return PostTaskAndReply(std::move(task), std::move(reply), Current());
|
| -}
|
| -
|
| // static
|
| -void TaskQueue::ThreadMain(void* context) {
|
| - TaskQueue* me = static_cast<TaskQueue*>(context);
|
| +void TaskQueue::Impl::ThreadMain(void* context) {
|
| + TaskQueue::Impl* me = static_cast<TaskQueue::Impl*>(context);
|
|
|
| QueueContext queue_context(me);
|
| pthread_setspecific(GetQueuePtrTls(), &queue_context);
|
| @@ -358,7 +418,9 @@ void TaskQueue::ThreadMain(void* context) {
|
| }
|
|
|
| // static
|
| -void TaskQueue::OnWakeup(int socket, short flags, void* context) { // NOLINT
|
| +void TaskQueue::Impl::OnWakeup(int socket,
|
| + short flags,
|
| + void* context) { // NOLINT
|
| QueueContext* ctx =
|
| static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
|
| RTC_DCHECK(ctx->queue->wakeup_pipe_out_ == socket);
|
| @@ -405,14 +467,14 @@ void TaskQueue::OnWakeup(int socket, short flags, void* context) { // NOLINT
|
| }
|
|
|
| // static
|
| -void TaskQueue::RunTask(int fd, short flags, void* context) { // NOLINT
|
| +void TaskQueue::Impl::RunTask(int fd, short flags, void* context) { // NOLINT
|
| auto* task = static_cast<QueuedTask*>(context);
|
| if (task->Run())
|
| delete task;
|
| }
|
|
|
| // static
|
| -void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT
|
| +void TaskQueue::Impl::RunTimer(int fd, short flags, void* context) { // NOLINT
|
| TimerEvent* timer = static_cast<TimerEvent*>(context);
|
| if (!timer->task->Run())
|
| timer->task.release();
|
| @@ -422,10 +484,54 @@ void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT
|
| delete timer;
|
| }
|
|
|
| -void TaskQueue::PrepareReplyTask(scoped_refptr<ReplyTaskOwnerRef> reply_task) {
|
| +void TaskQueue::Impl::PrepareReplyTask(
|
| + scoped_refptr<ReplyTaskOwnerRef> reply_task) {
|
| RTC_DCHECK(reply_task);
|
| CritScope lock(&pending_lock_);
|
| pending_replies_.push_back(std::move(reply_task));
|
| }
|
|
|
| +TaskQueue::TaskQueue(const char* queue_name, Priority priority)
|
| + : impl_(new RefCountedObject<TaskQueue::Impl>(queue_name, this, priority)) {
|
| +}
|
| +
|
| +TaskQueue::~TaskQueue() {}
|
| +
|
| +// static
|
| +TaskQueue* TaskQueue::Current() {
|
| + return TaskQueue::Impl::CurrentQueue();
|
| +}
|
| +
|
| +// Used for DCHECKing the current queue.
|
| +// static
|
| +bool TaskQueue::IsCurrent(const char* queue_name) {
|
| + return TaskQueue::Impl::IsCurrent(queue_name);
|
| +}
|
| +
|
| +bool TaskQueue::IsCurrent() const {
|
| + return impl_->IsCurrent();
|
| +}
|
| +
|
| +void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
|
| + return TaskQueue::impl_->PostTask(std::move(task));
|
| +}
|
| +
|
| +void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
|
| + std::unique_ptr<QueuedTask> reply,
|
| + TaskQueue* reply_queue) {
|
| + return TaskQueue::impl_->PostTaskAndReply(std::move(task), std::move(reply),
|
| + reply_queue->impl_.get());
|
| +}
|
| +
|
| +void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
|
| + std::unique_ptr<QueuedTask> reply) {
|
| + return TaskQueue::impl_->PostTaskAndReply(std::move(task), std::move(reply),
|
| + impl_.get());
|
| +}
|
| +
|
| +void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
|
| + uint32_t milliseconds) {
|
| + return TaskQueue::impl_->PostDelayedTask(std::move(task), milliseconds);
|
| +}
|
| +
|
| } // namespace rtc
|
|
|