| Index: webrtc/base/task_queue_libevent.cc
|
| diff --git a/webrtc/base/task_queue_libevent.cc b/webrtc/base/task_queue_libevent.cc
|
| index f3ab89be806984d87a7e604abb7189b665ef37f0..a4f592e1bbaf60d97bf6f6b4624b3b273dabe07c 100644
|
| --- a/webrtc/base/task_queue_libevent.cc
|
| +++ b/webrtc/base/task_queue_libevent.cc
|
| @@ -11,6 +11,7 @@
|
| #include "webrtc/base/task_queue.h"
|
|
|
| #include <fcntl.h>
|
| +#include <signal.h>
|
| #include <string.h>
|
| #include <unistd.h>
|
|
|
| @@ -27,6 +28,28 @@ using internal::AutoSetCurrentQueuePtr;
|
| namespace {
|
| static const char kQuit = 1;
|
| static const char kRunTask = 2;
|
| +static const char kRunReplyTask = 3;
|
| +
|
| +// This ignores the SIGPIPE signal on the calling thread.
|
| +// This signal can be fired when trying to write() to a pipe that's being
|
| +// closed or while closing a pipe that's being written to.
|
| +// We can run into that situation (e.g. reply tasks that don't get a chance to
|
| +// run because the task queue is being deleted) so we ignore this signal and
|
| +// continue as normal.
|
| +// As a side note for this implementation, it would be great if we could safely
|
| +// restore the sigmask, but unfortunately the operation of restoring it, can
|
| +// itself actually cause SIGPIPE to be signaled :-| (e.g. on MacOS)
|
| +// The SIGPIPE signal by default causes the process to be terminated, so we
|
| +// don't want to risk that.
|
| +// An alternative to this approach is to ignore the signal for the whole
|
| +// process:
|
| +// signal(SIGPIPE, SIG_IGN);
|
| +void IgnoreSigPipeSignalOnCurrentThread() {
|
| + sigset_t sigpipe_mask;
|
| + sigemptyset(&sigpipe_mask);
|
| + sigaddset(&sigpipe_mask, SIGPIPE);
|
| + pthread_sigmask(SIG_BLOCK, &sigpipe_mask, nullptr);
|
| +}
|
|
|
| struct TimerEvent {
|
| explicit TimerEvent(std::unique_ptr<QueuedTask> task)
|
| @@ -71,43 +94,88 @@ struct TaskQueue::QueueContext {
|
| std::list<TimerEvent*> pending_timers_;
|
| };
|
|
|
| +// Posting a reply task is tricky business. This class owns the reply task
|
| +// and a reference to it is held by both the reply queue and the first task.
|
| +// Here's an outline of what happens when dealing with a reply task.
|
| +// * The ReplyTaskOwner owns the |reply_| task.
|
| +// * One ref owned by PostAndReplyTask
|
| +// * One ref owned by the reply TaskQueue
|
| +// * ReplyTaskOwner has a flag |run_task_| initially set to false.
|
| +// * ReplyTaskOwner has a method: HasOneRef() (provided by RefCountedObject).
|
| +// * After successfully running the original |task_|, PostAndReplyTask() calls
|
| +// set_should_run_task(). This sets |run_task_| to true.
|
| +// * In PostAndReplyTask's dtor:
|
| +// * It releases its reference to ReplyTaskOwner (important to do this first).
|
| +// * Sends (write()) a kRunReplyTask message to the reply queue's pipe.
|
| +// * PostAndReplyTask doesn't care if write() fails, but when it does:
|
| +// * The reply queue is gone.
|
| +// * ReplyTaskOwner has already been deleted and the reply task too.
|
| +// * If write() succeeds:
|
| +// * ReplyQueue receives the kRunReplyTask message
|
| +// * Goes through all pending tasks, finding the first that HasOneRef()
|
| +// * Calls ReplyTaskOwner::Run()
|
| +// * if set_should_run_task() was called, the reply task will be run
|
| +// * Release the reference to ReplyTaskOwner
|
| +// * ReplyTaskOwner and associated |reply_| are deleted.
|
| +class TaskQueue::ReplyTaskOwner {
|
| + public:
|
| + ReplyTaskOwner(std::unique_ptr<QueuedTask> reply)
|
| + : reply_(std::move(reply)) {}
|
| +
|
| + void Run() {
|
| + RTC_DCHECK(reply_);
|
| + if (run_task_) {
|
| + if (!reply_->Run())
|
| + reply_.release();
|
| + }
|
| + reply_.reset();
|
| + }
|
| +
|
| + void set_should_run_task() {
|
| + RTC_DCHECK(!run_task_);
|
| + run_task_ = true;
|
| + }
|
| +
|
| + private:
|
| + std::unique_ptr<QueuedTask> reply_;
|
| + bool run_task_ = false;
|
| +};
|
| +
|
| class TaskQueue::PostAndReplyTask : public QueuedTask {
|
| public:
|
| PostAndReplyTask(std::unique_ptr<QueuedTask> task,
|
| std::unique_ptr<QueuedTask> reply,
|
| - TaskQueue* reply_queue)
|
| + TaskQueue* reply_queue,
|
| + int reply_pipe)
|
| : task_(std::move(task)),
|
| - reply_(std::move(reply)),
|
| - reply_queue_(reply_queue) {
|
| - reply_queue->PrepareReplyTask(this);
|
| + reply_pipe_(reply_pipe),
|
| + reply_task_owner_(
|
| + new RefCountedObject<ReplyTaskOwner>(std::move(reply))) {
|
| + reply_queue->PrepareReplyTask(reply_task_owner_);
|
| }
|
|
|
| ~PostAndReplyTask() override {
|
| - CritScope lock(&lock_);
|
| - if (reply_queue_)
|
| - reply_queue_->ReplyTaskDone(this);
|
| - }
|
| -
|
| - void OnReplyQueueGone() {
|
| - CritScope lock(&lock_);
|
| - reply_queue_ = nullptr;
|
| + reply_task_owner_ = nullptr;
|
| + IgnoreSigPipeSignalOnCurrentThread();
|
| + // Send a signal to the reply queue that the reply task can run now.
|
| + // Depending on whether |set_should_run_task()| was called by the
|
| + // PostAndReplyTask(), the reply task may or may not actually run.
|
| + // In either case, it will be deleted.
|
| + char message = kRunReplyTask;
|
| + write(reply_pipe_, &message, sizeof(message));
|
| }
|
|
|
| private:
|
| bool Run() override {
|
| if (!task_->Run())
|
| task_.release();
|
| -
|
| - CritScope lock(&lock_);
|
| - if (reply_queue_)
|
| - reply_queue_->PostTask(std::move(reply_));
|
| + reply_task_owner_->set_should_run_task();
|
| return true;
|
| }
|
|
|
| - CriticalSection lock_;
|
| std::unique_ptr<QueuedTask> task_;
|
| - std::unique_ptr<QueuedTask> reply_;
|
| - TaskQueue* reply_queue_ GUARDED_BY(lock_);
|
| + int reply_pipe_;
|
| + scoped_refptr<RefCountedObject<ReplyTaskOwner>> reply_task_owner_;
|
| };
|
|
|
| class TaskQueue::SetTimerTask : public QueuedTask {
|
| @@ -144,6 +212,7 @@ TaskQueue::TaskQueue(const char* queue_name)
|
| SetNonBlocking(fds[1]);
|
| wakeup_pipe_out_ = fds[0];
|
| wakeup_pipe_in_ = fds[1];
|
| +
|
| EventAssign(wakeup_event_.get(), event_base_, wakeup_pipe_out_,
|
| EV_READ | EV_PERSIST, OnWakeup, this);
|
| event_add(wakeup_event_.get(), 0);
|
| @@ -165,20 +234,14 @@ TaskQueue::~TaskQueue() {
|
| thread_.Stop();
|
|
|
| event_del(wakeup_event_.get());
|
| +
|
| + IgnoreSigPipeSignalOnCurrentThread();
|
| +
|
| close(wakeup_pipe_in_);
|
| close(wakeup_pipe_out_);
|
| wakeup_pipe_in_ = -1;
|
| wakeup_pipe_out_ = -1;
|
|
|
| - {
|
| - // Synchronize against any pending reply tasks that might be running on
|
| - // other queues.
|
| - CritScope lock(&pending_lock_);
|
| - for (auto* reply : pending_replies_)
|
| - reply->OnReplyQueueGone();
|
| - pending_replies_.clear();
|
| - }
|
| -
|
| event_base_free(event_base_);
|
| }
|
|
|
| @@ -246,7 +309,8 @@ void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
|
| std::unique_ptr<QueuedTask> reply,
|
| TaskQueue* reply_queue) {
|
| std::unique_ptr<QueuedTask> wrapper_task(
|
| - new PostAndReplyTask(std::move(task), std::move(reply), reply_queue));
|
| + new PostAndReplyTask(std::move(task), std::move(reply), reply_queue,
|
| + reply_queue->wakeup_pipe_in_));
|
| PostTask(std::move(wrapper_task));
|
| }
|
|
|
| @@ -298,6 +362,22 @@ void TaskQueue::OnWakeup(int socket, short flags, void* context) { // NOLINT
|
| task.release();
|
| break;
|
| }
|
| + case kRunReplyTask: {
|
| + scoped_refptr<ReplyTaskOwnerRef> reply_task;
|
| + {
|
| + CritScope lock(&ctx->queue->pending_lock_);
|
| + for (auto it = ctx->queue->pending_replies_.begin();
|
| + it != ctx->queue->pending_replies_.end(); ++it) {
|
| + if ((*it)->HasOneRef()) {
|
| + reply_task = std::move(*it);
|
| + ctx->queue->pending_replies_.erase(it);
|
| + break;
|
| + }
|
| + }
|
| + }
|
| + reply_task->Run();
|
| + break;
|
| + }
|
| default:
|
| RTC_NOTREACHED();
|
| break;
|
| @@ -322,15 +402,10 @@ void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT
|
| delete timer;
|
| }
|
|
|
| -void TaskQueue::PrepareReplyTask(PostAndReplyTask* reply_task) {
|
| +void TaskQueue::PrepareReplyTask(scoped_refptr<ReplyTaskOwnerRef> reply_task) {
|
| RTC_DCHECK(reply_task);
|
| CritScope lock(&pending_lock_);
|
| - pending_replies_.push_back(reply_task);
|
| -}
|
| -
|
| -void TaskQueue::ReplyTaskDone(PostAndReplyTask* reply_task) {
|
| - CritScope lock(&pending_lock_);
|
| - pending_replies_.remove(reply_task);
|
| + pending_replies_.push_back(std::move(reply_task));
|
| }
|
|
|
| } // namespace rtc
|
|
|