Chromium Code Reviews| Index: webrtc/rtc_base/task_queue_libevent.cc | 
| diff --git a/webrtc/rtc_base/task_queue_libevent.cc b/webrtc/rtc_base/task_queue_libevent.cc | 
| index db267dc80be893aa7715f5dea97ed2a57a93f47c..99b88df363c939c1deace1b142dedb3d9c3b7476 100644 | 
| --- a/webrtc/rtc_base/task_queue_libevent.cc | 
| +++ b/webrtc/rtc_base/task_queue_libevent.cc | 
| @@ -18,7 +18,11 @@ | 
| #include "base/third_party/libevent/event.h" | 
| #include "webrtc/rtc_base/checks.h" | 
| #include "webrtc/rtc_base/logging.h" | 
| +#include "webrtc/rtc_base/platform_thread.h" | 
| +#include "webrtc/rtc_base/refcount.h" | 
| +#include "webrtc/rtc_base/refcountedobject.h" | 
| #include "webrtc/rtc_base/safe_conversions.h" | 
| +#include "webrtc/rtc_base/task_queue.h" | 
| #include "webrtc/rtc_base/task_queue_posix.h" | 
| #include "webrtc/rtc_base/timeutils.h" | 
| @@ -104,9 +108,57 @@ ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) { | 
| } | 
| } // namespace | 
| -struct TaskQueue::QueueContext { | 
| - explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {} | 
| - TaskQueue* queue; | 
| +class TaskQueue::Impl : public RefCountInterface { | 
| + public: | 
| + explicit Impl(const char* queue_name, | 
| + TaskQueue* queue, | 
| + Priority priority = Priority::NORMAL); | 
| 
 
kwiberg-webrtc
2017/08/23 13:23:26
In a local function like this, which doesn't have
 
 | 
| + ~Impl() override; | 
| + | 
| + static TaskQueue::Impl* Current(); | 
| + static TaskQueue* CurrentQueue(); | 
| + | 
| + // Used for DCHECKing the current queue. | 
| + static bool IsCurrent(const char* queue_name); | 
| + bool IsCurrent() const; | 
| + | 
| + void PostTask(std::unique_ptr<QueuedTask> task); | 
| + void PostTaskAndReply(std::unique_ptr<QueuedTask> task, | 
| + std::unique_ptr<QueuedTask> reply, | 
| + TaskQueue::Impl* reply_queue); | 
| + | 
| + void PostDelayedTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds); | 
| + | 
| + private: | 
| + static void ThreadMain(void* context); | 
| + static void OnWakeup(int socket, short flags, void* context); // NOLINT | 
| + static void RunTask(int fd, short flags, void* context); // NOLINT | 
| + static void RunTimer(int fd, short flags, void* context); // NOLINT | 
| + | 
| + class ReplyTaskOwner; | 
| + class PostAndReplyTask; | 
| + class SetTimerTask; | 
| + | 
| + typedef RefCountedObject<ReplyTaskOwner> ReplyTaskOwnerRef; | 
| + | 
| + void PrepareReplyTask(scoped_refptr<ReplyTaskOwnerRef> reply_task); | 
| + | 
| + struct QueueContext; | 
| + TaskQueue* const queue_; | 
| + int wakeup_pipe_in_ = -1; | 
| + int wakeup_pipe_out_ = -1; | 
| + event_base* event_base_; | 
| + std::unique_ptr<event> wakeup_event_; | 
| + PlatformThread thread_; | 
| + rtc::CriticalSection pending_lock_; | 
| + std::list<std::unique_ptr<QueuedTask>> pending_ GUARDED_BY(pending_lock_); | 
| + std::list<scoped_refptr<ReplyTaskOwnerRef>> pending_replies_ | 
| + GUARDED_BY(pending_lock_); | 
| +}; | 
| + | 
| +struct TaskQueue::Impl::QueueContext { | 
| + explicit QueueContext(TaskQueue::Impl* q) : queue(q), is_active(true) {} | 
| + TaskQueue::Impl* queue; | 
| bool is_active; | 
| // Holds a list of events pending timers for cleanup when the loop exits. | 
| std::list<TimerEvent*> pending_timers_; | 
| @@ -135,7 +187,7 @@ struct TaskQueue::QueueContext { | 
| // * if set_should_run_task() was called, the reply task will be run | 
| // * Release the reference to ReplyTaskOwner | 
| // * ReplyTaskOwner and associated |reply_| are deleted. | 
| -class TaskQueue::ReplyTaskOwner { | 
| +class TaskQueue::Impl::ReplyTaskOwner { | 
| public: | 
| ReplyTaskOwner(std::unique_ptr<QueuedTask> reply) | 
| : reply_(std::move(reply)) {} | 
| @@ -159,11 +211,11 @@ class TaskQueue::ReplyTaskOwner { | 
| bool run_task_ = false; | 
| }; | 
| -class TaskQueue::PostAndReplyTask : public QueuedTask { | 
| +class TaskQueue::Impl::PostAndReplyTask : public QueuedTask { | 
| public: | 
| PostAndReplyTask(std::unique_ptr<QueuedTask> task, | 
| std::unique_ptr<QueuedTask> reply, | 
| - TaskQueue* reply_queue, | 
| + TaskQueue::Impl* reply_queue, | 
| int reply_pipe) | 
| : task_(std::move(task)), | 
| reply_pipe_(reply_pipe), | 
| @@ -196,7 +248,7 @@ class TaskQueue::PostAndReplyTask : public QueuedTask { | 
| scoped_refptr<RefCountedObject<ReplyTaskOwner>> reply_task_owner_; | 
| }; | 
| -class TaskQueue::SetTimerTask : public QueuedTask { | 
| +class TaskQueue::Impl::SetTimerTask : public QueuedTask { | 
| public: | 
| SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds) | 
| : task_(std::move(task)), | 
| @@ -208,7 +260,7 @@ class TaskQueue::SetTimerTask : public QueuedTask { | 
| // Compensate for the time that has passed since construction | 
| // and until we got here. | 
| uint32_t post_time = Time32() - posted_; | 
| - TaskQueue::Current()->PostDelayedTask( | 
| + TaskQueue::Impl::Current()->PostDelayedTask( | 
| std::move(task_), | 
| post_time > milliseconds_ ? 0 : milliseconds_ - post_time); | 
| return true; | 
| @@ -219,10 +271,13 @@ class TaskQueue::SetTimerTask : public QueuedTask { | 
| const uint32_t posted_; | 
| }; | 
| -TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/) | 
| - : event_base_(event_base_new()), | 
| +TaskQueue::Impl::Impl(const char* queue_name, | 
| + TaskQueue* queue, | 
| + Priority priority /*= NORMAL*/) | 
| + : queue_(queue), | 
| + event_base_(event_base_new()), | 
| wakeup_event_(new event()), | 
| - thread_(&TaskQueue::ThreadMain, | 
| + thread_(&TaskQueue::Impl::ThreadMain, | 
| this, | 
| queue_name, | 
| TaskQueuePriorityToThreadPriority(priority)) { | 
| @@ -240,7 +295,7 @@ TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/) | 
| thread_.Start(); | 
| } | 
| -TaskQueue::~TaskQueue() { | 
| +TaskQueue::Impl::~Impl() { | 
| RTC_DCHECK(!IsCurrent()); | 
| struct timespec ts; | 
| char message = kQuit; | 
| @@ -267,29 +322,38 @@ TaskQueue::~TaskQueue() { | 
| } | 
| // static | 
| -TaskQueue* TaskQueue::Current() { | 
| +TaskQueue::Impl* TaskQueue::Impl::Current() { | 
| QueueContext* ctx = | 
| static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); | 
| return ctx ? ctx->queue : nullptr; | 
| } | 
| // static | 
| -bool TaskQueue::IsCurrent(const char* queue_name) { | 
| - TaskQueue* current = Current(); | 
| +TaskQueue* TaskQueue::Impl::CurrentQueue() { | 
| + TaskQueue::Impl* current = Current(); | 
| + if (current) { | 
| + return current->queue_; | 
| + } | 
| + return nullptr; | 
| +} | 
| + | 
| +// static | 
| +bool TaskQueue::Impl::IsCurrent(const char* queue_name) { | 
| + TaskQueue::Impl* current = Current(); | 
| return current && current->thread_.name().compare(queue_name) == 0; | 
| } | 
| -bool TaskQueue::IsCurrent() const { | 
| +bool TaskQueue::Impl::IsCurrent() const { | 
| return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef()); | 
| } | 
| -void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) { | 
| +void TaskQueue::Impl::PostTask(std::unique_ptr<QueuedTask> task) { | 
| RTC_DCHECK(task.get()); | 
| // libevent isn't thread safe. This means that we can't use methods such | 
| // as event_base_once to post tasks to the worker thread from a different | 
| // thread. However, we can use it when posting from the worker thread itself. | 
| if (IsCurrent()) { | 
| - if (event_base_once(event_base_, -1, EV_TIMEOUT, &TaskQueue::RunTask, | 
| + if (event_base_once(event_base_, -1, EV_TIMEOUT, &TaskQueue::Impl::RunTask, | 
| task.get(), nullptr) == 0) { | 
| task.release(); | 
| } | 
| @@ -310,11 +374,12 @@ void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) { | 
| } | 
| } | 
| -void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task, | 
| - uint32_t milliseconds) { | 
| +void TaskQueue::Impl::PostDelayedTask(std::unique_ptr<QueuedTask> task, | 
| + uint32_t milliseconds) { | 
| if (IsCurrent()) { | 
| TimerEvent* timer = new TimerEvent(std::move(task)); | 
| - EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueue::RunTimer, timer); | 
| + EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueue::Impl::RunTimer, | 
| + timer); | 
| QueueContext* ctx = | 
| static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); | 
| ctx->pending_timers_.push_back(timer); | 
| @@ -327,23 +392,18 @@ void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task, | 
| } | 
| } | 
| -void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, | 
| - std::unique_ptr<QueuedTask> reply, | 
| - TaskQueue* reply_queue) { | 
| +void TaskQueue::Impl::PostTaskAndReply(std::unique_ptr<QueuedTask> task, | 
| + std::unique_ptr<QueuedTask> reply, | 
| + TaskQueue::Impl* reply_queue) { | 
| std::unique_ptr<QueuedTask> wrapper_task( | 
| new PostAndReplyTask(std::move(task), std::move(reply), reply_queue, | 
| reply_queue->wakeup_pipe_in_)); | 
| PostTask(std::move(wrapper_task)); | 
| } | 
| -void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, | 
| - std::unique_ptr<QueuedTask> reply) { | 
| - return PostTaskAndReply(std::move(task), std::move(reply), Current()); | 
| -} | 
| - | 
| // static | 
| -void TaskQueue::ThreadMain(void* context) { | 
| - TaskQueue* me = static_cast<TaskQueue*>(context); | 
| +void TaskQueue::Impl::ThreadMain(void* context) { | 
| + TaskQueue::Impl* me = static_cast<TaskQueue::Impl*>(context); | 
| QueueContext queue_context(me); | 
| pthread_setspecific(GetQueuePtrTls(), &queue_context); | 
| @@ -358,7 +418,9 @@ void TaskQueue::ThreadMain(void* context) { | 
| } | 
| // static | 
| -void TaskQueue::OnWakeup(int socket, short flags, void* context) { // NOLINT | 
| +void TaskQueue::Impl::OnWakeup(int socket, | 
| + short flags, | 
| + void* context) { // NOLINT | 
| QueueContext* ctx = | 
| static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); | 
| RTC_DCHECK(ctx->queue->wakeup_pipe_out_ == socket); | 
| @@ -405,14 +467,14 @@ void TaskQueue::OnWakeup(int socket, short flags, void* context) { // NOLINT | 
| } | 
| // static | 
| -void TaskQueue::RunTask(int fd, short flags, void* context) { // NOLINT | 
| +void TaskQueue::Impl::RunTask(int fd, short flags, void* context) { // NOLINT | 
| auto* task = static_cast<QueuedTask*>(context); | 
| if (task->Run()) | 
| delete task; | 
| } | 
| // static | 
| -void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT | 
| +void TaskQueue::Impl::RunTimer(int fd, short flags, void* context) { // NOLINT | 
| TimerEvent* timer = static_cast<TimerEvent*>(context); | 
| if (!timer->task->Run()) | 
| timer->task.release(); | 
| @@ -422,10 +484,54 @@ void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT | 
| delete timer; | 
| } | 
| -void TaskQueue::PrepareReplyTask(scoped_refptr<ReplyTaskOwnerRef> reply_task) { | 
| +void TaskQueue::Impl::PrepareReplyTask( | 
| + scoped_refptr<ReplyTaskOwnerRef> reply_task) { | 
| RTC_DCHECK(reply_task); | 
| CritScope lock(&pending_lock_); | 
| pending_replies_.push_back(std::move(reply_task)); | 
| } | 
| +TaskQueue::TaskQueue(const char* queue_name, Priority priority) | 
| + : impl_(new RefCountedObject<TaskQueue::Impl>(queue_name, this, priority)) { | 
| +} | 
| + | 
| +TaskQueue::~TaskQueue() {} | 
| + | 
| +// static | 
| +TaskQueue* TaskQueue::Current() { | 
| + return TaskQueue::Impl::CurrentQueue(); | 
| +} | 
| + | 
| +// Used for DCHECKing the current queue. | 
| +// static | 
| +bool TaskQueue::IsCurrent(const char* queue_name) { | 
| + return TaskQueue::Impl::IsCurrent(queue_name); | 
| +} | 
| + | 
| +bool TaskQueue::IsCurrent() const { | 
| + return impl_->IsCurrent(); | 
| +} | 
| + | 
| +void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) { | 
| + return TaskQueue::impl_->PostTask(std::move(task)); | 
| +} | 
| + | 
| +void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, | 
| + std::unique_ptr<QueuedTask> reply, | 
| + TaskQueue* reply_queue) { | 
| + return TaskQueue::impl_->PostTaskAndReply(std::move(task), std::move(reply), | 
| + reply_queue->impl_.get()); | 
| +} | 
| + | 
| +void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, | 
| + std::unique_ptr<QueuedTask> reply) { | 
| + return TaskQueue::impl_->PostTaskAndReply(std::move(task), std::move(reply), | 
| + impl_.get()); | 
| +} | 
| + | 
| +void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task, | 
| + uint32_t milliseconds) { | 
| + return TaskQueue::impl_->PostDelayedTask(std::move(task), milliseconds); | 
| +} | 
| + | 
| } // namespace rtc |