OLD | NEW |
1 /* | 1 /* |
2 * Copyright 2016 The WebRTC Project Authors. All rights reserved. | 2 * Copyright 2016 The WebRTC Project Authors. All rights reserved. |
3 * | 3 * |
4 * Use of this source code is governed by a BSD-style license | 4 * Use of this source code is governed by a BSD-style license |
5 * that can be found in the LICENSE file in the root of the source | 5 * that can be found in the LICENSE file in the root of the source |
6 * tree. An additional intellectual property rights grant can be found | 6 * tree. An additional intellectual property rights grant can be found |
7 * in the file PATENTS. All contributing project authors may | 7 * in the file PATENTS. All contributing project authors may |
8 * be found in the AUTHORS file in the root of the source tree. | 8 * be found in the AUTHORS file in the root of the source tree. |
9 */ | 9 */ |
10 | 10 |
11 #include "webrtc/base/task_queue.h" | 11 #include "webrtc/base/task_queue.h" |
12 | 12 |
13 #include <fcntl.h> | 13 #include <fcntl.h> |
| 14 #include <signal.h> |
14 #include <string.h> | 15 #include <string.h> |
15 #include <unistd.h> | 16 #include <unistd.h> |
16 | 17 |
17 #include "base/third_party/libevent/event.h" | 18 #include "base/third_party/libevent/event.h" |
18 #include "webrtc/base/checks.h" | 19 #include "webrtc/base/checks.h" |
19 #include "webrtc/base/logging.h" | 20 #include "webrtc/base/logging.h" |
20 #include "webrtc/base/task_queue_posix.h" | 21 #include "webrtc/base/task_queue_posix.h" |
21 #include "webrtc/base/timeutils.h" | 22 #include "webrtc/base/timeutils.h" |
22 | 23 |
23 namespace rtc { | 24 namespace rtc { |
24 using internal::GetQueuePtrTls; | 25 using internal::GetQueuePtrTls; |
25 using internal::AutoSetCurrentQueuePtr; | 26 using internal::AutoSetCurrentQueuePtr; |
26 | 27 |
27 namespace { | 28 namespace { |
28 static const char kQuit = 1; | 29 static const char kQuit = 1; |
29 static const char kRunTask = 2; | 30 static const char kRunTask = 2; |
| 31 static const char kRunReplyTask = 3; |
| 32 |
| 33 // This ignores the SIGPIPE signal on the calling thread. |
| 34 // This signal can be fired when trying to write() to a pipe that's being |
| 35 // closed or while closing a pipe that's being written to. |
| 36 // We can run into that situation (e.g. reply tasks that don't get a chance to |
| 37 // run because the task queue is being deleted) so we ignore this signal and |
| 38 // continue as normal. |
| 39 // As a side note for this implementation, it would be great if we could safely |
| 40 // restore the sigmask, but unfortunately the operation of restoring it, can |
| 41 // itself actually cause SIGPIPE to be signaled :-| (e.g. on MacOS) |
| 42 // The SIGPIPE signal by default causes the process to be terminated, so we |
| 43 // don't want to risk that. |
| 44 // An alternative to this approach is to ignore the signal for the whole |
| 45 // process: |
| 46 // signal(SIGPIPE, SIG_IGN); |
| 47 void IgnoreSigPipeSignalOnCurrentThread() { |
| 48 sigset_t sigpipe_mask; |
| 49 sigemptyset(&sigpipe_mask); |
| 50 sigaddset(&sigpipe_mask, SIGPIPE); |
| 51 pthread_sigmask(SIG_BLOCK, &sigpipe_mask, nullptr); |
| 52 } |
30 | 53 |
31 struct TimerEvent { | 54 struct TimerEvent { |
32 explicit TimerEvent(std::unique_ptr<QueuedTask> task) | 55 explicit TimerEvent(std::unique_ptr<QueuedTask> task) |
33 : task(std::move(task)) {} | 56 : task(std::move(task)) {} |
34 ~TimerEvent() { event_del(&ev); } | 57 ~TimerEvent() { event_del(&ev); } |
35 event ev; | 58 event ev; |
36 std::unique_ptr<QueuedTask> task; | 59 std::unique_ptr<QueuedTask> task; |
37 }; | 60 }; |
38 | 61 |
39 bool SetNonBlocking(int fd) { | 62 bool SetNonBlocking(int fd) { |
(...skipping 24 matching lines...) Expand all Loading... |
64 } // namespace | 87 } // namespace |
65 | 88 |
66 struct TaskQueue::QueueContext { | 89 struct TaskQueue::QueueContext { |
67 explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {} | 90 explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {} |
68 TaskQueue* queue; | 91 TaskQueue* queue; |
69 bool is_active; | 92 bool is_active; |
70 // Holds a list of events pending timers for cleanup when the loop exits. | 93 // Holds a list of events pending timers for cleanup when the loop exits. |
71 std::list<TimerEvent*> pending_timers_; | 94 std::list<TimerEvent*> pending_timers_; |
72 }; | 95 }; |
73 | 96 |
| 97 // Posting a reply task is tricky business. This class owns the reply task |
| 98 // and a reference to it is held by both the reply queue and the first task. |
| 99 // Here's an outline of what happens when dealing with a reply task. |
| 100 // * The ReplyTaskOwner owns the |reply_| task. |
| 101 // * One ref owned by PostAndReplyTask |
| 102 // * One ref owned by the reply TaskQueue |
| 103 // * ReplyTaskOwner has a flag |run_task_| initially set to false. |
| 104 // * ReplyTaskOwner has a method: HasOneRef() (provided by RefCountedObject). |
| 105 // * After successfully running the original |task_|, PostAndReplyTask() calls |
| 106 // set_should_run_task(). This sets |run_task_| to true. |
| 107 // * In PostAndReplyTask's dtor: |
| 108 // * It releases its reference to ReplyTaskOwner (important to do this first). |
| 109 // * Sends (write()) a kRunReplyTask message to the reply queue's pipe. |
| 110 // * PostAndReplyTask doesn't care if write() fails, but when it does: |
| 111 // * The reply queue is gone. |
| 112 // * ReplyTaskOwner has already been deleted and the reply task too. |
| 113 // * If write() succeeds: |
| 114 // * ReplyQueue receives the kRunReplyTask message |
| 115 // * Goes through all pending tasks, finding the first that HasOneRef() |
| 116 // * Calls ReplyTaskOwner::Run() |
| 117 // * if set_should_run_task() was called, the reply task will be run |
| 118 // * Release the reference to ReplyTaskOwner |
| 119 // * ReplyTaskOwner and associated |reply_| are deleted. |
| 120 class TaskQueue::ReplyTaskOwner { |
| 121 public: |
| 122 ReplyTaskOwner(std::unique_ptr<QueuedTask> reply) |
| 123 : reply_(std::move(reply)) {} |
| 124 |
| 125 void Run() { |
| 126 RTC_DCHECK(reply_); |
| 127 if (run_task_) { |
| 128 if (!reply_->Run()) |
| 129 reply_.release(); |
| 130 } |
| 131 reply_.reset(); |
| 132 } |
| 133 |
| 134 void set_should_run_task() { |
| 135 RTC_DCHECK(!run_task_); |
| 136 run_task_ = true; |
| 137 } |
| 138 |
| 139 private: |
| 140 std::unique_ptr<QueuedTask> reply_; |
| 141 bool run_task_ = false; |
| 142 }; |
| 143 |
74 class TaskQueue::PostAndReplyTask : public QueuedTask { | 144 class TaskQueue::PostAndReplyTask : public QueuedTask { |
75 public: | 145 public: |
76 PostAndReplyTask(std::unique_ptr<QueuedTask> task, | 146 PostAndReplyTask(std::unique_ptr<QueuedTask> task, |
77 std::unique_ptr<QueuedTask> reply, | 147 std::unique_ptr<QueuedTask> reply, |
78 TaskQueue* reply_queue) | 148 TaskQueue* reply_queue, |
| 149 int reply_pipe) |
79 : task_(std::move(task)), | 150 : task_(std::move(task)), |
80 reply_(std::move(reply)), | 151 reply_pipe_(reply_pipe), |
81 reply_queue_(reply_queue) { | 152 reply_task_owner_( |
82 reply_queue->PrepareReplyTask(this); | 153 new RefCountedObject<ReplyTaskOwner>(std::move(reply))) { |
| 154 reply_queue->PrepareReplyTask(reply_task_owner_); |
83 } | 155 } |
84 | 156 |
85 ~PostAndReplyTask() override { | 157 ~PostAndReplyTask() override { |
86 CritScope lock(&lock_); | 158 reply_task_owner_ = nullptr; |
87 if (reply_queue_) | 159 IgnoreSigPipeSignalOnCurrentThread(); |
88 reply_queue_->ReplyTaskDone(this); | 160 // Send a signal to the reply queue that the reply task can run now. |
89 } | 161 // Depending on whether |set_should_run_task()| was called by the |
90 | 162 // PostAndReplyTask(), the reply task may or may not actually run. |
91 void OnReplyQueueGone() { | 163 // In either case, it will be deleted. |
92 CritScope lock(&lock_); | 164 char message = kRunReplyTask; |
93 reply_queue_ = nullptr; | 165 write(reply_pipe_, &message, sizeof(message)); |
94 } | 166 } |
95 | 167 |
96 private: | 168 private: |
97 bool Run() override { | 169 bool Run() override { |
98 if (!task_->Run()) | 170 if (!task_->Run()) |
99 task_.release(); | 171 task_.release(); |
100 | 172 reply_task_owner_->set_should_run_task(); |
101 CritScope lock(&lock_); | |
102 if (reply_queue_) | |
103 reply_queue_->PostTask(std::move(reply_)); | |
104 return true; | 173 return true; |
105 } | 174 } |
106 | 175 |
107 CriticalSection lock_; | |
108 std::unique_ptr<QueuedTask> task_; | 176 std::unique_ptr<QueuedTask> task_; |
109 std::unique_ptr<QueuedTask> reply_; | 177 int reply_pipe_; |
110 TaskQueue* reply_queue_ GUARDED_BY(lock_); | 178 scoped_refptr<RefCountedObject<ReplyTaskOwner>> reply_task_owner_; |
111 }; | 179 }; |
112 | 180 |
113 class TaskQueue::SetTimerTask : public QueuedTask { | 181 class TaskQueue::SetTimerTask : public QueuedTask { |
114 public: | 182 public: |
115 SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds) | 183 SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds) |
116 : task_(std::move(task)), | 184 : task_(std::move(task)), |
117 milliseconds_(milliseconds), | 185 milliseconds_(milliseconds), |
118 posted_(Time32()) {} | 186 posted_(Time32()) {} |
119 | 187 |
120 private: | 188 private: |
(...skipping 16 matching lines...) Expand all Loading... |
137 : event_base_(event_base_new()), | 205 : event_base_(event_base_new()), |
138 wakeup_event_(new event()), | 206 wakeup_event_(new event()), |
139 thread_(&TaskQueue::ThreadMain, this, queue_name) { | 207 thread_(&TaskQueue::ThreadMain, this, queue_name) { |
140 RTC_DCHECK(queue_name); | 208 RTC_DCHECK(queue_name); |
141 int fds[2]; | 209 int fds[2]; |
142 RTC_CHECK(pipe(fds) == 0); | 210 RTC_CHECK(pipe(fds) == 0); |
143 SetNonBlocking(fds[0]); | 211 SetNonBlocking(fds[0]); |
144 SetNonBlocking(fds[1]); | 212 SetNonBlocking(fds[1]); |
145 wakeup_pipe_out_ = fds[0]; | 213 wakeup_pipe_out_ = fds[0]; |
146 wakeup_pipe_in_ = fds[1]; | 214 wakeup_pipe_in_ = fds[1]; |
| 215 |
147 EventAssign(wakeup_event_.get(), event_base_, wakeup_pipe_out_, | 216 EventAssign(wakeup_event_.get(), event_base_, wakeup_pipe_out_, |
148 EV_READ | EV_PERSIST, OnWakeup, this); | 217 EV_READ | EV_PERSIST, OnWakeup, this); |
149 event_add(wakeup_event_.get(), 0); | 218 event_add(wakeup_event_.get(), 0); |
150 thread_.Start(); | 219 thread_.Start(); |
151 } | 220 } |
152 | 221 |
153 TaskQueue::~TaskQueue() { | 222 TaskQueue::~TaskQueue() { |
154 RTC_DCHECK(!IsCurrent()); | 223 RTC_DCHECK(!IsCurrent()); |
155 struct timespec ts; | 224 struct timespec ts; |
156 char message = kQuit; | 225 char message = kQuit; |
157 while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) { | 226 while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) { |
158 // The queue is full, so we have no choice but to wait and retry. | 227 // The queue is full, so we have no choice but to wait and retry. |
159 RTC_CHECK_EQ(EAGAIN, errno); | 228 RTC_CHECK_EQ(EAGAIN, errno); |
160 ts.tv_sec = 0; | 229 ts.tv_sec = 0; |
161 ts.tv_nsec = 1000000; | 230 ts.tv_nsec = 1000000; |
162 nanosleep(&ts, nullptr); | 231 nanosleep(&ts, nullptr); |
163 } | 232 } |
164 | 233 |
165 thread_.Stop(); | 234 thread_.Stop(); |
166 | 235 |
167 event_del(wakeup_event_.get()); | 236 event_del(wakeup_event_.get()); |
| 237 |
| 238 IgnoreSigPipeSignalOnCurrentThread(); |
| 239 |
168 close(wakeup_pipe_in_); | 240 close(wakeup_pipe_in_); |
169 close(wakeup_pipe_out_); | 241 close(wakeup_pipe_out_); |
170 wakeup_pipe_in_ = -1; | 242 wakeup_pipe_in_ = -1; |
171 wakeup_pipe_out_ = -1; | 243 wakeup_pipe_out_ = -1; |
172 | 244 |
173 { | |
174 // Synchronize against any pending reply tasks that might be running on | |
175 // other queues. | |
176 CritScope lock(&pending_lock_); | |
177 for (auto* reply : pending_replies_) | |
178 reply->OnReplyQueueGone(); | |
179 pending_replies_.clear(); | |
180 } | |
181 | |
182 event_base_free(event_base_); | 245 event_base_free(event_base_); |
183 } | 246 } |
184 | 247 |
185 // static | 248 // static |
186 TaskQueue* TaskQueue::Current() { | 249 TaskQueue* TaskQueue::Current() { |
187 QueueContext* ctx = | 250 QueueContext* ctx = |
188 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); | 251 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); |
189 return ctx ? ctx->queue : nullptr; | 252 return ctx ? ctx->queue : nullptr; |
190 } | 253 } |
191 | 254 |
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
239 } else { | 302 } else { |
240 PostTask(std::unique_ptr<QueuedTask>( | 303 PostTask(std::unique_ptr<QueuedTask>( |
241 new SetTimerTask(std::move(task), milliseconds))); | 304 new SetTimerTask(std::move(task), milliseconds))); |
242 } | 305 } |
243 } | 306 } |
244 | 307 |
245 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, | 308 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
246 std::unique_ptr<QueuedTask> reply, | 309 std::unique_ptr<QueuedTask> reply, |
247 TaskQueue* reply_queue) { | 310 TaskQueue* reply_queue) { |
248 std::unique_ptr<QueuedTask> wrapper_task( | 311 std::unique_ptr<QueuedTask> wrapper_task( |
249 new PostAndReplyTask(std::move(task), std::move(reply), reply_queue)); | 312 new PostAndReplyTask(std::move(task), std::move(reply), reply_queue, |
| 313 reply_queue->wakeup_pipe_in_)); |
250 PostTask(std::move(wrapper_task)); | 314 PostTask(std::move(wrapper_task)); |
251 } | 315 } |
252 | 316 |
253 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, | 317 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
254 std::unique_ptr<QueuedTask> reply) { | 318 std::unique_ptr<QueuedTask> reply) { |
255 return PostTaskAndReply(std::move(task), std::move(reply), Current()); | 319 return PostTaskAndReply(std::move(task), std::move(reply), Current()); |
256 } | 320 } |
257 | 321 |
258 // static | 322 // static |
259 bool TaskQueue::ThreadMain(void* context) { | 323 bool TaskQueue::ThreadMain(void* context) { |
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
291 CritScope lock(&ctx->queue->pending_lock_); | 355 CritScope lock(&ctx->queue->pending_lock_); |
292 RTC_DCHECK(!ctx->queue->pending_.empty()); | 356 RTC_DCHECK(!ctx->queue->pending_.empty()); |
293 task = std::move(ctx->queue->pending_.front()); | 357 task = std::move(ctx->queue->pending_.front()); |
294 ctx->queue->pending_.pop_front(); | 358 ctx->queue->pending_.pop_front(); |
295 RTC_DCHECK(task.get()); | 359 RTC_DCHECK(task.get()); |
296 } | 360 } |
297 if (!task->Run()) | 361 if (!task->Run()) |
298 task.release(); | 362 task.release(); |
299 break; | 363 break; |
300 } | 364 } |
| 365 case kRunReplyTask: { |
| 366 scoped_refptr<ReplyTaskOwnerRef> reply_task; |
| 367 { |
| 368 CritScope lock(&ctx->queue->pending_lock_); |
| 369 for (auto it = ctx->queue->pending_replies_.begin(); |
| 370 it != ctx->queue->pending_replies_.end(); ++it) { |
| 371 if ((*it)->HasOneRef()) { |
| 372 reply_task = std::move(*it); |
| 373 ctx->queue->pending_replies_.erase(it); |
| 374 break; |
| 375 } |
| 376 } |
| 377 } |
| 378 reply_task->Run(); |
| 379 break; |
| 380 } |
301 default: | 381 default: |
302 RTC_NOTREACHED(); | 382 RTC_NOTREACHED(); |
303 break; | 383 break; |
304 } | 384 } |
305 } | 385 } |
306 | 386 |
307 // static | 387 // static |
308 void TaskQueue::RunTask(int fd, short flags, void* context) { // NOLINT | 388 void TaskQueue::RunTask(int fd, short flags, void* context) { // NOLINT |
309 auto* task = static_cast<QueuedTask*>(context); | 389 auto* task = static_cast<QueuedTask*>(context); |
310 if (task->Run()) | 390 if (task->Run()) |
311 delete task; | 391 delete task; |
312 } | 392 } |
313 | 393 |
314 // static | 394 // static |
315 void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT | 395 void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT |
316 TimerEvent* timer = static_cast<TimerEvent*>(context); | 396 TimerEvent* timer = static_cast<TimerEvent*>(context); |
317 if (!timer->task->Run()) | 397 if (!timer->task->Run()) |
318 timer->task.release(); | 398 timer->task.release(); |
319 QueueContext* ctx = | 399 QueueContext* ctx = |
320 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); | 400 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); |
321 ctx->pending_timers_.remove(timer); | 401 ctx->pending_timers_.remove(timer); |
322 delete timer; | 402 delete timer; |
323 } | 403 } |
324 | 404 |
325 void TaskQueue::PrepareReplyTask(PostAndReplyTask* reply_task) { | 405 void TaskQueue::PrepareReplyTask(scoped_refptr<ReplyTaskOwnerRef> reply_task) { |
326 RTC_DCHECK(reply_task); | 406 RTC_DCHECK(reply_task); |
327 CritScope lock(&pending_lock_); | 407 CritScope lock(&pending_lock_); |
328 pending_replies_.push_back(reply_task); | 408 pending_replies_.push_back(std::move(reply_task)); |
329 } | |
330 | |
331 void TaskQueue::ReplyTaskDone(PostAndReplyTask* reply_task) { | |
332 CritScope lock(&pending_lock_); | |
333 pending_replies_.remove(reply_task); | |
334 } | 409 } |
335 | 410 |
336 } // namespace rtc | 411 } // namespace rtc |
OLD | NEW |