OLD | NEW |
---|---|
1 /* | 1 /* |
2 * Copyright 2016 The WebRTC Project Authors. All rights reserved. | 2 * Copyright 2016 The WebRTC Project Authors. All rights reserved. |
3 * | 3 * |
4 * Use of this source code is governed by a BSD-style license | 4 * Use of this source code is governed by a BSD-style license |
5 * that can be found in the LICENSE file in the root of the source | 5 * that can be found in the LICENSE file in the root of the source |
6 * tree. An additional intellectual property rights grant can be found | 6 * tree. An additional intellectual property rights grant can be found |
7 * in the file PATENTS. All contributing project authors may | 7 * in the file PATENTS. All contributing project authors may |
8 * be found in the AUTHORS file in the root of the source tree. | 8 * be found in the AUTHORS file in the root of the source tree. |
9 */ | 9 */ |
10 | 10 |
11 #include "webrtc/base/task_queue.h" | 11 #include "webrtc/base/task_queue.h" |
12 | 12 |
13 #include <fcntl.h> | 13 #include <fcntl.h> |
14 #include <signal.h> | |
14 #include <string.h> | 15 #include <string.h> |
15 #include <unistd.h> | 16 #include <unistd.h> |
16 | 17 |
17 #include "base/third_party/libevent/event.h" | 18 #include "base/third_party/libevent/event.h" |
18 #include "webrtc/base/checks.h" | 19 #include "webrtc/base/checks.h" |
19 #include "webrtc/base/logging.h" | 20 #include "webrtc/base/logging.h" |
20 #include "webrtc/base/task_queue_posix.h" | 21 #include "webrtc/base/task_queue_posix.h" |
21 #include "webrtc/base/timeutils.h" | 22 #include "webrtc/base/timeutils.h" |
22 | 23 |
23 namespace rtc { | 24 namespace rtc { |
24 using internal::GetQueuePtrTls; | 25 using internal::GetQueuePtrTls; |
25 using internal::AutoSetCurrentQueuePtr; | 26 using internal::AutoSetCurrentQueuePtr; |
26 | 27 |
27 namespace { | 28 namespace { |
28 static const char kQuit = 1; | 29 static const char kQuit = 1; |
29 static const char kRunTask = 2; | 30 static const char kRunTask = 2; |
31 static const char kRunReplyTask = 3; | |
32 | |
33 // This ignores the SIGPIPE signal on the calling thread. | |
34 // This signal can be fired when trying to write() to a pipe that's being | |
35 // closed or while closing a pipe that's being written to. | |
36 // We can run into that situation and handle it (e.g. reply tasks that don't | |
Taylor Brandstetter
2017/02/22 00:24:27
It's not clear to me what "We can run into that si
tommi
2017/02/22 12:43:54
thanks, yeah this isn't clear. "handle" should be
| |
37 // get a chance to run because the task queue is being deleted). | |
38 // It would be great if we could restore the sigmask, but unfortunately, | |
39 // restoring it, can actually cause SIGPIPE to be signaled :-| Which by default | |
40 // causes the process to be terminated. | |
41 // An alternative to this approach is to call: | |
42 // signal(SIGPIPE, SIG_IGN); | |
43 // However, doing so affects the whole process. | |
44 void IgnoreSigPipeSignalOnCurrentThread() { | |
45 sigset_t sigpipe_mask; | |
46 sigemptyset(&sigpipe_mask); | |
47 sigaddset(&sigpipe_mask, SIGPIPE); | |
48 pthread_sigmask(SIG_BLOCK, &sigpipe_mask, nullptr); | |
49 } | |
30 | 50 |
31 struct TimerEvent { | 51 struct TimerEvent { |
32 explicit TimerEvent(std::unique_ptr<QueuedTask> task) | 52 explicit TimerEvent(std::unique_ptr<QueuedTask> task) |
33 : task(std::move(task)) {} | 53 : task(std::move(task)) {} |
34 ~TimerEvent() { event_del(&ev); } | 54 ~TimerEvent() { event_del(&ev); } |
35 event ev; | 55 event ev; |
36 std::unique_ptr<QueuedTask> task; | 56 std::unique_ptr<QueuedTask> task; |
37 }; | 57 }; |
38 | 58 |
39 bool SetNonBlocking(int fd) { | 59 bool SetNonBlocking(int fd) { |
(...skipping 24 matching lines...) Expand all Loading... | |
64 } // namespace | 84 } // namespace |
65 | 85 |
66 struct TaskQueue::QueueContext { | 86 struct TaskQueue::QueueContext { |
67 explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {} | 87 explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {} |
68 TaskQueue* queue; | 88 TaskQueue* queue; |
69 bool is_active; | 89 bool is_active; |
70 // Holds a list of events pending timers for cleanup when the loop exits. | 90 // Holds a list of events pending timers for cleanup when the loop exits. |
71 std::list<TimerEvent*> pending_timers_; | 91 std::list<TimerEvent*> pending_timers_; |
72 }; | 92 }; |
73 | 93 |
94 // Posting a reply task is tricky business. This class owns the reply task | |
95 // and a reference to it is held by both the reply queue and the first task. | |
96 // Here's an outline of what happens when dealing with a reply task. | |
97 // * The ReplyTaskOwner owns the |reply_| task. | |
98 // * One ref owned by PostAndReplyTask | |
99 // * One ref owned by the reply TaskQueue | |
100 // * ReplyTaskOwner has a flag |run_task_| initially set to false. | |
101 // * ReplyTaskOwner has a method: HasOneRef() (provided by RefCountedObject). | |
102 // * After successfully running the original |task_|, PostAndReplyTask() calls | |
103 // set_should_run_task(). This sets |run_task_| to true. | |
104 // * In PostAndReplyTask's dtor: | |
105 // * It releases its reference to ReplyTaskOwner (important to do this first). | |
106 // * Sends (write()) a kRunReplyTask message to the reply queue's pipe. | |
107 // * PostAndReplyTask doesn't care if write() fails, but when it does: | |
108 // * The reply queue is gone. | |
109 // * ReplyTaskOwner has already been deleted and the reply task too. | |
110 // * If write() succeeds: | |
111 // * ReplyQueue receives the kRunReplyTask message | |
112 // * Goes through all pending tasks, finding the first that HasOneRef() | |
113 // * Calls ReplyTaskOwner::Run() | |
114 // * if set_should_run_task() was called, the reply task will be run | |
115 // * Release the reference to ReplyTaskOwner | |
116 // * ReplyTaskOwner and associated |reply_| are deleted. | |
117 class TaskQueue::ReplyTaskOwner { | |
118 public: | |
119 ReplyTaskOwner(std::unique_ptr<QueuedTask> reply) | |
120 : reply_(std::move(reply)) {} | |
121 | |
122 void Run() { | |
123 RTC_DCHECK(reply_); | |
Taylor Brandstetter
2017/02/22 00:24:27
Any reason the DCHECK is in Run and not the constr
tommi
2017/02/22 12:43:54
In case Run() ever gets called twice and reply_.re
| |
124 if (run_task_) { | |
125 if (!reply_->Run()) | |
126 reply_.release(); | |
127 } | |
128 } | |
129 | |
130 void set_should_run_task() { | |
131 RTC_DCHECK(!run_task_); | |
132 run_task_ = true; | |
133 } | |
134 | |
135 private: | |
136 std::unique_ptr<QueuedTask> reply_; | |
137 bool run_task_ = false; | |
138 }; | |
139 | |
74 class TaskQueue::PostAndReplyTask : public QueuedTask { | 140 class TaskQueue::PostAndReplyTask : public QueuedTask { |
75 public: | 141 public: |
76 PostAndReplyTask(std::unique_ptr<QueuedTask> task, | 142 PostAndReplyTask(std::unique_ptr<QueuedTask> task, |
77 std::unique_ptr<QueuedTask> reply, | 143 std::unique_ptr<QueuedTask> reply, |
78 TaskQueue* reply_queue) | 144 TaskQueue* reply_queue, |
145 int reply_pipe) | |
79 : task_(std::move(task)), | 146 : task_(std::move(task)), |
80 reply_(std::move(reply)), | 147 reply_pipe_(reply_pipe), |
81 reply_queue_(reply_queue) { | 148 reply_task_owner_( |
82 reply_queue->PrepareReplyTask(this); | 149 new RefCountedObject<ReplyTaskOwner>(std::move(reply))) { |
150 reply_queue->PrepareReplyTask(reply_task_owner_); | |
83 } | 151 } |
84 | 152 |
85 ~PostAndReplyTask() override { | 153 ~PostAndReplyTask() override { |
86 CritScope lock(&lock_); | 154 reply_task_owner_ = nullptr; |
87 if (reply_queue_) | 155 IgnoreSigPipeSignalOnCurrentThread(); |
88 reply_queue_->ReplyTaskDone(this); | 156 char message = kRunReplyTask; |
89 } | 157 write(reply_pipe_, &message, sizeof(message)); |
Taylor Brandstetter
2017/02/22 00:24:27
Could you leave a comment here explaining that if
tommi
2017/02/22 12:43:54
Done.
| |
90 | |
91 void OnReplyQueueGone() { | |
92 CritScope lock(&lock_); | |
93 reply_queue_ = nullptr; | |
94 } | 158 } |
95 | 159 |
96 private: | 160 private: |
97 bool Run() override { | 161 bool Run() override { |
98 if (!task_->Run()) | 162 if (!task_->Run()) |
99 task_.release(); | 163 task_.release(); |
100 | 164 reply_task_owner_->set_should_run_task(); |
101 CritScope lock(&lock_); | |
102 if (reply_queue_) | |
103 reply_queue_->PostTask(std::move(reply_)); | |
104 return true; | 165 return true; |
105 } | 166 } |
106 | 167 |
107 CriticalSection lock_; | |
108 std::unique_ptr<QueuedTask> task_; | 168 std::unique_ptr<QueuedTask> task_; |
109 std::unique_ptr<QueuedTask> reply_; | 169 int reply_pipe_; |
110 TaskQueue* reply_queue_ GUARDED_BY(lock_); | 170 scoped_refptr<RefCountedObject<ReplyTaskOwner>> reply_task_owner_; |
111 }; | 171 }; |
112 | 172 |
113 class TaskQueue::SetTimerTask : public QueuedTask { | 173 class TaskQueue::SetTimerTask : public QueuedTask { |
114 public: | 174 public: |
115 SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds) | 175 SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds) |
116 : task_(std::move(task)), | 176 : task_(std::move(task)), |
117 milliseconds_(milliseconds), | 177 milliseconds_(milliseconds), |
118 posted_(Time32()) {} | 178 posted_(Time32()) {} |
119 | 179 |
120 private: | 180 private: |
(...skipping 16 matching lines...) Expand all Loading... | |
137 : event_base_(event_base_new()), | 197 : event_base_(event_base_new()), |
138 wakeup_event_(new event()), | 198 wakeup_event_(new event()), |
139 thread_(&TaskQueue::ThreadMain, this, queue_name) { | 199 thread_(&TaskQueue::ThreadMain, this, queue_name) { |
140 RTC_DCHECK(queue_name); | 200 RTC_DCHECK(queue_name); |
141 int fds[2]; | 201 int fds[2]; |
142 RTC_CHECK(pipe(fds) == 0); | 202 RTC_CHECK(pipe(fds) == 0); |
143 SetNonBlocking(fds[0]); | 203 SetNonBlocking(fds[0]); |
144 SetNonBlocking(fds[1]); | 204 SetNonBlocking(fds[1]); |
145 wakeup_pipe_out_ = fds[0]; | 205 wakeup_pipe_out_ = fds[0]; |
146 wakeup_pipe_in_ = fds[1]; | 206 wakeup_pipe_in_ = fds[1]; |
207 | |
147 EventAssign(wakeup_event_.get(), event_base_, wakeup_pipe_out_, | 208 EventAssign(wakeup_event_.get(), event_base_, wakeup_pipe_out_, |
148 EV_READ | EV_PERSIST, OnWakeup, this); | 209 EV_READ | EV_PERSIST, OnWakeup, this); |
149 event_add(wakeup_event_.get(), 0); | 210 event_add(wakeup_event_.get(), 0); |
150 thread_.Start(); | 211 thread_.Start(); |
151 } | 212 } |
152 | 213 |
153 TaskQueue::~TaskQueue() { | 214 TaskQueue::~TaskQueue() { |
154 RTC_DCHECK(!IsCurrent()); | 215 RTC_DCHECK(!IsCurrent()); |
155 struct timespec ts; | 216 struct timespec ts; |
156 char message = kQuit; | 217 char message = kQuit; |
157 while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) { | 218 while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) { |
158 // The queue is full, so we have no choice but to wait and retry. | 219 // The queue is full, so we have no choice but to wait and retry. |
159 RTC_CHECK_EQ(EAGAIN, errno); | 220 RTC_CHECK_EQ(EAGAIN, errno); |
160 ts.tv_sec = 0; | 221 ts.tv_sec = 0; |
161 ts.tv_nsec = 1000000; | 222 ts.tv_nsec = 1000000; |
162 nanosleep(&ts, nullptr); | 223 nanosleep(&ts, nullptr); |
163 } | 224 } |
164 | 225 |
165 thread_.Stop(); | 226 thread_.Stop(); |
166 | 227 |
167 event_del(wakeup_event_.get()); | 228 event_del(wakeup_event_.get()); |
229 | |
230 IgnoreSigPipeSignalOnCurrentThread(); | |
Taylor Brandstetter
2017/02/22 00:24:27
So, does this mean after the first TaskQueue is de
tommi
2017/02/22 12:43:54
Permanently turned off for the current thread, not
| |
231 | |
168 close(wakeup_pipe_in_); | 232 close(wakeup_pipe_in_); |
169 close(wakeup_pipe_out_); | 233 close(wakeup_pipe_out_); |
170 wakeup_pipe_in_ = -1; | 234 wakeup_pipe_in_ = -1; |
171 wakeup_pipe_out_ = -1; | 235 wakeup_pipe_out_ = -1; |
172 | 236 |
173 { | |
174 // Synchronize against any pending reply tasks that might be running on | |
175 // other queues. | |
176 CritScope lock(&pending_lock_); | |
177 for (auto* reply : pending_replies_) | |
178 reply->OnReplyQueueGone(); | |
179 pending_replies_.clear(); | |
180 } | |
181 | |
182 event_base_free(event_base_); | 237 event_base_free(event_base_); |
183 } | 238 } |
184 | 239 |
185 // static | 240 // static |
186 TaskQueue* TaskQueue::Current() { | 241 TaskQueue* TaskQueue::Current() { |
187 QueueContext* ctx = | 242 QueueContext* ctx = |
188 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); | 243 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); |
189 return ctx ? ctx->queue : nullptr; | 244 return ctx ? ctx->queue : nullptr; |
190 } | 245 } |
191 | 246 |
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
239 } else { | 294 } else { |
240 PostTask(std::unique_ptr<QueuedTask>( | 295 PostTask(std::unique_ptr<QueuedTask>( |
241 new SetTimerTask(std::move(task), milliseconds))); | 296 new SetTimerTask(std::move(task), milliseconds))); |
242 } | 297 } |
243 } | 298 } |
244 | 299 |
245 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, | 300 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
246 std::unique_ptr<QueuedTask> reply, | 301 std::unique_ptr<QueuedTask> reply, |
247 TaskQueue* reply_queue) { | 302 TaskQueue* reply_queue) { |
248 std::unique_ptr<QueuedTask> wrapper_task( | 303 std::unique_ptr<QueuedTask> wrapper_task( |
249 new PostAndReplyTask(std::move(task), std::move(reply), reply_queue)); | 304 new PostAndReplyTask(std::move(task), std::move(reply), reply_queue, |
305 reply_queue->wakeup_pipe_in_)); | |
250 PostTask(std::move(wrapper_task)); | 306 PostTask(std::move(wrapper_task)); |
251 } | 307 } |
252 | 308 |
253 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, | 309 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
254 std::unique_ptr<QueuedTask> reply) { | 310 std::unique_ptr<QueuedTask> reply) { |
255 return PostTaskAndReply(std::move(task), std::move(reply), Current()); | 311 return PostTaskAndReply(std::move(task), std::move(reply), Current()); |
256 } | 312 } |
257 | 313 |
258 // static | 314 // static |
259 bool TaskQueue::ThreadMain(void* context) { | 315 bool TaskQueue::ThreadMain(void* context) { |
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
291 CritScope lock(&ctx->queue->pending_lock_); | 347 CritScope lock(&ctx->queue->pending_lock_); |
292 RTC_DCHECK(!ctx->queue->pending_.empty()); | 348 RTC_DCHECK(!ctx->queue->pending_.empty()); |
293 task = std::move(ctx->queue->pending_.front()); | 349 task = std::move(ctx->queue->pending_.front()); |
294 ctx->queue->pending_.pop_front(); | 350 ctx->queue->pending_.pop_front(); |
295 RTC_DCHECK(task.get()); | 351 RTC_DCHECK(task.get()); |
296 } | 352 } |
297 if (!task->Run()) | 353 if (!task->Run()) |
298 task.release(); | 354 task.release(); |
299 break; | 355 break; |
300 } | 356 } |
357 case kRunReplyTask: { | |
358 scoped_refptr<ReplyTaskOwnerRef> reply_task; | |
359 { | |
360 CritScope lock(&ctx->queue->pending_lock_); | |
361 for (auto it = ctx->queue->pending_replies_.begin(); | |
362 it != ctx->queue->pending_replies_.end(); ++it) { | |
363 if ((*it)->HasOneRef()) { | |
364 reply_task = std::move(*it); | |
365 ctx->queue->pending_replies_.erase(it); | |
366 break; | |
367 } | |
368 } | |
369 } | |
370 reply_task->Run(); | |
371 break; | |
372 } | |
301 default: | 373 default: |
302 RTC_NOTREACHED(); | 374 RTC_NOTREACHED(); |
303 break; | 375 break; |
304 } | 376 } |
305 } | 377 } |
306 | 378 |
307 // static | 379 // static |
308 void TaskQueue::RunTask(int fd, short flags, void* context) { // NOLINT | 380 void TaskQueue::RunTask(int fd, short flags, void* context) { // NOLINT |
309 auto* task = static_cast<QueuedTask*>(context); | 381 auto* task = static_cast<QueuedTask*>(context); |
310 if (task->Run()) | 382 if (task->Run()) |
311 delete task; | 383 delete task; |
312 } | 384 } |
313 | 385 |
314 // static | 386 // static |
315 void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT | 387 void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT |
316 TimerEvent* timer = static_cast<TimerEvent*>(context); | 388 TimerEvent* timer = static_cast<TimerEvent*>(context); |
317 if (!timer->task->Run()) | 389 if (!timer->task->Run()) |
318 timer->task.release(); | 390 timer->task.release(); |
319 QueueContext* ctx = | 391 QueueContext* ctx = |
320 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); | 392 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); |
321 ctx->pending_timers_.remove(timer); | 393 ctx->pending_timers_.remove(timer); |
322 delete timer; | 394 delete timer; |
323 } | 395 } |
324 | 396 |
325 void TaskQueue::PrepareReplyTask(PostAndReplyTask* reply_task) { | 397 void TaskQueue::PrepareReplyTask(scoped_refptr<ReplyTaskOwnerRef> reply_task) { |
326 RTC_DCHECK(reply_task); | 398 RTC_DCHECK(reply_task); |
327 CritScope lock(&pending_lock_); | 399 CritScope lock(&pending_lock_); |
328 pending_replies_.push_back(reply_task); | 400 pending_replies_.push_back(std::move(reply_task)); |
329 } | |
330 | |
331 void TaskQueue::ReplyTaskDone(PostAndReplyTask* reply_task) { | |
332 CritScope lock(&pending_lock_); | |
333 pending_replies_.remove(reply_task); | |
334 } | 401 } |
335 | 402 |
336 } // namespace rtc | 403 } // namespace rtc |
OLD | NEW |