Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(93)

Side by Side Diff: webrtc/base/task_queue_libevent.cc

Issue 2709603002: Fix potential deadlock in TaskQueue's libevent PostTaskAndReply implementation (Closed)
Patch Set: Address comments Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « webrtc/base/task_queue_gcd.cc ('k') | webrtc/base/task_queue_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 /* 1 /*
2 * Copyright 2016 The WebRTC Project Authors. All rights reserved. 2 * Copyright 2016 The WebRTC Project Authors. All rights reserved.
3 * 3 *
4 * Use of this source code is governed by a BSD-style license 4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source 5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found 6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may 7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree. 8 * be found in the AUTHORS file in the root of the source tree.
9 */ 9 */
10 10
11 #include "webrtc/base/task_queue.h" 11 #include "webrtc/base/task_queue.h"
12 12
13 #include <fcntl.h> 13 #include <fcntl.h>
14 #include <signal.h>
14 #include <string.h> 15 #include <string.h>
15 #include <unistd.h> 16 #include <unistd.h>
16 17
17 #include "base/third_party/libevent/event.h" 18 #include "base/third_party/libevent/event.h"
18 #include "webrtc/base/checks.h" 19 #include "webrtc/base/checks.h"
19 #include "webrtc/base/logging.h" 20 #include "webrtc/base/logging.h"
20 #include "webrtc/base/task_queue_posix.h" 21 #include "webrtc/base/task_queue_posix.h"
21 #include "webrtc/base/timeutils.h" 22 #include "webrtc/base/timeutils.h"
22 23
23 namespace rtc { 24 namespace rtc {
24 using internal::GetQueuePtrTls; 25 using internal::GetQueuePtrTls;
25 using internal::AutoSetCurrentQueuePtr; 26 using internal::AutoSetCurrentQueuePtr;
26 27
27 namespace { 28 namespace {
28 static const char kQuit = 1; 29 static const char kQuit = 1;
29 static const char kRunTask = 2; 30 static const char kRunTask = 2;
31 static const char kRunReplyTask = 3;
32
33 // This ignores the SIGPIPE signal on the calling thread.
34 // This signal can be fired when trying to write() to a pipe that's being
35 // closed or while closing a pipe that's being written to.
36 // We can run into that situation (e.g. reply tasks that don't get a chance to
37 // run because the task queue is being deleted) so we ignore this signal and
38 // continue as normal.
39 // As a side note for this implementation, it would be great if we could safely
40 // restore the sigmask, but unfortunately the operation of restoring it, can
41 // itself actually cause SIGPIPE to be signaled :-| (e.g. on MacOS)
42 // The SIGPIPE signal by default causes the process to be terminated, so we
43 // don't want to risk that.
44 // An alternative to this approach is to ignore the signal for the whole
45 // process:
46 // signal(SIGPIPE, SIG_IGN);
47 void IgnoreSigPipeSignalOnCurrentThread() {
48 sigset_t sigpipe_mask;
49 sigemptyset(&sigpipe_mask);
50 sigaddset(&sigpipe_mask, SIGPIPE);
51 pthread_sigmask(SIG_BLOCK, &sigpipe_mask, nullptr);
52 }
30 53
31 struct TimerEvent { 54 struct TimerEvent {
32 explicit TimerEvent(std::unique_ptr<QueuedTask> task) 55 explicit TimerEvent(std::unique_ptr<QueuedTask> task)
33 : task(std::move(task)) {} 56 : task(std::move(task)) {}
34 ~TimerEvent() { event_del(&ev); } 57 ~TimerEvent() { event_del(&ev); }
35 event ev; 58 event ev;
36 std::unique_ptr<QueuedTask> task; 59 std::unique_ptr<QueuedTask> task;
37 }; 60 };
38 61
39 bool SetNonBlocking(int fd) { 62 bool SetNonBlocking(int fd) {
(...skipping 24 matching lines...) Expand all
64 } // namespace 87 } // namespace
65 88
66 struct TaskQueue::QueueContext { 89 struct TaskQueue::QueueContext {
67 explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {} 90 explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {}
68 TaskQueue* queue; 91 TaskQueue* queue;
69 bool is_active; 92 bool is_active;
70 // Holds a list of events pending timers for cleanup when the loop exits. 93 // Holds a list of events pending timers for cleanup when the loop exits.
71 std::list<TimerEvent*> pending_timers_; 94 std::list<TimerEvent*> pending_timers_;
72 }; 95 };
73 96
97 // Posting a reply task is tricky business. This class owns the reply task
98 // and a reference to it is held by both the reply queue and the first task.
99 // Here's an outline of what happens when dealing with a reply task.
100 // * The ReplyTaskOwner owns the |reply_| task.
101 // * One ref owned by PostAndReplyTask
102 // * One ref owned by the reply TaskQueue
103 // * ReplyTaskOwner has a flag |run_task_| initially set to false.
104 // * ReplyTaskOwner has a method: HasOneRef() (provided by RefCountedObject).
105 // * After successfully running the original |task_|, PostAndReplyTask() calls
106 // set_should_run_task(). This sets |run_task_| to true.
107 // * In PostAndReplyTask's dtor:
108 // * It releases its reference to ReplyTaskOwner (important to do this first).
109 // * Sends (write()) a kRunReplyTask message to the reply queue's pipe.
110 // * PostAndReplyTask doesn't care if write() fails, but when it does:
111 // * The reply queue is gone.
112 // * ReplyTaskOwner has already been deleted and the reply task too.
113 // * If write() succeeds:
114 // * ReplyQueue receives the kRunReplyTask message
115 // * Goes through all pending tasks, finding the first that HasOneRef()
116 // * Calls ReplyTaskOwner::Run()
117 // * if set_should_run_task() was called, the reply task will be run
118 // * Release the reference to ReplyTaskOwner
119 // * ReplyTaskOwner and associated |reply_| are deleted.
120 class TaskQueue::ReplyTaskOwner {
121 public:
122 ReplyTaskOwner(std::unique_ptr<QueuedTask> reply)
123 : reply_(std::move(reply)) {}
124
125 void Run() {
126 RTC_DCHECK(reply_);
127 if (run_task_) {
128 if (!reply_->Run())
129 reply_.release();
130 }
131 reply_.reset();
132 }
133
134 void set_should_run_task() {
135 RTC_DCHECK(!run_task_);
136 run_task_ = true;
137 }
138
139 private:
140 std::unique_ptr<QueuedTask> reply_;
141 bool run_task_ = false;
142 };
143
74 class TaskQueue::PostAndReplyTask : public QueuedTask { 144 class TaskQueue::PostAndReplyTask : public QueuedTask {
75 public: 145 public:
76 PostAndReplyTask(std::unique_ptr<QueuedTask> task, 146 PostAndReplyTask(std::unique_ptr<QueuedTask> task,
77 std::unique_ptr<QueuedTask> reply, 147 std::unique_ptr<QueuedTask> reply,
78 TaskQueue* reply_queue) 148 TaskQueue* reply_queue,
149 int reply_pipe)
79 : task_(std::move(task)), 150 : task_(std::move(task)),
80 reply_(std::move(reply)), 151 reply_pipe_(reply_pipe),
81 reply_queue_(reply_queue) { 152 reply_task_owner_(
82 reply_queue->PrepareReplyTask(this); 153 new RefCountedObject<ReplyTaskOwner>(std::move(reply))) {
154 reply_queue->PrepareReplyTask(reply_task_owner_);
83 } 155 }
84 156
85 ~PostAndReplyTask() override { 157 ~PostAndReplyTask() override {
86 CritScope lock(&lock_); 158 reply_task_owner_ = nullptr;
87 if (reply_queue_) 159 IgnoreSigPipeSignalOnCurrentThread();
88 reply_queue_->ReplyTaskDone(this); 160 // Send a signal to the reply queue that the reply task can run now.
89 } 161 // Depending on whether |set_should_run_task()| was called by the
90 162 // PostAndReplyTask(), the reply task may or may not actually run.
91 void OnReplyQueueGone() { 163 // In either case, it will be deleted.
92 CritScope lock(&lock_); 164 char message = kRunReplyTask;
93 reply_queue_ = nullptr; 165 write(reply_pipe_, &message, sizeof(message));
94 } 166 }
95 167
96 private: 168 private:
97 bool Run() override { 169 bool Run() override {
98 if (!task_->Run()) 170 if (!task_->Run())
99 task_.release(); 171 task_.release();
100 172 reply_task_owner_->set_should_run_task();
101 CritScope lock(&lock_);
102 if (reply_queue_)
103 reply_queue_->PostTask(std::move(reply_));
104 return true; 173 return true;
105 } 174 }
106 175
107 CriticalSection lock_;
108 std::unique_ptr<QueuedTask> task_; 176 std::unique_ptr<QueuedTask> task_;
109 std::unique_ptr<QueuedTask> reply_; 177 int reply_pipe_;
110 TaskQueue* reply_queue_ GUARDED_BY(lock_); 178 scoped_refptr<RefCountedObject<ReplyTaskOwner>> reply_task_owner_;
111 }; 179 };
112 180
113 class TaskQueue::SetTimerTask : public QueuedTask { 181 class TaskQueue::SetTimerTask : public QueuedTask {
114 public: 182 public:
115 SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds) 183 SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds)
116 : task_(std::move(task)), 184 : task_(std::move(task)),
117 milliseconds_(milliseconds), 185 milliseconds_(milliseconds),
118 posted_(Time32()) {} 186 posted_(Time32()) {}
119 187
120 private: 188 private:
(...skipping 16 matching lines...) Expand all
137 : event_base_(event_base_new()), 205 : event_base_(event_base_new()),
138 wakeup_event_(new event()), 206 wakeup_event_(new event()),
139 thread_(&TaskQueue::ThreadMain, this, queue_name) { 207 thread_(&TaskQueue::ThreadMain, this, queue_name) {
140 RTC_DCHECK(queue_name); 208 RTC_DCHECK(queue_name);
141 int fds[2]; 209 int fds[2];
142 RTC_CHECK(pipe(fds) == 0); 210 RTC_CHECK(pipe(fds) == 0);
143 SetNonBlocking(fds[0]); 211 SetNonBlocking(fds[0]);
144 SetNonBlocking(fds[1]); 212 SetNonBlocking(fds[1]);
145 wakeup_pipe_out_ = fds[0]; 213 wakeup_pipe_out_ = fds[0];
146 wakeup_pipe_in_ = fds[1]; 214 wakeup_pipe_in_ = fds[1];
215
147 EventAssign(wakeup_event_.get(), event_base_, wakeup_pipe_out_, 216 EventAssign(wakeup_event_.get(), event_base_, wakeup_pipe_out_,
148 EV_READ | EV_PERSIST, OnWakeup, this); 217 EV_READ | EV_PERSIST, OnWakeup, this);
149 event_add(wakeup_event_.get(), 0); 218 event_add(wakeup_event_.get(), 0);
150 thread_.Start(); 219 thread_.Start();
151 } 220 }
152 221
153 TaskQueue::~TaskQueue() { 222 TaskQueue::~TaskQueue() {
154 RTC_DCHECK(!IsCurrent()); 223 RTC_DCHECK(!IsCurrent());
155 struct timespec ts; 224 struct timespec ts;
156 char message = kQuit; 225 char message = kQuit;
157 while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) { 226 while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
158 // The queue is full, so we have no choice but to wait and retry. 227 // The queue is full, so we have no choice but to wait and retry.
159 RTC_CHECK_EQ(EAGAIN, errno); 228 RTC_CHECK_EQ(EAGAIN, errno);
160 ts.tv_sec = 0; 229 ts.tv_sec = 0;
161 ts.tv_nsec = 1000000; 230 ts.tv_nsec = 1000000;
162 nanosleep(&ts, nullptr); 231 nanosleep(&ts, nullptr);
163 } 232 }
164 233
165 thread_.Stop(); 234 thread_.Stop();
166 235
167 event_del(wakeup_event_.get()); 236 event_del(wakeup_event_.get());
237
238 IgnoreSigPipeSignalOnCurrentThread();
239
168 close(wakeup_pipe_in_); 240 close(wakeup_pipe_in_);
169 close(wakeup_pipe_out_); 241 close(wakeup_pipe_out_);
170 wakeup_pipe_in_ = -1; 242 wakeup_pipe_in_ = -1;
171 wakeup_pipe_out_ = -1; 243 wakeup_pipe_out_ = -1;
172 244
173 {
174 // Synchronize against any pending reply tasks that might be running on
175 // other queues.
176 CritScope lock(&pending_lock_);
177 for (auto* reply : pending_replies_)
178 reply->OnReplyQueueGone();
179 pending_replies_.clear();
180 }
181
182 event_base_free(event_base_); 245 event_base_free(event_base_);
183 } 246 }
184 247
185 // static 248 // static
186 TaskQueue* TaskQueue::Current() { 249 TaskQueue* TaskQueue::Current() {
187 QueueContext* ctx = 250 QueueContext* ctx =
188 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); 251 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
189 return ctx ? ctx->queue : nullptr; 252 return ctx ? ctx->queue : nullptr;
190 } 253 }
191 254
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after
239 } else { 302 } else {
240 PostTask(std::unique_ptr<QueuedTask>( 303 PostTask(std::unique_ptr<QueuedTask>(
241 new SetTimerTask(std::move(task), milliseconds))); 304 new SetTimerTask(std::move(task), milliseconds)));
242 } 305 }
243 } 306 }
244 307
245 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, 308 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
246 std::unique_ptr<QueuedTask> reply, 309 std::unique_ptr<QueuedTask> reply,
247 TaskQueue* reply_queue) { 310 TaskQueue* reply_queue) {
248 std::unique_ptr<QueuedTask> wrapper_task( 311 std::unique_ptr<QueuedTask> wrapper_task(
249 new PostAndReplyTask(std::move(task), std::move(reply), reply_queue)); 312 new PostAndReplyTask(std::move(task), std::move(reply), reply_queue,
313 reply_queue->wakeup_pipe_in_));
250 PostTask(std::move(wrapper_task)); 314 PostTask(std::move(wrapper_task));
251 } 315 }
252 316
253 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, 317 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
254 std::unique_ptr<QueuedTask> reply) { 318 std::unique_ptr<QueuedTask> reply) {
255 return PostTaskAndReply(std::move(task), std::move(reply), Current()); 319 return PostTaskAndReply(std::move(task), std::move(reply), Current());
256 } 320 }
257 321
258 // static 322 // static
259 bool TaskQueue::ThreadMain(void* context) { 323 bool TaskQueue::ThreadMain(void* context) {
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after
291 CritScope lock(&ctx->queue->pending_lock_); 355 CritScope lock(&ctx->queue->pending_lock_);
292 RTC_DCHECK(!ctx->queue->pending_.empty()); 356 RTC_DCHECK(!ctx->queue->pending_.empty());
293 task = std::move(ctx->queue->pending_.front()); 357 task = std::move(ctx->queue->pending_.front());
294 ctx->queue->pending_.pop_front(); 358 ctx->queue->pending_.pop_front();
295 RTC_DCHECK(task.get()); 359 RTC_DCHECK(task.get());
296 } 360 }
297 if (!task->Run()) 361 if (!task->Run())
298 task.release(); 362 task.release();
299 break; 363 break;
300 } 364 }
365 case kRunReplyTask: {
366 scoped_refptr<ReplyTaskOwnerRef> reply_task;
367 {
368 CritScope lock(&ctx->queue->pending_lock_);
369 for (auto it = ctx->queue->pending_replies_.begin();
370 it != ctx->queue->pending_replies_.end(); ++it) {
371 if ((*it)->HasOneRef()) {
372 reply_task = std::move(*it);
373 ctx->queue->pending_replies_.erase(it);
374 break;
375 }
376 }
377 }
378 reply_task->Run();
379 break;
380 }
301 default: 381 default:
302 RTC_NOTREACHED(); 382 RTC_NOTREACHED();
303 break; 383 break;
304 } 384 }
305 } 385 }
306 386
307 // static 387 // static
308 void TaskQueue::RunTask(int fd, short flags, void* context) { // NOLINT 388 void TaskQueue::RunTask(int fd, short flags, void* context) { // NOLINT
309 auto* task = static_cast<QueuedTask*>(context); 389 auto* task = static_cast<QueuedTask*>(context);
310 if (task->Run()) 390 if (task->Run())
311 delete task; 391 delete task;
312 } 392 }
313 393
314 // static 394 // static
315 void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT 395 void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT
316 TimerEvent* timer = static_cast<TimerEvent*>(context); 396 TimerEvent* timer = static_cast<TimerEvent*>(context);
317 if (!timer->task->Run()) 397 if (!timer->task->Run())
318 timer->task.release(); 398 timer->task.release();
319 QueueContext* ctx = 399 QueueContext* ctx =
320 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); 400 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
321 ctx->pending_timers_.remove(timer); 401 ctx->pending_timers_.remove(timer);
322 delete timer; 402 delete timer;
323 } 403 }
324 404
325 void TaskQueue::PrepareReplyTask(PostAndReplyTask* reply_task) { 405 void TaskQueue::PrepareReplyTask(scoped_refptr<ReplyTaskOwnerRef> reply_task) {
326 RTC_DCHECK(reply_task); 406 RTC_DCHECK(reply_task);
327 CritScope lock(&pending_lock_); 407 CritScope lock(&pending_lock_);
328 pending_replies_.push_back(reply_task); 408 pending_replies_.push_back(std::move(reply_task));
329 }
330
331 void TaskQueue::ReplyTaskDone(PostAndReplyTask* reply_task) {
332 CritScope lock(&pending_lock_);
333 pending_replies_.remove(reply_task);
334 } 409 }
335 410
336 } // namespace rtc 411 } // namespace rtc
OLDNEW
« no previous file with comments | « webrtc/base/task_queue_gcd.cc ('k') | webrtc/base/task_queue_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698