Index: webrtc/base/task_queue_libevent.cc |
diff --git a/webrtc/base/task_queue_libevent.cc b/webrtc/base/task_queue_libevent.cc |
index f3ab89be806984d87a7e604abb7189b665ef37f0..a4f592e1bbaf60d97bf6f6b4624b3b273dabe07c 100644 |
--- a/webrtc/base/task_queue_libevent.cc |
+++ b/webrtc/base/task_queue_libevent.cc |
@@ -11,6 +11,7 @@ |
#include "webrtc/base/task_queue.h" |
#include <fcntl.h> |
+#include <signal.h> |
#include <string.h> |
#include <unistd.h> |
@@ -27,6 +28,28 @@ using internal::AutoSetCurrentQueuePtr; |
namespace { |
static const char kQuit = 1; |
static const char kRunTask = 2; |
+static const char kRunReplyTask = 3; |
+ |
+// This ignores the SIGPIPE signal on the calling thread. |
+// This signal can be fired when trying to write() to a pipe that's being |
+// closed or while closing a pipe that's being written to. |
+// We can run into that situation (e.g. reply tasks that don't get a chance to |
+// run because the task queue is being deleted) so we ignore this signal and |
+// continue as normal. |
+// As a side note for this implementation, it would be great if we could safely |
+// restore the sigmask, but unfortunately the operation of restoring it, can |
+// itself actually cause SIGPIPE to be signaled :-| (e.g. on MacOS) |
+// The SIGPIPE signal by default causes the process to be terminated, so we |
+// don't want to risk that. |
+// An alternative to this approach is to ignore the signal for the whole |
+// process: |
+// signal(SIGPIPE, SIG_IGN); |
+void IgnoreSigPipeSignalOnCurrentThread() { |
+ sigset_t sigpipe_mask; |
+ sigemptyset(&sigpipe_mask); |
+ sigaddset(&sigpipe_mask, SIGPIPE); |
+ pthread_sigmask(SIG_BLOCK, &sigpipe_mask, nullptr); |
+} |
struct TimerEvent { |
explicit TimerEvent(std::unique_ptr<QueuedTask> task) |
@@ -71,43 +94,88 @@ struct TaskQueue::QueueContext { |
std::list<TimerEvent*> pending_timers_; |
}; |
+// Posting a reply task is tricky business. This class owns the reply task |
+// and a reference to it is held by both the reply queue and the first task. |
+// Here's an outline of what happens when dealing with a reply task. |
+// * The ReplyTaskOwner owns the |reply_| task. |
+// * One ref owned by PostAndReplyTask |
+// * One ref owned by the reply TaskQueue |
+// * ReplyTaskOwner has a flag |run_task_| initially set to false. |
+// * ReplyTaskOwner has a method: HasOneRef() (provided by RefCountedObject). |
+// * After successfully running the original |task_|, PostAndReplyTask() calls |
+// set_should_run_task(). This sets |run_task_| to true. |
+// * In PostAndReplyTask's dtor: |
+// * It releases its reference to ReplyTaskOwner (important to do this first). |
+// * Sends (write()) a kRunReplyTask message to the reply queue's pipe. |
+// * PostAndReplyTask doesn't care if write() fails, but when it does: |
+// * The reply queue is gone. |
+// * ReplyTaskOwner has already been deleted and the reply task too. |
+// * If write() succeeds: |
+// * ReplyQueue receives the kRunReplyTask message |
+// * Goes through all pending tasks, finding the first that HasOneRef() |
+// * Calls ReplyTaskOwner::Run() |
+// * if set_should_run_task() was called, the reply task will be run |
+// * Release the reference to ReplyTaskOwner |
+// * ReplyTaskOwner and associated |reply_| are deleted. |
+class TaskQueue::ReplyTaskOwner { |
+ public: |
+ ReplyTaskOwner(std::unique_ptr<QueuedTask> reply) |
+ : reply_(std::move(reply)) {} |
+ |
+ void Run() { |
+ RTC_DCHECK(reply_); |
+ if (run_task_) { |
+ if (!reply_->Run()) |
+ reply_.release(); |
+ } |
+ reply_.reset(); |
+ } |
+ |
+ void set_should_run_task() { |
+ RTC_DCHECK(!run_task_); |
+ run_task_ = true; |
+ } |
+ |
+ private: |
+ std::unique_ptr<QueuedTask> reply_; |
+ bool run_task_ = false; |
+}; |
+ |
class TaskQueue::PostAndReplyTask : public QueuedTask { |
public: |
PostAndReplyTask(std::unique_ptr<QueuedTask> task, |
std::unique_ptr<QueuedTask> reply, |
- TaskQueue* reply_queue) |
+ TaskQueue* reply_queue, |
+ int reply_pipe) |
: task_(std::move(task)), |
- reply_(std::move(reply)), |
- reply_queue_(reply_queue) { |
- reply_queue->PrepareReplyTask(this); |
+ reply_pipe_(reply_pipe), |
+ reply_task_owner_( |
+ new RefCountedObject<ReplyTaskOwner>(std::move(reply))) { |
+ reply_queue->PrepareReplyTask(reply_task_owner_); |
} |
~PostAndReplyTask() override { |
- CritScope lock(&lock_); |
- if (reply_queue_) |
- reply_queue_->ReplyTaskDone(this); |
- } |
- |
- void OnReplyQueueGone() { |
- CritScope lock(&lock_); |
- reply_queue_ = nullptr; |
+ reply_task_owner_ = nullptr; |
+ IgnoreSigPipeSignalOnCurrentThread(); |
+ // Send a signal to the reply queue that the reply task can run now. |
+ // Depending on whether |set_should_run_task()| was called by the |
+ // PostAndReplyTask(), the reply task may or may not actually run. |
+ // In either case, it will be deleted. |
+ char message = kRunReplyTask; |
+ write(reply_pipe_, &message, sizeof(message)); |
} |
private: |
bool Run() override { |
if (!task_->Run()) |
task_.release(); |
- |
- CritScope lock(&lock_); |
- if (reply_queue_) |
- reply_queue_->PostTask(std::move(reply_)); |
+ reply_task_owner_->set_should_run_task(); |
return true; |
} |
- CriticalSection lock_; |
std::unique_ptr<QueuedTask> task_; |
- std::unique_ptr<QueuedTask> reply_; |
- TaskQueue* reply_queue_ GUARDED_BY(lock_); |
+ int reply_pipe_; |
+ scoped_refptr<RefCountedObject<ReplyTaskOwner>> reply_task_owner_; |
}; |
class TaskQueue::SetTimerTask : public QueuedTask { |
@@ -144,6 +212,7 @@ TaskQueue::TaskQueue(const char* queue_name) |
SetNonBlocking(fds[1]); |
wakeup_pipe_out_ = fds[0]; |
wakeup_pipe_in_ = fds[1]; |
+ |
EventAssign(wakeup_event_.get(), event_base_, wakeup_pipe_out_, |
EV_READ | EV_PERSIST, OnWakeup, this); |
event_add(wakeup_event_.get(), 0); |
@@ -165,20 +234,14 @@ TaskQueue::~TaskQueue() { |
thread_.Stop(); |
event_del(wakeup_event_.get()); |
+ |
+ IgnoreSigPipeSignalOnCurrentThread(); |
+ |
close(wakeup_pipe_in_); |
close(wakeup_pipe_out_); |
wakeup_pipe_in_ = -1; |
wakeup_pipe_out_ = -1; |
- { |
- // Synchronize against any pending reply tasks that might be running on |
- // other queues. |
- CritScope lock(&pending_lock_); |
- for (auto* reply : pending_replies_) |
- reply->OnReplyQueueGone(); |
- pending_replies_.clear(); |
- } |
- |
event_base_free(event_base_); |
} |
@@ -246,7 +309,8 @@ void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
std::unique_ptr<QueuedTask> reply, |
TaskQueue* reply_queue) { |
std::unique_ptr<QueuedTask> wrapper_task( |
- new PostAndReplyTask(std::move(task), std::move(reply), reply_queue)); |
+ new PostAndReplyTask(std::move(task), std::move(reply), reply_queue, |
+ reply_queue->wakeup_pipe_in_)); |
PostTask(std::move(wrapper_task)); |
} |
@@ -298,6 +362,22 @@ void TaskQueue::OnWakeup(int socket, short flags, void* context) { // NOLINT |
task.release(); |
break; |
} |
+ case kRunReplyTask: { |
+ scoped_refptr<ReplyTaskOwnerRef> reply_task; |
+ { |
+ CritScope lock(&ctx->queue->pending_lock_); |
+ for (auto it = ctx->queue->pending_replies_.begin(); |
+ it != ctx->queue->pending_replies_.end(); ++it) { |
+ if ((*it)->HasOneRef()) { |
+ reply_task = std::move(*it); |
+ ctx->queue->pending_replies_.erase(it); |
+ break; |
+ } |
+ } |
+ } |
+ reply_task->Run(); |
+ break; |
+ } |
default: |
RTC_NOTREACHED(); |
break; |
@@ -322,15 +402,10 @@ void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT |
delete timer; |
} |
-void TaskQueue::PrepareReplyTask(PostAndReplyTask* reply_task) { |
+void TaskQueue::PrepareReplyTask(scoped_refptr<ReplyTaskOwnerRef> reply_task) { |
RTC_DCHECK(reply_task); |
CritScope lock(&pending_lock_); |
- pending_replies_.push_back(reply_task); |
-} |
- |
-void TaskQueue::ReplyTaskDone(PostAndReplyTask* reply_task) { |
- CritScope lock(&pending_lock_); |
- pending_replies_.remove(reply_task); |
+ pending_replies_.push_back(std::move(reply_task)); |
} |
} // namespace rtc |