Chromium Code Reviews| Index: webrtc/base/task_queue_libevent.cc |
| diff --git a/webrtc/base/task_queue_libevent.cc b/webrtc/base/task_queue_libevent.cc |
| index f3ab89be806984d87a7e604abb7189b665ef37f0..1427e5a5d9844f97461ff7ff83d2ba0d30bffcc2 100644 |
| --- a/webrtc/base/task_queue_libevent.cc |
| +++ b/webrtc/base/task_queue_libevent.cc |
| @@ -11,6 +11,7 @@ |
| #include "webrtc/base/task_queue.h" |
| #include <fcntl.h> |
| +#include <signal.h> |
| #include <string.h> |
| #include <unistd.h> |
| @@ -27,6 +28,25 @@ using internal::AutoSetCurrentQueuePtr; |
| namespace { |
| static const char kQuit = 1; |
| static const char kRunTask = 2; |
| +static const char kRunReplyTask = 3; |
| + |
| +// This ignores the SIGPIPE signal on the calling thread. |
| +// This signal can be fired when trying to write() to a pipe that's being |
| +// closed or while closing a pipe that's being written to. |
| +// We can run into that situation and handle it (e.g. reply tasks that don't |
|
Taylor Brandstetter
2017/02/22 00:24:27
It's not clear to me what "We can run into that si
tommi
2017/02/22 12:43:54
thanks, yeah this isn't clear. "handle" should be
|
| +// get a chance to run because the task queue is being deleted). |
| +// It would be great if we could restore the sigmask, but unfortunately, |
| +// restoring it, can actually cause SIGPIPE to be signaled :-| Which by default |
| +// causes the process to be terminated. |
| +// An alternative to this approach is to call: |
| +// signal(SIGPIPE, SIG_IGN); |
| +// However, doing so affects the whole process. |
| +void IgnoreSigPipeSignalOnCurrentThread() { |
| + sigset_t sigpipe_mask; |
| + sigemptyset(&sigpipe_mask); |
| + sigaddset(&sigpipe_mask, SIGPIPE); |
| + pthread_sigmask(SIG_BLOCK, &sigpipe_mask, nullptr); |
| +} |
| struct TimerEvent { |
| explicit TimerEvent(std::unique_ptr<QueuedTask> task) |
| @@ -71,43 +91,83 @@ struct TaskQueue::QueueContext { |
| std::list<TimerEvent*> pending_timers_; |
| }; |
| +// Posting a reply task is tricky business. This class owns the reply task |
| +// and a reference to it is held by both the reply queue and the first task. |
| +// Here's an outline of what happens when dealing with a reply task. |
| +// * The ReplyTaskOwner owns the |reply_| task. |
| +// * One ref owned by PostAndReplyTask |
| +// * One ref owned by the reply TaskQueue |
| +// * ReplyTaskOwner has a flag |run_task_| initially set to false. |
| +// * ReplyTaskOwner has a method: HasOneRef() (provided by RefCountedObject). |
| +// * After successfully running the original |task_|, PostAndReplyTask() calls |
| +// set_should_run_task(). This sets |run_task_| to true. |
| +// * In PostAndReplyTask's dtor: |
| +// * It releases its reference to ReplyTaskOwner (important to do this first). |
| +// * Sends (write()) a kRunReplyTask message to the reply queue's pipe. |
| +// * PostAndReplyTask doesn't care if write() fails, but when it does: |
| +// * The reply queue is gone. |
| +// * ReplyTaskOwner has already been deleted and the reply task too. |
| +// * If write() succeeds: |
| +// * ReplyQueue receives the kRunReplyTask message |
| +// * Goes through all pending tasks, finding the first that HasOneRef() |
| +// * Calls ReplyTaskOwner::Run() |
| +// * if set_should_run_task() was called, the reply task will be run |
| +// * Release the reference to ReplyTaskOwner |
| +// * ReplyTaskOwner and associated |reply_| are deleted. |
| +class TaskQueue::ReplyTaskOwner { |
| + public: |
| + ReplyTaskOwner(std::unique_ptr<QueuedTask> reply) |
| + : reply_(std::move(reply)) {} |
| + |
| + void Run() { |
| + RTC_DCHECK(reply_); |
|
Taylor Brandstetter
2017/02/22 00:24:27
Any reason the DCHECK is in Run and not the constr
tommi
2017/02/22 12:43:54
In case Run() ever gets called twice and reply_.re
|
| + if (run_task_) { |
| + if (!reply_->Run()) |
| + reply_.release(); |
| + } |
| + } |
| + |
| + void set_should_run_task() { |
| + RTC_DCHECK(!run_task_); |
| + run_task_ = true; |
| + } |
| + |
| + private: |
| + std::unique_ptr<QueuedTask> reply_; |
| + bool run_task_ = false; |
| +}; |
| + |
| class TaskQueue::PostAndReplyTask : public QueuedTask { |
| public: |
| PostAndReplyTask(std::unique_ptr<QueuedTask> task, |
| std::unique_ptr<QueuedTask> reply, |
| - TaskQueue* reply_queue) |
| + TaskQueue* reply_queue, |
| + int reply_pipe) |
| : task_(std::move(task)), |
| - reply_(std::move(reply)), |
| - reply_queue_(reply_queue) { |
| - reply_queue->PrepareReplyTask(this); |
| + reply_pipe_(reply_pipe), |
| + reply_task_owner_( |
| + new RefCountedObject<ReplyTaskOwner>(std::move(reply))) { |
| + reply_queue->PrepareReplyTask(reply_task_owner_); |
| } |
| ~PostAndReplyTask() override { |
| - CritScope lock(&lock_); |
| - if (reply_queue_) |
| - reply_queue_->ReplyTaskDone(this); |
| - } |
| - |
| - void OnReplyQueueGone() { |
| - CritScope lock(&lock_); |
| - reply_queue_ = nullptr; |
| + reply_task_owner_ = nullptr; |
| + IgnoreSigPipeSignalOnCurrentThread(); |
| + char message = kRunReplyTask; |
| + write(reply_pipe_, &message, sizeof(message)); |
|
Taylor Brandstetter
2017/02/22 00:24:27
Could you leave a comment here explaining that if
tommi
2017/02/22 12:43:54
Done.
|
| } |
| private: |
| bool Run() override { |
| if (!task_->Run()) |
| task_.release(); |
| - |
| - CritScope lock(&lock_); |
| - if (reply_queue_) |
| - reply_queue_->PostTask(std::move(reply_)); |
| + reply_task_owner_->set_should_run_task(); |
| return true; |
| } |
| - CriticalSection lock_; |
| std::unique_ptr<QueuedTask> task_; |
| - std::unique_ptr<QueuedTask> reply_; |
| - TaskQueue* reply_queue_ GUARDED_BY(lock_); |
| + int reply_pipe_; |
| + scoped_refptr<RefCountedObject<ReplyTaskOwner>> reply_task_owner_; |
| }; |
| class TaskQueue::SetTimerTask : public QueuedTask { |
| @@ -144,6 +204,7 @@ TaskQueue::TaskQueue(const char* queue_name) |
| SetNonBlocking(fds[1]); |
| wakeup_pipe_out_ = fds[0]; |
| wakeup_pipe_in_ = fds[1]; |
| + |
| EventAssign(wakeup_event_.get(), event_base_, wakeup_pipe_out_, |
| EV_READ | EV_PERSIST, OnWakeup, this); |
| event_add(wakeup_event_.get(), 0); |
| @@ -165,20 +226,14 @@ TaskQueue::~TaskQueue() { |
| thread_.Stop(); |
| event_del(wakeup_event_.get()); |
| + |
| + IgnoreSigPipeSignalOnCurrentThread(); |
|
Taylor Brandstetter
2017/02/22 00:24:27
So, does this mean after the first TaskQueue is de
tommi
2017/02/22 12:43:54
Permanently turned off for the current thread, not
|
| + |
| close(wakeup_pipe_in_); |
| close(wakeup_pipe_out_); |
| wakeup_pipe_in_ = -1; |
| wakeup_pipe_out_ = -1; |
| - { |
| - // Synchronize against any pending reply tasks that might be running on |
| - // other queues. |
| - CritScope lock(&pending_lock_); |
| - for (auto* reply : pending_replies_) |
| - reply->OnReplyQueueGone(); |
| - pending_replies_.clear(); |
| - } |
| - |
| event_base_free(event_base_); |
| } |
| @@ -246,7 +301,8 @@ void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
| std::unique_ptr<QueuedTask> reply, |
| TaskQueue* reply_queue) { |
| std::unique_ptr<QueuedTask> wrapper_task( |
| - new PostAndReplyTask(std::move(task), std::move(reply), reply_queue)); |
| + new PostAndReplyTask(std::move(task), std::move(reply), reply_queue, |
| + reply_queue->wakeup_pipe_in_)); |
| PostTask(std::move(wrapper_task)); |
| } |
| @@ -298,6 +354,22 @@ void TaskQueue::OnWakeup(int socket, short flags, void* context) { // NOLINT |
| task.release(); |
| break; |
| } |
| + case kRunReplyTask: { |
| + scoped_refptr<ReplyTaskOwnerRef> reply_task; |
| + { |
| + CritScope lock(&ctx->queue->pending_lock_); |
| + for (auto it = ctx->queue->pending_replies_.begin(); |
| + it != ctx->queue->pending_replies_.end(); ++it) { |
| + if ((*it)->HasOneRef()) { |
| + reply_task = std::move(*it); |
| + ctx->queue->pending_replies_.erase(it); |
| + break; |
| + } |
| + } |
| + } |
| + reply_task->Run(); |
| + break; |
| + } |
| default: |
| RTC_NOTREACHED(); |
| break; |
| @@ -322,15 +394,10 @@ void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT |
| delete timer; |
| } |
| -void TaskQueue::PrepareReplyTask(PostAndReplyTask* reply_task) { |
| +void TaskQueue::PrepareReplyTask(scoped_refptr<ReplyTaskOwnerRef> reply_task) { |
| RTC_DCHECK(reply_task); |
| CritScope lock(&pending_lock_); |
| - pending_replies_.push_back(reply_task); |
| -} |
| - |
| -void TaskQueue::ReplyTaskDone(PostAndReplyTask* reply_task) { |
| - CritScope lock(&pending_lock_); |
| - pending_replies_.remove(reply_task); |
| + pending_replies_.push_back(std::move(reply_task)); |
| } |
| } // namespace rtc |