Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(78)

Side by Side Diff: webrtc/base/task_queue_libevent.cc

Issue 2706923005: Showing alternative approach to fixing TaskQueue deadlock. (Closed)
Patch Set: Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « webrtc/base/task_queue.h ('k') | webrtc/base/task_queue_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 /* 1 /*
2 * Copyright 2016 The WebRTC Project Authors. All rights reserved. 2 * Copyright 2016 The WebRTC Project Authors. All rights reserved.
3 * 3 *
4 * Use of this source code is governed by a BSD-style license 4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source 5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found 6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may 7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree. 8 * be found in the AUTHORS file in the root of the source tree.
9 */ 9 */
10 10
11 #include "webrtc/base/task_queue.h" 11 #include "webrtc/base/task_queue.h"
12 12
13 #include <fcntl.h> 13 #include <fcntl.h>
14 #include <string.h> 14 #include <string.h>
15 #include <unistd.h> 15 #include <unistd.h>
16 #include <time.h>
16 17
17 #include "base/third_party/libevent/event.h" 18 #include "base/third_party/libevent/event.h"
18 #include "webrtc/base/checks.h" 19 #include "webrtc/base/checks.h"
19 #include "webrtc/base/logging.h" 20 #include "webrtc/base/logging.h"
20 #include "webrtc/base/task_queue_posix.h" 21 #include "webrtc/base/task_queue_posix.h"
21 #include "webrtc/base/timeutils.h" 22 #include "webrtc/base/timeutils.h"
22 23
23 namespace rtc { 24 namespace rtc {
24 using internal::GetQueuePtrTls; 25 using internal::GetQueuePtrTls;
25 using internal::AutoSetCurrentQueuePtr; 26 using internal::AutoSetCurrentQueuePtr;
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after
64 } // namespace 65 } // namespace
65 66
66 struct TaskQueue::QueueContext { 67 struct TaskQueue::QueueContext {
67 explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {} 68 explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {}
68 TaskQueue* queue; 69 TaskQueue* queue;
69 bool is_active; 70 bool is_active;
70 // Holds a list of events pending timers for cleanup when the loop exits. 71 // Holds a list of events pending timers for cleanup when the loop exits.
71 std::list<TimerEvent*> pending_timers_; 72 std::list<TimerEvent*> pending_timers_;
72 }; 73 };
73 74
74 class TaskQueue::PostAndReplyTask : public QueuedTask { 75 class TaskQueue::PostAndReplyTask {
75 public: 76 public:
76 PostAndReplyTask(std::unique_ptr<QueuedTask> task, 77 PostAndReplyTask(std::unique_ptr<QueuedTask> task,
77 std::unique_ptr<QueuedTask> reply, 78 std::unique_ptr<QueuedTask> reply,
78 TaskQueue* reply_queue) 79 TaskQueue* reply_queue)
79 : task_(std::move(task)), 80 : task_(std::move(task)),
80 reply_(std::move(reply)), 81 reply_(std::move(reply)),
81 reply_queue_(reply_queue) { 82 reply_queue_(reply_queue) {
82 reply_queue->PrepareReplyTask(this);
83 }
84
85 ~PostAndReplyTask() override {
86 CritScope lock(&lock_);
87 if (reply_queue_)
88 reply_queue_->ReplyTaskDone(this);
89 } 83 }
90 84
91 void OnReplyQueueGone() { 85 void OnReplyQueueGone() {
92 CritScope lock(&lock_); 86 CritScope lock(&lock_);
93 reply_queue_ = nullptr; 87 reply_queue_ = nullptr;
94 } 88 }
95 89
96 private: 90 private:
97 bool Run() override { 91 bool Run() {
98 if (!task_->Run()) 92 if (!task_->Run())
99 task_.release(); 93 task_.release();
100 94
101 CritScope lock(&lock_); 95 CritScope lock(&lock_);
102 if (reply_queue_) 96 if (reply_queue_) {
103 reply_queue_->PostTask(std::move(reply_)); 97 reply_queue_->PostTask(std::move(reply_));
98 reply_queue_->PostAndReplyTaskDone(this);
99 }
104 return true; 100 return true;
105 } 101 }
106 102
107 CriticalSection lock_; 103 CriticalSection lock_;
108 std::unique_ptr<QueuedTask> task_; 104 std::unique_ptr<QueuedTask> task_;
109 std::unique_ptr<QueuedTask> reply_; 105 std::unique_ptr<QueuedTask> reply_;
110 TaskQueue* reply_queue_ GUARDED_BY(lock_); 106 TaskQueue* reply_queue_ GUARDED_BY(lock_);
107 friend class PostAndReplyTaskRefOwner;
108 };
109
110 // Inherits from QueuedTask and owns a ref to PostAndReplyTaskRef.
111 class TaskQueue::PostAndReplyTaskRefOwner : public QueuedTask {
112 public:
113 PostAndReplyTaskRefOwner(scoped_refptr<PostAndReplyTaskRef> task)
114 : task_(task) {}
115
116 private:
117 bool Run() override { return task_->Run(); }
118
119 scoped_refptr<PostAndReplyTaskRef> task_;
111 }; 120 };
112 121
113 class TaskQueue::SetTimerTask : public QueuedTask { 122 class TaskQueue::SetTimerTask : public QueuedTask {
114 public: 123 public:
115 SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds) 124 SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds)
116 : task_(std::move(task)), 125 : task_(std::move(task)),
117 milliseconds_(milliseconds), 126 milliseconds_(milliseconds),
118 posted_(Time32()) {} 127 posted_(Time32()) {}
119 128
120 private: 129 private:
(...skipping 24 matching lines...) Expand all
145 wakeup_pipe_out_ = fds[0]; 154 wakeup_pipe_out_ = fds[0];
146 wakeup_pipe_in_ = fds[1]; 155 wakeup_pipe_in_ = fds[1];
147 EventAssign(wakeup_event_.get(), event_base_, wakeup_pipe_out_, 156 EventAssign(wakeup_event_.get(), event_base_, wakeup_pipe_out_,
148 EV_READ | EV_PERSIST, OnWakeup, this); 157 EV_READ | EV_PERSIST, OnWakeup, this);
149 event_add(wakeup_event_.get(), 0); 158 event_add(wakeup_event_.get(), 0);
150 thread_.Start(); 159 thread_.Start();
151 } 160 }
152 161
153 TaskQueue::~TaskQueue() { 162 TaskQueue::~TaskQueue() {
154 RTC_DCHECK(!IsCurrent()); 163 RTC_DCHECK(!IsCurrent());
164 destroying_ = true;
155 struct timespec ts; 165 struct timespec ts;
156 char message = kQuit; 166 char message = kQuit;
157 while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) { 167 while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
158 // The queue is full, so we have no choice but to wait and retry. 168 // The queue is full, so we have no choice but to wait and retry.
159 RTC_CHECK_EQ(EAGAIN, errno); 169 RTC_CHECK_EQ(EAGAIN, errno);
160 ts.tv_sec = 0; 170 ts.tv_sec = 0;
161 ts.tv_nsec = 1000000; 171 ts.tv_nsec = 1000000;
162 nanosleep(&ts, nullptr); 172 nanosleep(&ts, nullptr);
163 } 173 }
164 174
165 thread_.Stop(); 175 thread_.Stop();
166 176
167 event_del(wakeup_event_.get()); 177 event_del(wakeup_event_.get());
168 close(wakeup_pipe_in_); 178 close(wakeup_pipe_in_);
169 close(wakeup_pipe_out_); 179 close(wakeup_pipe_out_);
170 wakeup_pipe_in_ = -1; 180 wakeup_pipe_in_ = -1;
171 wakeup_pipe_out_ = -1; 181 wakeup_pipe_out_ = -1;
172 182
183 std::list<scoped_refptr<PostAndReplyTaskRef>> pending_replies;
173 { 184 {
174 // Synchronize against any pending reply tasks that might be running on 185 // Synchronize against any pending reply tasks that might be running on
175 // other queues. 186 // other queues. Note that, since |destroying_| was set to true above, it
187 // will be impossible for |pending_replies_| to grow after this.
176 CritScope lock(&pending_lock_); 188 CritScope lock(&pending_lock_);
177 for (auto* reply : pending_replies_) 189 pending_replies.swap(pending_replies_);
178 reply->OnReplyQueueGone();
179 pending_replies_.clear();
180 } 190 }
191 for (auto reply : pending_replies) {
192 reply->OnReplyQueueGone();
193 }
194 pending_replies.clear();
181 195
182 event_base_free(event_base_); 196 event_base_free(event_base_);
183 } 197 }
184 198
185 // static 199 // static
186 TaskQueue* TaskQueue::Current() { 200 TaskQueue* TaskQueue::Current() {
187 QueueContext* ctx = 201 QueueContext* ctx =
188 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); 202 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
189 return ctx ? ctx->queue : nullptr; 203 return ctx ? ctx->queue : nullptr;
190 } 204 }
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after
238 event_add(&timer->ev, &tv); 252 event_add(&timer->ev, &tv);
239 } else { 253 } else {
240 PostTask(std::unique_ptr<QueuedTask>( 254 PostTask(std::unique_ptr<QueuedTask>(
241 new SetTimerTask(std::move(task), milliseconds))); 255 new SetTimerTask(std::move(task), milliseconds)));
242 } 256 }
243 } 257 }
244 258
245 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, 259 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
246 std::unique_ptr<QueuedTask> reply, 260 std::unique_ptr<QueuedTask> reply,
247 TaskQueue* reply_queue) { 261 TaskQueue* reply_queue) {
248 std::unique_ptr<QueuedTask> wrapper_task( 262 scoped_refptr<PostAndReplyTaskRef> ref(
249 new PostAndReplyTask(std::move(task), std::move(reply), reply_queue)); 263 new PostAndReplyTaskRef(std::move(task), std::move(reply), reply_queue));
250 PostTask(std::move(wrapper_task)); 264 reply_queue->PrepareReplyTask(ref);
265 PostTask(std::unique_ptr<QueuedTask>(new PostAndReplyTaskRefOwner(ref)));
251 } 266 }
252 267
253 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, 268 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
254 std::unique_ptr<QueuedTask> reply) { 269 std::unique_ptr<QueuedTask> reply) {
255 return PostTaskAndReply(std::move(task), std::move(reply), Current()); 270 return PostTaskAndReply(std::move(task), std::move(reply), Current());
256 } 271 }
257 272
258 // static 273 // static
259 bool TaskQueue::ThreadMain(void* context) { 274 bool TaskQueue::ThreadMain(void* context) {
260 TaskQueue* me = static_cast<TaskQueue*>(context); 275 TaskQueue* me = static_cast<TaskQueue*>(context);
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after
315 void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT 330 void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT
316 TimerEvent* timer = static_cast<TimerEvent*>(context); 331 TimerEvent* timer = static_cast<TimerEvent*>(context);
317 if (!timer->task->Run()) 332 if (!timer->task->Run())
318 timer->task.release(); 333 timer->task.release();
319 QueueContext* ctx = 334 QueueContext* ctx =
320 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); 335 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
321 ctx->pending_timers_.remove(timer); 336 ctx->pending_timers_.remove(timer);
322 delete timer; 337 delete timer;
323 } 338 }
324 339
325 void TaskQueue::PrepareReplyTask(PostAndReplyTask* reply_task) { 340 void TaskQueue::PrepareReplyTask(
341 scoped_refptr<PostAndReplyTaskRef> reply_task) {
326 RTC_DCHECK(reply_task); 342 RTC_DCHECK(reply_task);
327 CritScope lock(&pending_lock_); 343 CritScope lock(&pending_lock_);
344 if (destroying_) {
345 return;
346 }
328 pending_replies_.push_back(reply_task); 347 pending_replies_.push_back(reply_task);
329 } 348 }
330 349
331 void TaskQueue::ReplyTaskDone(PostAndReplyTask* reply_task) { 350 void TaskQueue::PostAndReplyTaskDone(PostAndReplyTask* reply_task) {
332 CritScope lock(&pending_lock_); 351 CritScope lock(&pending_lock_);
333 pending_replies_.remove(reply_task); 352 // Note: If the destructor is currently being run, and "pending_replies_" is
353 // already swapped out, this may fail to find |reply_task|, which is ok.
354 pending_replies_.remove_if(
355 [reply_task](scoped_refptr<PostAndReplyTaskRef> task) {
356 return task.get() == reply_task;
357 });
334 } 358 }
335 359
336 } // namespace rtc 360 } // namespace rtc
OLDNEW
« no previous file with comments | « webrtc/base/task_queue.h ('k') | webrtc/base/task_queue_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698