Chromium Code Reviews| Index: webrtc/rtc_base/task_queue_libevent.cc |
| diff --git a/webrtc/rtc_base/task_queue_libevent.cc b/webrtc/rtc_base/task_queue_libevent.cc |
| index db267dc80be893aa7715f5dea97ed2a57a93f47c..99b88df363c939c1deace1b142dedb3d9c3b7476 100644 |
| --- a/webrtc/rtc_base/task_queue_libevent.cc |
| +++ b/webrtc/rtc_base/task_queue_libevent.cc |
| @@ -18,7 +18,11 @@ |
| #include "base/third_party/libevent/event.h" |
| #include "webrtc/rtc_base/checks.h" |
| #include "webrtc/rtc_base/logging.h" |
| +#include "webrtc/rtc_base/platform_thread.h" |
| +#include "webrtc/rtc_base/refcount.h" |
| +#include "webrtc/rtc_base/refcountedobject.h" |
| #include "webrtc/rtc_base/safe_conversions.h" |
| +#include "webrtc/rtc_base/task_queue.h" |
| #include "webrtc/rtc_base/task_queue_posix.h" |
| #include "webrtc/rtc_base/timeutils.h" |
| @@ -104,9 +108,57 @@ ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) { |
| } |
| } // namespace |
| -struct TaskQueue::QueueContext { |
| - explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {} |
| - TaskQueue* queue; |
| +class TaskQueue::Impl : public RefCountInterface { |
| + public: |
| + explicit Impl(const char* queue_name, |
| + TaskQueue* queue, |
| + Priority priority = Priority::NORMAL); |
| + ~Impl() override; |
| + |
| + static TaskQueue::Impl* Current(); |
| + static TaskQueue* CurrentQueue(); |
| + |
| + // Used for DCHECKing the current queue. |
| + static bool IsCurrent(const char* queue_name); |
| + bool IsCurrent() const; |
| + |
| + void PostTask(std::unique_ptr<QueuedTask> task); |
| + void PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
| + std::unique_ptr<QueuedTask> reply, |
| + TaskQueue::Impl* reply_queue); |
| + |
| + void PostDelayedTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds); |
| + |
| + private: |
| + static void ThreadMain(void* context); |
| + static void OnWakeup(int socket, short flags, void* context); // NOLINT |
| + static void RunTask(int fd, short flags, void* context); // NOLINT |
| + static void RunTimer(int fd, short flags, void* context); // NOLINT |
| + |
| + class ReplyTaskOwner; |
| + class PostAndReplyTask; |
| + class SetTimerTask; |
| + |
| + typedef RefCountedObject<ReplyTaskOwner> ReplyTaskOwnerRef; |
| + |
| + void PrepareReplyTask(scoped_refptr<ReplyTaskOwnerRef> reply_task); |
| + |
| + struct QueueContext; |
| + TaskQueue* const queue_; |
| + int wakeup_pipe_in_ = -1; |
| + int wakeup_pipe_out_ = -1; |
| + event_base* event_base_; |
| + std::unique_ptr<event> wakeup_event_; |
| + PlatformThread thread_; |
| + rtc::CriticalSection pending_lock_; |
| + std::list<std::unique_ptr<QueuedTask>> pending_ GUARDED_BY(pending_lock_); |
| + std::list<scoped_refptr<ReplyTaskOwnerRef>> pending_replies_ |
| + GUARDED_BY(pending_lock_); |
| +}; |
| + |
| +struct TaskQueue::Impl::QueueContext { |
| + explicit QueueContext(TaskQueue::Impl* q) : queue(q), is_active(true) {} |
| + TaskQueue::Impl* queue; |
| bool is_active; |
| // Holds a list of events pending timers for cleanup when the loop exits. |
| std::list<TimerEvent*> pending_timers_; |
| @@ -135,7 +187,7 @@ struct TaskQueue::QueueContext { |
| // * if set_should_run_task() was called, the reply task will be run |
| // * Release the reference to ReplyTaskOwner |
| // * ReplyTaskOwner and associated |reply_| are deleted. |
| -class TaskQueue::ReplyTaskOwner { |
| +class TaskQueue::Impl::ReplyTaskOwner { |
| public: |
| ReplyTaskOwner(std::unique_ptr<QueuedTask> reply) |
| : reply_(std::move(reply)) {} |
| @@ -159,11 +211,11 @@ class TaskQueue::ReplyTaskOwner { |
| bool run_task_ = false; |
| }; |
| -class TaskQueue::PostAndReplyTask : public QueuedTask { |
| +class TaskQueue::Impl::PostAndReplyTask : public QueuedTask { |
| public: |
| PostAndReplyTask(std::unique_ptr<QueuedTask> task, |
| std::unique_ptr<QueuedTask> reply, |
| - TaskQueue* reply_queue, |
| + TaskQueue::Impl* reply_queue, |
| int reply_pipe) |
| : task_(std::move(task)), |
| reply_pipe_(reply_pipe), |
| @@ -196,7 +248,7 @@ class TaskQueue::PostAndReplyTask : public QueuedTask { |
| scoped_refptr<RefCountedObject<ReplyTaskOwner>> reply_task_owner_; |
| }; |
| -class TaskQueue::SetTimerTask : public QueuedTask { |
| +class TaskQueue::Impl::SetTimerTask : public QueuedTask { |
| public: |
| SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds) |
| : task_(std::move(task)), |
| @@ -208,7 +260,7 @@ class TaskQueue::SetTimerTask : public QueuedTask { |
| // Compensate for the time that has passed since construction |
| // and until we got here. |
| uint32_t post_time = Time32() - posted_; |
| - TaskQueue::Current()->PostDelayedTask( |
| + TaskQueue::Impl::Current()->PostDelayedTask( |
| std::move(task_), |
| post_time > milliseconds_ ? 0 : milliseconds_ - post_time); |
| return true; |
| @@ -219,10 +271,13 @@ class TaskQueue::SetTimerTask : public QueuedTask { |
| const uint32_t posted_; |
| }; |
| -TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/) |
| - : event_base_(event_base_new()), |
| +TaskQueue::Impl::Impl(const char* queue_name, |
| + TaskQueue* queue, |
| + Priority priority /*= NORMAL*/) |
| + : queue_(queue), |
| + event_base_(event_base_new()), |
| wakeup_event_(new event()), |
| - thread_(&TaskQueue::ThreadMain, |
| + thread_(&TaskQueue::Impl::ThreadMain, |
| this, |
| queue_name, |
| TaskQueuePriorityToThreadPriority(priority)) { |
| @@ -240,7 +295,7 @@ TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/) |
| thread_.Start(); |
| } |
| -TaskQueue::~TaskQueue() { |
| +TaskQueue::Impl::~Impl() { |
| RTC_DCHECK(!IsCurrent()); |
| struct timespec ts; |
| char message = kQuit; |
| @@ -267,29 +322,38 @@ TaskQueue::~TaskQueue() { |
| } |
| // static |
| -TaskQueue* TaskQueue::Current() { |
| +TaskQueue::Impl* TaskQueue::Impl::Current() { |
| QueueContext* ctx = |
| static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); |
| return ctx ? ctx->queue : nullptr; |
| } |
| // static |
| -bool TaskQueue::IsCurrent(const char* queue_name) { |
|
nisse-webrtc
2017/08/22 13:39:44
Not new in this cl, but this looks questionable to
perkj_webrtc
2017/08/22 18:52:38
it probably isnt.
|
| - TaskQueue* current = Current(); |
| +TaskQueue* TaskQueue::Impl::CurrentQueue() { |
| + TaskQueue::Impl* current = Current(); |
| + if (current) { |
| + return current->queue_; |
| + } |
| + return nullptr; |
| +} |
| + |
| +// static |
| +bool TaskQueue::Impl::IsCurrent(const char* queue_name) { |
| + TaskQueue::Impl* current = Current(); |
| return current && current->thread_.name().compare(queue_name) == 0; |
| } |
| -bool TaskQueue::IsCurrent() const { |
| +bool TaskQueue::Impl::IsCurrent() const { |
| return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef()); |
| } |
| -void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) { |
| +void TaskQueue::Impl::PostTask(std::unique_ptr<QueuedTask> task) { |
| RTC_DCHECK(task.get()); |
| // libevent isn't thread safe. This means that we can't use methods such |
| // as event_base_once to post tasks to the worker thread from a different |
| // thread. However, we can use it when posting from the worker thread itself. |
| if (IsCurrent()) { |
| - if (event_base_once(event_base_, -1, EV_TIMEOUT, &TaskQueue::RunTask, |
| + if (event_base_once(event_base_, -1, EV_TIMEOUT, &TaskQueue::Impl::RunTask, |
| task.get(), nullptr) == 0) { |
| task.release(); |
| } |
| @@ -310,11 +374,12 @@ void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) { |
| } |
| } |
| -void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task, |
| - uint32_t milliseconds) { |
| +void TaskQueue::Impl::PostDelayedTask(std::unique_ptr<QueuedTask> task, |
| + uint32_t milliseconds) { |
| if (IsCurrent()) { |
| TimerEvent* timer = new TimerEvent(std::move(task)); |
| - EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueue::RunTimer, timer); |
| + EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueue::Impl::RunTimer, |
| + timer); |
| QueueContext* ctx = |
| static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); |
| ctx->pending_timers_.push_back(timer); |
| @@ -327,23 +392,18 @@ void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task, |
| } |
| } |
| -void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
| - std::unique_ptr<QueuedTask> reply, |
| - TaskQueue* reply_queue) { |
| +void TaskQueue::Impl::PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
| + std::unique_ptr<QueuedTask> reply, |
| + TaskQueue::Impl* reply_queue) { |
| std::unique_ptr<QueuedTask> wrapper_task( |
| new PostAndReplyTask(std::move(task), std::move(reply), reply_queue, |
| reply_queue->wakeup_pipe_in_)); |
| PostTask(std::move(wrapper_task)); |
| } |
| -void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
| - std::unique_ptr<QueuedTask> reply) { |
| - return PostTaskAndReply(std::move(task), std::move(reply), Current()); |
| -} |
| - |
| // static |
| -void TaskQueue::ThreadMain(void* context) { |
| - TaskQueue* me = static_cast<TaskQueue*>(context); |
| +void TaskQueue::Impl::ThreadMain(void* context) { |
| + TaskQueue::Impl* me = static_cast<TaskQueue::Impl*>(context); |
| QueueContext queue_context(me); |
| pthread_setspecific(GetQueuePtrTls(), &queue_context); |
| @@ -358,7 +418,9 @@ void TaskQueue::ThreadMain(void* context) { |
| } |
| // static |
| -void TaskQueue::OnWakeup(int socket, short flags, void* context) { // NOLINT |
| +void TaskQueue::Impl::OnWakeup(int socket, |
| + short flags, |
| + void* context) { // NOLINT |
| QueueContext* ctx = |
| static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); |
| RTC_DCHECK(ctx->queue->wakeup_pipe_out_ == socket); |
| @@ -405,14 +467,14 @@ void TaskQueue::OnWakeup(int socket, short flags, void* context) { // NOLINT |
| } |
| // static |
| -void TaskQueue::RunTask(int fd, short flags, void* context) { // NOLINT |
| +void TaskQueue::Impl::RunTask(int fd, short flags, void* context) { // NOLINT |
| auto* task = static_cast<QueuedTask*>(context); |
| if (task->Run()) |
| delete task; |
| } |
| // static |
| -void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT |
| +void TaskQueue::Impl::RunTimer(int fd, short flags, void* context) { // NOLINT |
| TimerEvent* timer = static_cast<TimerEvent*>(context); |
| if (!timer->task->Run()) |
| timer->task.release(); |
| @@ -422,10 +484,54 @@ void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT |
| delete timer; |
| } |
| -void TaskQueue::PrepareReplyTask(scoped_refptr<ReplyTaskOwnerRef> reply_task) { |
| +void TaskQueue::Impl::PrepareReplyTask( |
| + scoped_refptr<ReplyTaskOwnerRef> reply_task) { |
| RTC_DCHECK(reply_task); |
| CritScope lock(&pending_lock_); |
| pending_replies_.push_back(std::move(reply_task)); |
| } |
| +TaskQueue::TaskQueue(const char* queue_name, Priority priority) |
|
nisse-webrtc
2017/08/22 13:39:44
So an overriding implementation is supposed to def
perkj_webrtc
2017/08/22 18:52:38
I think it is ok to copy paste the boiler plate co
|
| + : impl_(new RefCountedObject<TaskQueue::Impl>(queue_name, this, priority)) { |
| +} |
| + |
| +TaskQueue::~TaskQueue() {} |
| + |
| +// static |
| +TaskQueue* TaskQueue::Current() { |
| + return TaskQueue::Impl::CurrentQueue(); |
| +} |
| + |
| +// Used for DCHECKing the current queue. |
| +// static |
| +bool TaskQueue::IsCurrent(const char* queue_name) { |
| + return TaskQueue::Impl::IsCurrent(queue_name); |
| +} |
| + |
| +bool TaskQueue::IsCurrent() const { |
| + return impl_->IsCurrent(); |
| +} |
| + |
| +void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) { |
| + return TaskQueue::impl_->PostTask(std::move(task)); |
| +} |
| + |
| +void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
| + std::unique_ptr<QueuedTask> reply, |
| + TaskQueue* reply_queue) { |
| + return TaskQueue::impl_->PostTaskAndReply(std::move(task), std::move(reply), |
| + reply_queue->impl_.get()); |
| +} |
| + |
| +void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
| + std::unique_ptr<QueuedTask> reply) { |
| + return TaskQueue::impl_->PostTaskAndReply(std::move(task), std::move(reply), |
| + impl_.get()); |
| +} |
| + |
| +void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task, |
| + uint32_t milliseconds) { |
| + return TaskQueue::impl_->PostDelayedTask(std::move(task), milliseconds); |
| +} |
| + |
| } // namespace rtc |