Index: webrtc/base/task_queue_libevent.cc |
diff --git a/webrtc/base/task_queue_libevent.cc b/webrtc/base/task_queue_libevent.cc |
index f3ab89be806984d87a7e604abb7189b665ef37f0..dbdee58e6b29f5864972734ce4ce1b01b9e78b42 100644 |
--- a/webrtc/base/task_queue_libevent.cc |
+++ b/webrtc/base/task_queue_libevent.cc |
@@ -13,6 +13,7 @@ |
#include <fcntl.h> |
#include <string.h> |
#include <unistd.h> |
+#include <time.h> |
#include "base/third_party/libevent/event.h" |
#include "webrtc/base/checks.h" |
@@ -71,7 +72,7 @@ struct TaskQueue::QueueContext { |
std::list<TimerEvent*> pending_timers_; |
}; |
-class TaskQueue::PostAndReplyTask : public QueuedTask { |
+class TaskQueue::PostAndReplyTask { |
public: |
PostAndReplyTask(std::unique_ptr<QueuedTask> task, |
std::unique_ptr<QueuedTask> reply, |
@@ -79,13 +80,6 @@ class TaskQueue::PostAndReplyTask : public QueuedTask { |
: task_(std::move(task)), |
reply_(std::move(reply)), |
reply_queue_(reply_queue) { |
- reply_queue->PrepareReplyTask(this); |
- } |
- |
- ~PostAndReplyTask() override { |
- CritScope lock(&lock_); |
- if (reply_queue_) |
- reply_queue_->ReplyTaskDone(this); |
} |
void OnReplyQueueGone() { |
@@ -94,13 +88,15 @@ class TaskQueue::PostAndReplyTask : public QueuedTask { |
} |
private: |
- bool Run() override { |
+ bool Run() { |
if (!task_->Run()) |
task_.release(); |
CritScope lock(&lock_); |
- if (reply_queue_) |
+ if (reply_queue_) { |
reply_queue_->PostTask(std::move(reply_)); |
+ reply_queue_->PostAndReplyTaskDone(this); |
+ } |
return true; |
} |
@@ -108,6 +104,19 @@ class TaskQueue::PostAndReplyTask : public QueuedTask { |
std::unique_ptr<QueuedTask> task_; |
std::unique_ptr<QueuedTask> reply_; |
TaskQueue* reply_queue_ GUARDED_BY(lock_); |
+ friend class PostAndReplyTaskRefOwner; |
+}; |
+ |
+// Inherits from QueuedTask and owns a ref to PostAndReplyTaskRef. |
+class TaskQueue::PostAndReplyTaskRefOwner : public QueuedTask { |
+ public: |
+ PostAndReplyTaskRefOwner(scoped_refptr<PostAndReplyTaskRef> task) |
+ : task_(task) {} |
+ |
+ private: |
+ bool Run() override { return task_->Run(); } |
+ |
+ scoped_refptr<PostAndReplyTaskRef> task_; |
}; |
class TaskQueue::SetTimerTask : public QueuedTask { |
@@ -152,6 +161,7 @@ TaskQueue::TaskQueue(const char* queue_name) |
TaskQueue::~TaskQueue() { |
RTC_DCHECK(!IsCurrent()); |
+ destroying_ = true; |
struct timespec ts; |
char message = kQuit; |
while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) { |
@@ -170,14 +180,18 @@ TaskQueue::~TaskQueue() { |
wakeup_pipe_in_ = -1; |
wakeup_pipe_out_ = -1; |
+ std::list<scoped_refptr<PostAndReplyTaskRef>> pending_replies; |
{ |
// Synchronize against any pending reply tasks that might be running on |
- // other queues. |
+ // other queues. Note that, since |destroying_| was set to true above, it |
+ // will be impossible for |pending_replies_| to grow after this. |
CritScope lock(&pending_lock_); |
- for (auto* reply : pending_replies_) |
- reply->OnReplyQueueGone(); |
- pending_replies_.clear(); |
+ pending_replies.swap(pending_replies_); |
} |
+ for (auto reply : pending_replies) { |
+ reply->OnReplyQueueGone(); |
+ } |
+ pending_replies.clear(); |
event_base_free(event_base_); |
} |
@@ -245,9 +259,10 @@ void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task, |
void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
std::unique_ptr<QueuedTask> reply, |
TaskQueue* reply_queue) { |
- std::unique_ptr<QueuedTask> wrapper_task( |
- new PostAndReplyTask(std::move(task), std::move(reply), reply_queue)); |
- PostTask(std::move(wrapper_task)); |
+ scoped_refptr<PostAndReplyTaskRef> ref( |
+ new PostAndReplyTaskRef(std::move(task), std::move(reply), reply_queue)); |
+ reply_queue->PrepareReplyTask(ref); |
+ PostTask(std::unique_ptr<QueuedTask>(new PostAndReplyTaskRefOwner(ref))); |
} |
void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
@@ -322,15 +337,24 @@ void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT |
delete timer; |
} |
-void TaskQueue::PrepareReplyTask(PostAndReplyTask* reply_task) { |
+void TaskQueue::PrepareReplyTask( |
+ scoped_refptr<PostAndReplyTaskRef> reply_task) { |
RTC_DCHECK(reply_task); |
CritScope lock(&pending_lock_); |
+ if (destroying_) { |
+ return; |
+ } |
pending_replies_.push_back(reply_task); |
} |
-void TaskQueue::ReplyTaskDone(PostAndReplyTask* reply_task) { |
+void TaskQueue::PostAndReplyTaskDone(PostAndReplyTask* reply_task) { |
CritScope lock(&pending_lock_); |
- pending_replies_.remove(reply_task); |
+ // Note: If the destructor is currently being run, and "pending_replies_" is |
+ // already swapped out, this may fail to find |reply_task|, which is ok. |
+ pending_replies_.remove_if( |
+ [reply_task](scoped_refptr<PostAndReplyTaskRef> task) { |
+ return task.get() == reply_task; |
+ }); |
} |
} // namespace rtc |