| Index: webrtc/base/task_queue_libevent.cc
|
| diff --git a/webrtc/base/task_queue_libevent.cc b/webrtc/base/task_queue_libevent.cc
|
| index f3ab89be806984d87a7e604abb7189b665ef37f0..dbdee58e6b29f5864972734ce4ce1b01b9e78b42 100644
|
| --- a/webrtc/base/task_queue_libevent.cc
|
| +++ b/webrtc/base/task_queue_libevent.cc
|
| @@ -13,6 +13,7 @@
|
| #include <fcntl.h>
|
| #include <string.h>
|
| #include <unistd.h>
|
| +#include <time.h>
|
|
|
| #include "base/third_party/libevent/event.h"
|
| #include "webrtc/base/checks.h"
|
| @@ -71,7 +72,7 @@ struct TaskQueue::QueueContext {
|
| std::list<TimerEvent*> pending_timers_;
|
| };
|
|
|
| -class TaskQueue::PostAndReplyTask : public QueuedTask {
|
| +class TaskQueue::PostAndReplyTask {
|
| public:
|
| PostAndReplyTask(std::unique_ptr<QueuedTask> task,
|
| std::unique_ptr<QueuedTask> reply,
|
| @@ -79,13 +80,6 @@ class TaskQueue::PostAndReplyTask : public QueuedTask {
|
| : task_(std::move(task)),
|
| reply_(std::move(reply)),
|
| reply_queue_(reply_queue) {
|
| - reply_queue->PrepareReplyTask(this);
|
| - }
|
| -
|
| - ~PostAndReplyTask() override {
|
| - CritScope lock(&lock_);
|
| - if (reply_queue_)
|
| - reply_queue_->ReplyTaskDone(this);
|
| }
|
|
|
| void OnReplyQueueGone() {
|
| @@ -94,13 +88,15 @@ class TaskQueue::PostAndReplyTask : public QueuedTask {
|
| }
|
|
|
| private:
|
| - bool Run() override {
|
| + bool Run() {
|
| if (!task_->Run())
|
| task_.release();
|
|
|
| CritScope lock(&lock_);
|
| - if (reply_queue_)
|
| + if (reply_queue_) {
|
| reply_queue_->PostTask(std::move(reply_));
|
| + reply_queue_->PostAndReplyTaskDone(this);
|
| + }
|
| return true;
|
| }
|
|
|
| @@ -108,6 +104,19 @@ class TaskQueue::PostAndReplyTask : public QueuedTask {
|
| std::unique_ptr<QueuedTask> task_;
|
| std::unique_ptr<QueuedTask> reply_;
|
| TaskQueue* reply_queue_ GUARDED_BY(lock_);
|
| + friend class PostAndReplyTaskRefOwner;
|
| +};
|
| +
|
| +// Inherits from QueuedTask and owns a ref to PostAndReplyTaskRef.
|
| +class TaskQueue::PostAndReplyTaskRefOwner : public QueuedTask {
|
| + public:
|
| + PostAndReplyTaskRefOwner(scoped_refptr<PostAndReplyTaskRef> task)
|
| + : task_(task) {}
|
| +
|
| + private:
|
| + bool Run() override { return task_->Run(); }
|
| +
|
| + scoped_refptr<PostAndReplyTaskRef> task_;
|
| };
|
|
|
| class TaskQueue::SetTimerTask : public QueuedTask {
|
| @@ -152,6 +161,7 @@ TaskQueue::TaskQueue(const char* queue_name)
|
|
|
| TaskQueue::~TaskQueue() {
|
| RTC_DCHECK(!IsCurrent());
|
| + destroying_ = true;
|
| struct timespec ts;
|
| char message = kQuit;
|
| while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
|
| @@ -170,14 +180,18 @@ TaskQueue::~TaskQueue() {
|
| wakeup_pipe_in_ = -1;
|
| wakeup_pipe_out_ = -1;
|
|
|
| + std::list<scoped_refptr<PostAndReplyTaskRef>> pending_replies;
|
| {
|
| // Synchronize against any pending reply tasks that might be running on
|
| - // other queues.
|
| + // other queues. Note that, since |destroying_| was set to true above, it
|
| + // will be impossible for |pending_replies_| to grow after this.
|
| CritScope lock(&pending_lock_);
|
| - for (auto* reply : pending_replies_)
|
| - reply->OnReplyQueueGone();
|
| - pending_replies_.clear();
|
| + pending_replies.swap(pending_replies_);
|
| }
|
| + for (auto reply : pending_replies) {
|
| + reply->OnReplyQueueGone();
|
| + }
|
| + pending_replies.clear();
|
|
|
| event_base_free(event_base_);
|
| }
|
| @@ -245,9 +259,10 @@ void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
|
| void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
|
| std::unique_ptr<QueuedTask> reply,
|
| TaskQueue* reply_queue) {
|
| - std::unique_ptr<QueuedTask> wrapper_task(
|
| - new PostAndReplyTask(std::move(task), std::move(reply), reply_queue));
|
| - PostTask(std::move(wrapper_task));
|
| + scoped_refptr<PostAndReplyTaskRef> ref(
|
| + new PostAndReplyTaskRef(std::move(task), std::move(reply), reply_queue));
|
| + reply_queue->PrepareReplyTask(ref);
|
| + PostTask(std::unique_ptr<QueuedTask>(new PostAndReplyTaskRefOwner(ref)));
|
| }
|
|
|
| void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
|
| @@ -322,15 +337,24 @@ void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT
|
| delete timer;
|
| }
|
|
|
| -void TaskQueue::PrepareReplyTask(PostAndReplyTask* reply_task) {
|
| +void TaskQueue::PrepareReplyTask(
|
| + scoped_refptr<PostAndReplyTaskRef> reply_task) {
|
| RTC_DCHECK(reply_task);
|
| CritScope lock(&pending_lock_);
|
| + if (destroying_) {
|
| + return;
|
| + }
|
| pending_replies_.push_back(reply_task);
|
| }
|
|
|
| -void TaskQueue::ReplyTaskDone(PostAndReplyTask* reply_task) {
|
| +void TaskQueue::PostAndReplyTaskDone(PostAndReplyTask* reply_task) {
|
| CritScope lock(&pending_lock_);
|
| - pending_replies_.remove(reply_task);
|
| + // Note: If the destructor is currently being run, and "pending_replies_" is
|
| + // already swapped out, this may fail to find |reply_task|, which is ok.
|
| + pending_replies_.remove_if(
|
| + [reply_task](scoped_refptr<PostAndReplyTaskRef> task) {
|
| + return task.get() == reply_task;
|
| + });
|
| }
|
|
|
| } // namespace rtc
|
|
|