Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(722)

Unified Diff: webrtc/base/task_queue_libevent.cc

Issue 2709603002: Fix potential deadlock in TaskQueue's libevent PostTaskAndReply implementation (Closed)
Patch Set: Address comments Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « webrtc/base/task_queue_gcd.cc ('k') | webrtc/base/task_queue_unittest.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: webrtc/base/task_queue_libevent.cc
diff --git a/webrtc/base/task_queue_libevent.cc b/webrtc/base/task_queue_libevent.cc
index f3ab89be806984d87a7e604abb7189b665ef37f0..a4f592e1bbaf60d97bf6f6b4624b3b273dabe07c 100644
--- a/webrtc/base/task_queue_libevent.cc
+++ b/webrtc/base/task_queue_libevent.cc
@@ -11,6 +11,7 @@
#include "webrtc/base/task_queue.h"
#include <fcntl.h>
+#include <signal.h>
#include <string.h>
#include <unistd.h>
@@ -27,6 +28,28 @@ using internal::AutoSetCurrentQueuePtr;
namespace {
static const char kQuit = 1;
static const char kRunTask = 2;
+static const char kRunReplyTask = 3;
+
+// This ignores the SIGPIPE signal on the calling thread.
+// This signal can be fired when trying to write() to a pipe that's being
+// closed or while closing a pipe that's being written to.
+// We can run into that situation (e.g. reply tasks that don't get a chance to
+// run because the task queue is being deleted) so we ignore this signal and
+// continue as normal.
+// As a side note for this implementation, it would be great if we could safely
+// restore the sigmask, but unfortunately the operation of restoring it, can
+// itself actually cause SIGPIPE to be signaled :-| (e.g. on MacOS)
+// The SIGPIPE signal by default causes the process to be terminated, so we
+// don't want to risk that.
+// An alternative to this approach is to ignore the signal for the whole
+// process:
+// signal(SIGPIPE, SIG_IGN);
+void IgnoreSigPipeSignalOnCurrentThread() {
+ sigset_t sigpipe_mask;
+ sigemptyset(&sigpipe_mask);
+ sigaddset(&sigpipe_mask, SIGPIPE);
+ pthread_sigmask(SIG_BLOCK, &sigpipe_mask, nullptr);
+}
struct TimerEvent {
explicit TimerEvent(std::unique_ptr<QueuedTask> task)
@@ -71,43 +94,88 @@ struct TaskQueue::QueueContext {
std::list<TimerEvent*> pending_timers_;
};
+// Posting a reply task is tricky business. This class owns the reply task
+// and a reference to it is held by both the reply queue and the first task.
+// Here's an outline of what happens when dealing with a reply task.
+// * The ReplyTaskOwner owns the |reply_| task.
+// * One ref owned by PostAndReplyTask
+// * One ref owned by the reply TaskQueue
+// * ReplyTaskOwner has a flag |run_task_| initially set to false.
+// * ReplyTaskOwner has a method: HasOneRef() (provided by RefCountedObject).
+// * After successfully running the original |task_|, PostAndReplyTask() calls
+// set_should_run_task(). This sets |run_task_| to true.
+// * In PostAndReplyTask's dtor:
+// * It releases its reference to ReplyTaskOwner (important to do this first).
+// * Sends (write()) a kRunReplyTask message to the reply queue's pipe.
+// * PostAndReplyTask doesn't care if write() fails, but when it does:
+// * The reply queue is gone.
+// * ReplyTaskOwner has already been deleted and the reply task too.
+// * If write() succeeds:
+// * ReplyQueue receives the kRunReplyTask message
+// * Goes through all pending tasks, finding the first that HasOneRef()
+// * Calls ReplyTaskOwner::Run()
+// * if set_should_run_task() was called, the reply task will be run
+// * Release the reference to ReplyTaskOwner
+// * ReplyTaskOwner and associated |reply_| are deleted.
+class TaskQueue::ReplyTaskOwner {
+ public:
+ ReplyTaskOwner(std::unique_ptr<QueuedTask> reply)
+ : reply_(std::move(reply)) {}
+
+ void Run() {
+ RTC_DCHECK(reply_);
+ if (run_task_) {
+ if (!reply_->Run())
+ reply_.release();
+ }
+ reply_.reset();
+ }
+
+ void set_should_run_task() {
+ RTC_DCHECK(!run_task_);
+ run_task_ = true;
+ }
+
+ private:
+ std::unique_ptr<QueuedTask> reply_;
+ bool run_task_ = false;
+};
+
class TaskQueue::PostAndReplyTask : public QueuedTask {
public:
PostAndReplyTask(std::unique_ptr<QueuedTask> task,
std::unique_ptr<QueuedTask> reply,
- TaskQueue* reply_queue)
+ TaskQueue* reply_queue,
+ int reply_pipe)
: task_(std::move(task)),
- reply_(std::move(reply)),
- reply_queue_(reply_queue) {
- reply_queue->PrepareReplyTask(this);
+ reply_pipe_(reply_pipe),
+ reply_task_owner_(
+ new RefCountedObject<ReplyTaskOwner>(std::move(reply))) {
+ reply_queue->PrepareReplyTask(reply_task_owner_);
}
~PostAndReplyTask() override {
- CritScope lock(&lock_);
- if (reply_queue_)
- reply_queue_->ReplyTaskDone(this);
- }
-
- void OnReplyQueueGone() {
- CritScope lock(&lock_);
- reply_queue_ = nullptr;
+ reply_task_owner_ = nullptr;
+ IgnoreSigPipeSignalOnCurrentThread();
+ // Send a signal to the reply queue that the reply task can run now.
+ // Depending on whether |set_should_run_task()| was called by the
+ // PostAndReplyTask(), the reply task may or may not actually run.
+ // In either case, it will be deleted.
+ char message = kRunReplyTask;
+ write(reply_pipe_, &message, sizeof(message));
}
private:
bool Run() override {
if (!task_->Run())
task_.release();
-
- CritScope lock(&lock_);
- if (reply_queue_)
- reply_queue_->PostTask(std::move(reply_));
+ reply_task_owner_->set_should_run_task();
return true;
}
- CriticalSection lock_;
std::unique_ptr<QueuedTask> task_;
- std::unique_ptr<QueuedTask> reply_;
- TaskQueue* reply_queue_ GUARDED_BY(lock_);
+ int reply_pipe_;
+ scoped_refptr<RefCountedObject<ReplyTaskOwner>> reply_task_owner_;
};
class TaskQueue::SetTimerTask : public QueuedTask {
@@ -144,6 +212,7 @@ TaskQueue::TaskQueue(const char* queue_name)
SetNonBlocking(fds[1]);
wakeup_pipe_out_ = fds[0];
wakeup_pipe_in_ = fds[1];
+
EventAssign(wakeup_event_.get(), event_base_, wakeup_pipe_out_,
EV_READ | EV_PERSIST, OnWakeup, this);
event_add(wakeup_event_.get(), 0);
@@ -165,20 +234,14 @@ TaskQueue::~TaskQueue() {
thread_.Stop();
event_del(wakeup_event_.get());
+
+ IgnoreSigPipeSignalOnCurrentThread();
+
close(wakeup_pipe_in_);
close(wakeup_pipe_out_);
wakeup_pipe_in_ = -1;
wakeup_pipe_out_ = -1;
- {
- // Synchronize against any pending reply tasks that might be running on
- // other queues.
- CritScope lock(&pending_lock_);
- for (auto* reply : pending_replies_)
- reply->OnReplyQueueGone();
- pending_replies_.clear();
- }
-
event_base_free(event_base_);
}
@@ -246,7 +309,8 @@ void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
std::unique_ptr<QueuedTask> reply,
TaskQueue* reply_queue) {
std::unique_ptr<QueuedTask> wrapper_task(
- new PostAndReplyTask(std::move(task), std::move(reply), reply_queue));
+ new PostAndReplyTask(std::move(task), std::move(reply), reply_queue,
+ reply_queue->wakeup_pipe_in_));
PostTask(std::move(wrapper_task));
}
@@ -298,6 +362,22 @@ void TaskQueue::OnWakeup(int socket, short flags, void* context) { // NOLINT
task.release();
break;
}
+ case kRunReplyTask: {
+ scoped_refptr<ReplyTaskOwnerRef> reply_task;
+ {
+ CritScope lock(&ctx->queue->pending_lock_);
+ for (auto it = ctx->queue->pending_replies_.begin();
+ it != ctx->queue->pending_replies_.end(); ++it) {
+ if ((*it)->HasOneRef()) {
+ reply_task = std::move(*it);
+ ctx->queue->pending_replies_.erase(it);
+ break;
+ }
+ }
+ }
+ reply_task->Run();
+ break;
+ }
default:
RTC_NOTREACHED();
break;
@@ -322,15 +402,10 @@ void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT
delete timer;
}
-void TaskQueue::PrepareReplyTask(PostAndReplyTask* reply_task) {
+void TaskQueue::PrepareReplyTask(scoped_refptr<ReplyTaskOwnerRef> reply_task) {
RTC_DCHECK(reply_task);
CritScope lock(&pending_lock_);
- pending_replies_.push_back(reply_task);
-}
-
-void TaskQueue::ReplyTaskDone(PostAndReplyTask* reply_task) {
- CritScope lock(&pending_lock_);
- pending_replies_.remove(reply_task);
+ pending_replies_.push_back(std::move(reply_task));
}
} // namespace rtc
« no previous file with comments | « webrtc/base/task_queue_gcd.cc ('k') | webrtc/base/task_queue_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698