Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(246)

Side by Side Diff: webrtc/base/task_queue_libevent.cc

Issue 2709603002: Fix potential deadlock in TaskQueue's libevent PostTaskAndReply implementation (Closed)
Patch Set: Update comment Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 /* 1 /*
2 * Copyright 2016 The WebRTC Project Authors. All rights reserved. 2 * Copyright 2016 The WebRTC Project Authors. All rights reserved.
3 * 3 *
4 * Use of this source code is governed by a BSD-style license 4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source 5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found 6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may 7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree. 8 * be found in the AUTHORS file in the root of the source tree.
9 */ 9 */
10 10
11 #include "webrtc/base/task_queue.h" 11 #include "webrtc/base/task_queue.h"
12 12
13 #include <fcntl.h> 13 #include <fcntl.h>
14 #include <signal.h>
14 #include <string.h> 15 #include <string.h>
15 #include <unistd.h> 16 #include <unistd.h>
16 17
17 #include "base/third_party/libevent/event.h" 18 #include "base/third_party/libevent/event.h"
18 #include "webrtc/base/checks.h" 19 #include "webrtc/base/checks.h"
19 #include "webrtc/base/logging.h" 20 #include "webrtc/base/logging.h"
20 #include "webrtc/base/task_queue_posix.h" 21 #include "webrtc/base/task_queue_posix.h"
21 #include "webrtc/base/timeutils.h" 22 #include "webrtc/base/timeutils.h"
22 23
23 namespace rtc { 24 namespace rtc {
24 using internal::GetQueuePtrTls; 25 using internal::GetQueuePtrTls;
25 using internal::AutoSetCurrentQueuePtr; 26 using internal::AutoSetCurrentQueuePtr;
26 27
27 namespace { 28 namespace {
28 static const char kQuit = 1; 29 static const char kQuit = 1;
29 static const char kRunTask = 2; 30 static const char kRunTask = 2;
31 static const char kRunReplyTask = 3;
32
33 // This ignores the SIGPIPE signal on the calling thread.
34 // This signal can be fired when trying to write() to a pipe that's being
35 // closed or while closing a pipe that's being written to.
36 // We can run into that situation and handle it (e.g. reply tasks that don't
Taylor Brandstetter 2017/02/22 00:24:27 It's not clear to me what "We can run into that si
tommi 2017/02/22 12:43:54 thanks, yeah this isn't clear. "handle" should be
37 // get a chance to run because the task queue is being deleted).
38 // It would be great if we could restore the sigmask, but unfortunately,
39 // restoring it can actually cause SIGPIPE to be signaled :-| which, by default,
40 // causes the process to be terminated.
41 // An alternative to this approach is to call:
42 // signal(SIGPIPE, SIG_IGN);
43 // However, doing so affects the whole process.
44 void IgnoreSigPipeSignalOnCurrentThread() {
45 sigset_t sigpipe_mask;
46 sigemptyset(&sigpipe_mask);
47 sigaddset(&sigpipe_mask, SIGPIPE);
48 pthread_sigmask(SIG_BLOCK, &sigpipe_mask, nullptr);
49 }
30 50
31 struct TimerEvent { 51 struct TimerEvent {
32 explicit TimerEvent(std::unique_ptr<QueuedTask> task) 52 explicit TimerEvent(std::unique_ptr<QueuedTask> task)
33 : task(std::move(task)) {} 53 : task(std::move(task)) {}
34 ~TimerEvent() { event_del(&ev); } 54 ~TimerEvent() { event_del(&ev); }
35 event ev; 55 event ev;
36 std::unique_ptr<QueuedTask> task; 56 std::unique_ptr<QueuedTask> task;
37 }; 57 };
38 58
39 bool SetNonBlocking(int fd) { 59 bool SetNonBlocking(int fd) {
(...skipping 24 matching lines...) Expand all
64 } // namespace 84 } // namespace
65 85
66 struct TaskQueue::QueueContext { 86 struct TaskQueue::QueueContext {
67 explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {} 87 explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {}
68 TaskQueue* queue; 88 TaskQueue* queue;
69 bool is_active; 89 bool is_active;
70 // Holds a list of pending timer events for cleanup when the loop exits. 90 // Holds a list of pending timer events for cleanup when the loop exits.
71 std::list<TimerEvent*> pending_timers_; 91 std::list<TimerEvent*> pending_timers_;
72 }; 92 };
73 93
94 // Posting a reply task is tricky business. This class owns the reply task
95 // and a reference to it is held by both the reply queue and the first task.
96 // Here's an outline of what happens when dealing with a reply task.
97 // * The ReplyTaskOwner owns the |reply_| task.
98 // * One ref owned by PostAndReplyTask
99 // * One ref owned by the reply TaskQueue
100 // * ReplyTaskOwner has a flag |run_task_| initially set to false.
101 // * ReplyTaskOwner has a method: HasOneRef() (provided by RefCountedObject).
102 // * After successfully running the original |task_|, PostAndReplyTask::Run() calls
103 // set_should_run_task(). This sets |run_task_| to true.
104 // * In PostAndReplyTask's dtor:
105 // * It releases its reference to ReplyTaskOwner (important to do this first).
106 // * Sends (write()) a kRunReplyTask message to the reply queue's pipe.
107 // * PostAndReplyTask doesn't care if write() fails, but when it does:
108 // * The reply queue is gone.
109 // * ReplyTaskOwner has already been deleted and the reply task too.
110 // * If write() succeeds:
111 // * ReplyQueue receives the kRunReplyTask message
112 // * Goes through all pending tasks, finding the first that HasOneRef()
113 // * Calls ReplyTaskOwner::Run()
114 // * if set_should_run_task() was called, the reply task will be run
115 // * Release the reference to ReplyTaskOwner
116 // * ReplyTaskOwner and associated |reply_| are deleted.
117 class TaskQueue::ReplyTaskOwner {
118 public:
119 ReplyTaskOwner(std::unique_ptr<QueuedTask> reply)
120 : reply_(std::move(reply)) {}
121
122 void Run() {
123 RTC_DCHECK(reply_);
Taylor Brandstetter 2017/02/22 00:24:27 Any reason the DCHECK is in Run and not the constr
tommi 2017/02/22 12:43:54 In case Run() ever gets called twice and reply_.re
124 if (run_task_) {
125 if (!reply_->Run())
126 reply_.release();
127 }
128 }
129
130 void set_should_run_task() {
131 RTC_DCHECK(!run_task_);
132 run_task_ = true;
133 }
134
135 private:
136 std::unique_ptr<QueuedTask> reply_;
137 bool run_task_ = false;
138 };
139
74 class TaskQueue::PostAndReplyTask : public QueuedTask { 140 class TaskQueue::PostAndReplyTask : public QueuedTask {
75 public: 141 public:
76 PostAndReplyTask(std::unique_ptr<QueuedTask> task, 142 PostAndReplyTask(std::unique_ptr<QueuedTask> task,
77 std::unique_ptr<QueuedTask> reply, 143 std::unique_ptr<QueuedTask> reply,
78 TaskQueue* reply_queue) 144 TaskQueue* reply_queue,
145 int reply_pipe)
79 : task_(std::move(task)), 146 : task_(std::move(task)),
80 reply_(std::move(reply)), 147 reply_pipe_(reply_pipe),
81 reply_queue_(reply_queue) { 148 reply_task_owner_(
82 reply_queue->PrepareReplyTask(this); 149 new RefCountedObject<ReplyTaskOwner>(std::move(reply))) {
150 reply_queue->PrepareReplyTask(reply_task_owner_);
83 } 151 }
84 152
85 ~PostAndReplyTask() override { 153 ~PostAndReplyTask() override {
86 CritScope lock(&lock_); 154 reply_task_owner_ = nullptr;
87 if (reply_queue_) 155 IgnoreSigPipeSignalOnCurrentThread();
88 reply_queue_->ReplyTaskDone(this); 156 char message = kRunReplyTask;
89 } 157 write(reply_pipe_, &message, sizeof(message));
Taylor Brandstetter 2017/02/22 00:24:27 Could you leave a comment here explaining that if
tommi 2017/02/22 12:43:54 Done.
90
91 void OnReplyQueueGone() {
92 CritScope lock(&lock_);
93 reply_queue_ = nullptr;
94 } 158 }
95 159
96 private: 160 private:
97 bool Run() override { 161 bool Run() override {
98 if (!task_->Run()) 162 if (!task_->Run())
99 task_.release(); 163 task_.release();
100 164 reply_task_owner_->set_should_run_task();
101 CritScope lock(&lock_);
102 if (reply_queue_)
103 reply_queue_->PostTask(std::move(reply_));
104 return true; 165 return true;
105 } 166 }
106 167
107 CriticalSection lock_;
108 std::unique_ptr<QueuedTask> task_; 168 std::unique_ptr<QueuedTask> task_;
109 std::unique_ptr<QueuedTask> reply_; 169 int reply_pipe_;
110 TaskQueue* reply_queue_ GUARDED_BY(lock_); 170 scoped_refptr<RefCountedObject<ReplyTaskOwner>> reply_task_owner_;
111 }; 171 };
112 172
113 class TaskQueue::SetTimerTask : public QueuedTask { 173 class TaskQueue::SetTimerTask : public QueuedTask {
114 public: 174 public:
115 SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds) 175 SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds)
116 : task_(std::move(task)), 176 : task_(std::move(task)),
117 milliseconds_(milliseconds), 177 milliseconds_(milliseconds),
118 posted_(Time32()) {} 178 posted_(Time32()) {}
119 179
120 private: 180 private:
(...skipping 16 matching lines...) Expand all
137 : event_base_(event_base_new()), 197 : event_base_(event_base_new()),
138 wakeup_event_(new event()), 198 wakeup_event_(new event()),
139 thread_(&TaskQueue::ThreadMain, this, queue_name) { 199 thread_(&TaskQueue::ThreadMain, this, queue_name) {
140 RTC_DCHECK(queue_name); 200 RTC_DCHECK(queue_name);
141 int fds[2]; 201 int fds[2];
142 RTC_CHECK(pipe(fds) == 0); 202 RTC_CHECK(pipe(fds) == 0);
143 SetNonBlocking(fds[0]); 203 SetNonBlocking(fds[0]);
144 SetNonBlocking(fds[1]); 204 SetNonBlocking(fds[1]);
145 wakeup_pipe_out_ = fds[0]; 205 wakeup_pipe_out_ = fds[0];
146 wakeup_pipe_in_ = fds[1]; 206 wakeup_pipe_in_ = fds[1];
207
147 EventAssign(wakeup_event_.get(), event_base_, wakeup_pipe_out_, 208 EventAssign(wakeup_event_.get(), event_base_, wakeup_pipe_out_,
148 EV_READ | EV_PERSIST, OnWakeup, this); 209 EV_READ | EV_PERSIST, OnWakeup, this);
149 event_add(wakeup_event_.get(), 0); 210 event_add(wakeup_event_.get(), 0);
150 thread_.Start(); 211 thread_.Start();
151 } 212 }
152 213
153 TaskQueue::~TaskQueue() { 214 TaskQueue::~TaskQueue() {
154 RTC_DCHECK(!IsCurrent()); 215 RTC_DCHECK(!IsCurrent());
155 struct timespec ts; 216 struct timespec ts;
156 char message = kQuit; 217 char message = kQuit;
157 while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) { 218 while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
158 // The queue is full, so we have no choice but to wait and retry. 219 // The queue is full, so we have no choice but to wait and retry.
159 RTC_CHECK_EQ(EAGAIN, errno); 220 RTC_CHECK_EQ(EAGAIN, errno);
160 ts.tv_sec = 0; 221 ts.tv_sec = 0;
161 ts.tv_nsec = 1000000; 222 ts.tv_nsec = 1000000;
162 nanosleep(&ts, nullptr); 223 nanosleep(&ts, nullptr);
163 } 224 }
164 225
165 thread_.Stop(); 226 thread_.Stop();
166 227
167 event_del(wakeup_event_.get()); 228 event_del(wakeup_event_.get());
229
230 IgnoreSigPipeSignalOnCurrentThread();
Taylor Brandstetter 2017/02/22 00:24:27 So, does this mean after the first TaskQueue is de
tommi 2017/02/22 12:43:54 Permanently turned off for the current thread, not
231
168 close(wakeup_pipe_in_); 232 close(wakeup_pipe_in_);
169 close(wakeup_pipe_out_); 233 close(wakeup_pipe_out_);
170 wakeup_pipe_in_ = -1; 234 wakeup_pipe_in_ = -1;
171 wakeup_pipe_out_ = -1; 235 wakeup_pipe_out_ = -1;
172 236
173 {
174 // Synchronize against any pending reply tasks that might be running on
175 // other queues.
176 CritScope lock(&pending_lock_);
177 for (auto* reply : pending_replies_)
178 reply->OnReplyQueueGone();
179 pending_replies_.clear();
180 }
181
182 event_base_free(event_base_); 237 event_base_free(event_base_);
183 } 238 }
184 239
185 // static 240 // static
186 TaskQueue* TaskQueue::Current() { 241 TaskQueue* TaskQueue::Current() {
187 QueueContext* ctx = 242 QueueContext* ctx =
188 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); 243 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
189 return ctx ? ctx->queue : nullptr; 244 return ctx ? ctx->queue : nullptr;
190 } 245 }
191 246
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after
239 } else { 294 } else {
240 PostTask(std::unique_ptr<QueuedTask>( 295 PostTask(std::unique_ptr<QueuedTask>(
241 new SetTimerTask(std::move(task), milliseconds))); 296 new SetTimerTask(std::move(task), milliseconds)));
242 } 297 }
243 } 298 }
244 299
245 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, 300 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
246 std::unique_ptr<QueuedTask> reply, 301 std::unique_ptr<QueuedTask> reply,
247 TaskQueue* reply_queue) { 302 TaskQueue* reply_queue) {
248 std::unique_ptr<QueuedTask> wrapper_task( 303 std::unique_ptr<QueuedTask> wrapper_task(
249 new PostAndReplyTask(std::move(task), std::move(reply), reply_queue)); 304 new PostAndReplyTask(std::move(task), std::move(reply), reply_queue,
305 reply_queue->wakeup_pipe_in_));
250 PostTask(std::move(wrapper_task)); 306 PostTask(std::move(wrapper_task));
251 } 307 }
252 308
253 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, 309 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
254 std::unique_ptr<QueuedTask> reply) { 310 std::unique_ptr<QueuedTask> reply) {
255 return PostTaskAndReply(std::move(task), std::move(reply), Current()); 311 return PostTaskAndReply(std::move(task), std::move(reply), Current());
256 } 312 }
257 313
258 // static 314 // static
259 bool TaskQueue::ThreadMain(void* context) { 315 bool TaskQueue::ThreadMain(void* context) {
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after
291 CritScope lock(&ctx->queue->pending_lock_); 347 CritScope lock(&ctx->queue->pending_lock_);
292 RTC_DCHECK(!ctx->queue->pending_.empty()); 348 RTC_DCHECK(!ctx->queue->pending_.empty());
293 task = std::move(ctx->queue->pending_.front()); 349 task = std::move(ctx->queue->pending_.front());
294 ctx->queue->pending_.pop_front(); 350 ctx->queue->pending_.pop_front();
295 RTC_DCHECK(task.get()); 351 RTC_DCHECK(task.get());
296 } 352 }
297 if (!task->Run()) 353 if (!task->Run())
298 task.release(); 354 task.release();
299 break; 355 break;
300 } 356 }
357 case kRunReplyTask: {
358 scoped_refptr<ReplyTaskOwnerRef> reply_task;
359 {
360 CritScope lock(&ctx->queue->pending_lock_);
361 for (auto it = ctx->queue->pending_replies_.begin();
362 it != ctx->queue->pending_replies_.end(); ++it) {
363 if ((*it)->HasOneRef()) {
364 reply_task = std::move(*it);
365 ctx->queue->pending_replies_.erase(it);
366 break;
367 }
368 }
369 }
370 reply_task->Run();
371 break;
372 }
301 default: 373 default:
302 RTC_NOTREACHED(); 374 RTC_NOTREACHED();
303 break; 375 break;
304 } 376 }
305 } 377 }
306 378
307 // static 379 // static
308 void TaskQueue::RunTask(int fd, short flags, void* context) { // NOLINT 380 void TaskQueue::RunTask(int fd, short flags, void* context) { // NOLINT
309 auto* task = static_cast<QueuedTask*>(context); 381 auto* task = static_cast<QueuedTask*>(context);
310 if (task->Run()) 382 if (task->Run())
311 delete task; 383 delete task;
312 } 384 }
313 385
314 // static 386 // static
315 void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT 387 void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT
316 TimerEvent* timer = static_cast<TimerEvent*>(context); 388 TimerEvent* timer = static_cast<TimerEvent*>(context);
317 if (!timer->task->Run()) 389 if (!timer->task->Run())
318 timer->task.release(); 390 timer->task.release();
319 QueueContext* ctx = 391 QueueContext* ctx =
320 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); 392 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
321 ctx->pending_timers_.remove(timer); 393 ctx->pending_timers_.remove(timer);
322 delete timer; 394 delete timer;
323 } 395 }
324 396
325 void TaskQueue::PrepareReplyTask(PostAndReplyTask* reply_task) { 397 void TaskQueue::PrepareReplyTask(scoped_refptr<ReplyTaskOwnerRef> reply_task) {
326 RTC_DCHECK(reply_task); 398 RTC_DCHECK(reply_task);
327 CritScope lock(&pending_lock_); 399 CritScope lock(&pending_lock_);
328 pending_replies_.push_back(reply_task); 400 pending_replies_.push_back(std::move(reply_task));
329 }
330
331 void TaskQueue::ReplyTaskDone(PostAndReplyTask* reply_task) {
332 CritScope lock(&pending_lock_);
333 pending_replies_.remove(reply_task);
334 } 401 }
335 402
336 } // namespace rtc 403 } // namespace rtc
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698