Index: webrtc/rtc_base/task_queue_libevent.cc |
diff --git a/webrtc/rtc_base/task_queue_libevent.cc b/webrtc/rtc_base/task_queue_libevent.cc |
index db267dc80be893aa7715f5dea97ed2a57a93f47c..99b88df363c939c1deace1b142dedb3d9c3b7476 100644 |
--- a/webrtc/rtc_base/task_queue_libevent.cc |
+++ b/webrtc/rtc_base/task_queue_libevent.cc |
@@ -18,7 +18,11 @@ |
#include "base/third_party/libevent/event.h" |
#include "webrtc/rtc_base/checks.h" |
#include "webrtc/rtc_base/logging.h" |
+#include "webrtc/rtc_base/platform_thread.h" |
+#include "webrtc/rtc_base/refcount.h" |
+#include "webrtc/rtc_base/refcountedobject.h" |
#include "webrtc/rtc_base/safe_conversions.h" |
+#include "webrtc/rtc_base/task_queue.h" |
#include "webrtc/rtc_base/task_queue_posix.h" |
#include "webrtc/rtc_base/timeutils.h" |
@@ -104,9 +108,57 @@ ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) { |
} |
} // namespace |
-struct TaskQueue::QueueContext { |
- explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {} |
- TaskQueue* queue; |
+class TaskQueue::Impl : public RefCountInterface { |
+ public: |
+ explicit Impl(const char* queue_name, |
+ TaskQueue* queue, |
+ Priority priority = Priority::NORMAL); |
+ ~Impl() override; |
+ |
+ static TaskQueue::Impl* Current(); |
+ static TaskQueue* CurrentQueue(); |
+ |
+ // Used for DCHECKing the current queue. |
+ static bool IsCurrent(const char* queue_name); |
+ bool IsCurrent() const; |
+ |
+ void PostTask(std::unique_ptr<QueuedTask> task); |
+ void PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
+ std::unique_ptr<QueuedTask> reply, |
+ TaskQueue::Impl* reply_queue); |
+ |
+ void PostDelayedTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds); |
+ |
+ private: |
+ static void ThreadMain(void* context); |
+ static void OnWakeup(int socket, short flags, void* context); // NOLINT |
+ static void RunTask(int fd, short flags, void* context); // NOLINT |
+ static void RunTimer(int fd, short flags, void* context); // NOLINT |
+ |
+ class ReplyTaskOwner; |
+ class PostAndReplyTask; |
+ class SetTimerTask; |
+ |
+ typedef RefCountedObject<ReplyTaskOwner> ReplyTaskOwnerRef; |
+ |
+ void PrepareReplyTask(scoped_refptr<ReplyTaskOwnerRef> reply_task); |
+ |
+ struct QueueContext; |
+ TaskQueue* const queue_; |
+ int wakeup_pipe_in_ = -1; |
+ int wakeup_pipe_out_ = -1; |
+ event_base* event_base_; |
+ std::unique_ptr<event> wakeup_event_; |
+ PlatformThread thread_; |
+ rtc::CriticalSection pending_lock_; |
+ std::list<std::unique_ptr<QueuedTask>> pending_ GUARDED_BY(pending_lock_); |
+ std::list<scoped_refptr<ReplyTaskOwnerRef>> pending_replies_ |
+ GUARDED_BY(pending_lock_); |
+}; |
+ |
+struct TaskQueue::Impl::QueueContext { |
+ explicit QueueContext(TaskQueue::Impl* q) : queue(q), is_active(true) {} |
+ TaskQueue::Impl* queue; |
bool is_active; |
// Holds a list of events pending timers for cleanup when the loop exits. |
std::list<TimerEvent*> pending_timers_; |
@@ -135,7 +187,7 @@ struct TaskQueue::QueueContext { |
// * if set_should_run_task() was called, the reply task will be run |
// * Release the reference to ReplyTaskOwner |
// * ReplyTaskOwner and associated |reply_| are deleted. |
-class TaskQueue::ReplyTaskOwner { |
+class TaskQueue::Impl::ReplyTaskOwner { |
public: |
ReplyTaskOwner(std::unique_ptr<QueuedTask> reply) |
: reply_(std::move(reply)) {} |
@@ -159,11 +211,11 @@ class TaskQueue::ReplyTaskOwner { |
bool run_task_ = false; |
}; |
-class TaskQueue::PostAndReplyTask : public QueuedTask { |
+class TaskQueue::Impl::PostAndReplyTask : public QueuedTask { |
public: |
PostAndReplyTask(std::unique_ptr<QueuedTask> task, |
std::unique_ptr<QueuedTask> reply, |
- TaskQueue* reply_queue, |
+ TaskQueue::Impl* reply_queue, |
int reply_pipe) |
: task_(std::move(task)), |
reply_pipe_(reply_pipe), |
@@ -196,7 +248,7 @@ class TaskQueue::PostAndReplyTask : public QueuedTask { |
scoped_refptr<RefCountedObject<ReplyTaskOwner>> reply_task_owner_; |
}; |
-class TaskQueue::SetTimerTask : public QueuedTask { |
+class TaskQueue::Impl::SetTimerTask : public QueuedTask { |
public: |
SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds) |
: task_(std::move(task)), |
@@ -208,7 +260,7 @@ class TaskQueue::SetTimerTask : public QueuedTask { |
// Compensate for the time that has passed since construction |
// and until we got here. |
uint32_t post_time = Time32() - posted_; |
- TaskQueue::Current()->PostDelayedTask( |
+ TaskQueue::Impl::Current()->PostDelayedTask( |
std::move(task_), |
post_time > milliseconds_ ? 0 : milliseconds_ - post_time); |
return true; |
@@ -219,10 +271,13 @@ class TaskQueue::SetTimerTask : public QueuedTask { |
const uint32_t posted_; |
}; |
-TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/) |
- : event_base_(event_base_new()), |
+TaskQueue::Impl::Impl(const char* queue_name, |
+ TaskQueue* queue, |
+ Priority priority /*= NORMAL*/) |
+ : queue_(queue), |
+ event_base_(event_base_new()), |
wakeup_event_(new event()), |
- thread_(&TaskQueue::ThreadMain, |
+ thread_(&TaskQueue::Impl::ThreadMain, |
this, |
queue_name, |
TaskQueuePriorityToThreadPriority(priority)) { |
@@ -240,7 +295,7 @@ TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/) |
thread_.Start(); |
} |
-TaskQueue::~TaskQueue() { |
+TaskQueue::Impl::~Impl() { |
RTC_DCHECK(!IsCurrent()); |
struct timespec ts; |
char message = kQuit; |
@@ -267,29 +322,38 @@ TaskQueue::~TaskQueue() { |
} |
// static |
-TaskQueue* TaskQueue::Current() { |
+TaskQueue::Impl* TaskQueue::Impl::Current() { |
QueueContext* ctx = |
static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); |
return ctx ? ctx->queue : nullptr; |
} |
// static |
-bool TaskQueue::IsCurrent(const char* queue_name) { |
- TaskQueue* current = Current(); |
+TaskQueue* TaskQueue::Impl::CurrentQueue() { |
+ TaskQueue::Impl* current = Current(); |
+ if (current) { |
+ return current->queue_; |
+ } |
+ return nullptr; |
+} |
+ |
+// static |
+bool TaskQueue::Impl::IsCurrent(const char* queue_name) { |
+ TaskQueue::Impl* current = Current(); |
return current && current->thread_.name().compare(queue_name) == 0; |
} |
-bool TaskQueue::IsCurrent() const { |
+bool TaskQueue::Impl::IsCurrent() const { |
return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef()); |
} |
-void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) { |
+void TaskQueue::Impl::PostTask(std::unique_ptr<QueuedTask> task) { |
RTC_DCHECK(task.get()); |
// libevent isn't thread safe. This means that we can't use methods such |
// as event_base_once to post tasks to the worker thread from a different |
// thread. However, we can use it when posting from the worker thread itself. |
if (IsCurrent()) { |
- if (event_base_once(event_base_, -1, EV_TIMEOUT, &TaskQueue::RunTask, |
+ if (event_base_once(event_base_, -1, EV_TIMEOUT, &TaskQueue::Impl::RunTask, |
task.get(), nullptr) == 0) { |
task.release(); |
} |
@@ -310,11 +374,12 @@ void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) { |
} |
} |
-void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task, |
- uint32_t milliseconds) { |
+void TaskQueue::Impl::PostDelayedTask(std::unique_ptr<QueuedTask> task, |
+ uint32_t milliseconds) { |
if (IsCurrent()) { |
TimerEvent* timer = new TimerEvent(std::move(task)); |
- EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueue::RunTimer, timer); |
+ EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueue::Impl::RunTimer, |
+ timer); |
QueueContext* ctx = |
static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); |
ctx->pending_timers_.push_back(timer); |
@@ -327,23 +392,18 @@ void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task, |
} |
} |
-void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
- std::unique_ptr<QueuedTask> reply, |
- TaskQueue* reply_queue) { |
+void TaskQueue::Impl::PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
+ std::unique_ptr<QueuedTask> reply, |
+ TaskQueue::Impl* reply_queue) { |
std::unique_ptr<QueuedTask> wrapper_task( |
new PostAndReplyTask(std::move(task), std::move(reply), reply_queue, |
reply_queue->wakeup_pipe_in_)); |
PostTask(std::move(wrapper_task)); |
} |
-void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
- std::unique_ptr<QueuedTask> reply) { |
- return PostTaskAndReply(std::move(task), std::move(reply), Current()); |
-} |
- |
// static |
-void TaskQueue::ThreadMain(void* context) { |
- TaskQueue* me = static_cast<TaskQueue*>(context); |
+void TaskQueue::Impl::ThreadMain(void* context) { |
+ TaskQueue::Impl* me = static_cast<TaskQueue::Impl*>(context); |
QueueContext queue_context(me); |
pthread_setspecific(GetQueuePtrTls(), &queue_context); |
@@ -358,7 +418,9 @@ void TaskQueue::ThreadMain(void* context) { |
} |
// static |
-void TaskQueue::OnWakeup(int socket, short flags, void* context) { // NOLINT |
+void TaskQueue::Impl::OnWakeup(int socket, |
+ short flags, |
+ void* context) { // NOLINT |
QueueContext* ctx = |
static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); |
RTC_DCHECK(ctx->queue->wakeup_pipe_out_ == socket); |
@@ -405,14 +467,14 @@ void TaskQueue::OnWakeup(int socket, short flags, void* context) { // NOLINT |
} |
// static |
-void TaskQueue::RunTask(int fd, short flags, void* context) { // NOLINT |
+void TaskQueue::Impl::RunTask(int fd, short flags, void* context) { // NOLINT |
auto* task = static_cast<QueuedTask*>(context); |
if (task->Run()) |
delete task; |
} |
// static |
-void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT |
+void TaskQueue::Impl::RunTimer(int fd, short flags, void* context) { // NOLINT |
TimerEvent* timer = static_cast<TimerEvent*>(context); |
if (!timer->task->Run()) |
timer->task.release(); |
@@ -422,10 +484,54 @@ void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT |
delete timer; |
} |
-void TaskQueue::PrepareReplyTask(scoped_refptr<ReplyTaskOwnerRef> reply_task) { |
+void TaskQueue::Impl::PrepareReplyTask( |
+ scoped_refptr<ReplyTaskOwnerRef> reply_task) { |
RTC_DCHECK(reply_task); |
CritScope lock(&pending_lock_); |
pending_replies_.push_back(std::move(reply_task)); |
} |
+TaskQueue::TaskQueue(const char* queue_name, Priority priority) |
+ : impl_(new RefCountedObject<TaskQueue::Impl>(queue_name, this, priority)) { |
+} |
+ |
+TaskQueue::~TaskQueue() {} |
+ |
+// static |
+TaskQueue* TaskQueue::Current() { |
+ return TaskQueue::Impl::CurrentQueue(); |
+} |
+ |
+// Used for DCHECKing the current queue. |
+// static |
+bool TaskQueue::IsCurrent(const char* queue_name) { |
+ return TaskQueue::Impl::IsCurrent(queue_name); |
+} |
+ |
+bool TaskQueue::IsCurrent() const { |
+ return impl_->IsCurrent(); |
+} |
+ |
+void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) { |
+ return TaskQueue::impl_->PostTask(std::move(task)); |
+} |
+ |
+void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
+ std::unique_ptr<QueuedTask> reply, |
+ TaskQueue* reply_queue) { |
+ return TaskQueue::impl_->PostTaskAndReply(std::move(task), std::move(reply), |
+ reply_queue->impl_.get()); |
+} |
+ |
+void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
+ std::unique_ptr<QueuedTask> reply) { |
+ return TaskQueue::impl_->PostTaskAndReply(std::move(task), std::move(reply), |
+ impl_.get()); |
+} |
+ |
+void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task, |
+ uint32_t milliseconds) { |
+ return TaskQueue::impl_->PostDelayedTask(std::move(task), milliseconds); |
+} |
+ |
} // namespace rtc |