OLD | NEW |
| (Empty) |
1 /* | |
2 * Copyright 2016 The WebRTC Project Authors. All rights reserved. | |
3 * | |
4 * Use of this source code is governed by a BSD-style license | |
5 * that can be found in the LICENSE file in the root of the source | |
6 * tree. An additional intellectual property rights grant can be found | |
7 * in the file PATENTS. All contributing project authors may | |
8 * be found in the AUTHORS file in the root of the source tree. | |
9 */ | |
10 | |
11 #include "webrtc/base/task_queue.h" | |
12 | |
13 #include <fcntl.h> | |
14 #include <signal.h> | |
15 #include <string.h> | |
16 #include <unistd.h> | |
17 | |
18 #include "base/third_party/libevent/event.h" | |
19 #include "webrtc/base/checks.h" | |
20 #include "webrtc/base/logging.h" | |
21 #include "webrtc/base/task_queue_posix.h" | |
22 #include "webrtc/base/timeutils.h" | |
23 | |
24 namespace rtc { | |
25 using internal::GetQueuePtrTls; | |
26 using internal::AutoSetCurrentQueuePtr; | |
27 | |
28 namespace { | |
29 static const char kQuit = 1; | |
30 static const char kRunTask = 2; | |
31 static const char kRunReplyTask = 3; | |
32 | |
33 using Priority = TaskQueue::Priority; | |
34 | |
35 // This ignores the SIGPIPE signal on the calling thread. | |
36 // This signal can be fired when trying to write() to a pipe that's being | |
37 // closed or while closing a pipe that's being written to. | |
38 // We can run into that situation (e.g. reply tasks that don't get a chance to | |
39 // run because the task queue is being deleted) so we ignore this signal and | |
40 // continue as normal. | |
41 // As a side note for this implementation, it would be great if we could safely | |
42 // restore the sigmask, but unfortunately the operation of restoring it, can | |
43 // itself actually cause SIGPIPE to be signaled :-| (e.g. on MacOS) | |
44 // The SIGPIPE signal by default causes the process to be terminated, so we | |
45 // don't want to risk that. | |
46 // An alternative to this approach is to ignore the signal for the whole | |
47 // process: | |
48 // signal(SIGPIPE, SIG_IGN); | |
49 void IgnoreSigPipeSignalOnCurrentThread() { | |
50 sigset_t sigpipe_mask; | |
51 sigemptyset(&sigpipe_mask); | |
52 sigaddset(&sigpipe_mask, SIGPIPE); | |
53 pthread_sigmask(SIG_BLOCK, &sigpipe_mask, nullptr); | |
54 } | |
55 | |
56 struct TimerEvent { | |
57 explicit TimerEvent(std::unique_ptr<QueuedTask> task) | |
58 : task(std::move(task)) {} | |
59 ~TimerEvent() { event_del(&ev); } | |
60 event ev; | |
61 std::unique_ptr<QueuedTask> task; | |
62 }; | |
63 | |
64 bool SetNonBlocking(int fd) { | |
65 const int flags = fcntl(fd, F_GETFL); | |
66 RTC_CHECK(flags != -1); | |
67 return (flags & O_NONBLOCK) || fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1; | |
68 } | |
69 | |
70 // TODO(tommi): This is a hack to support two versions of libevent that we're | |
71 // compatible with. The method we really want to call is event_assign(), | |
72 // since event_set() has been marked as deprecated (and doesn't accept | |
73 // passing event_base__ as a parameter). However, the version of libevent | |
74 // that we have in Chromium, doesn't have event_assign(), so we need to call | |
75 // event_set() there. | |
76 void EventAssign(struct event* ev, | |
77 struct event_base* base, | |
78 int fd, | |
79 short events, | |
80 void (*callback)(int, short, void*), | |
81 void* arg) { | |
82 #if defined(_EVENT2_EVENT_H_) | |
83 RTC_CHECK_EQ(0, event_assign(ev, base, fd, events, callback, arg)); | |
84 #else | |
85 event_set(ev, fd, events, callback, arg); | |
86 RTC_CHECK_EQ(0, event_base_set(base, ev)); | |
87 #endif | |
88 } | |
89 | |
90 ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) { | |
91 switch (priority) { | |
92 case Priority::HIGH: | |
93 return kRealtimePriority; | |
94 case Priority::LOW: | |
95 return kLowPriority; | |
96 case Priority::NORMAL: | |
97 return kNormalPriority; | |
98 default: | |
99 RTC_NOTREACHED(); | |
100 break; | |
101 } | |
102 return kNormalPriority; | |
103 } | |
104 } // namespace | |
105 | |
106 struct TaskQueue::QueueContext { | |
107 explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {} | |
108 TaskQueue* queue; | |
109 bool is_active; | |
110 // Holds a list of events pending timers for cleanup when the loop exits. | |
111 std::list<TimerEvent*> pending_timers_; | |
112 }; | |
113 | |
114 // Posting a reply task is tricky business. This class owns the reply task | |
115 // and a reference to it is held by both the reply queue and the first task. | |
116 // Here's an outline of what happens when dealing with a reply task. | |
117 // * The ReplyTaskOwner owns the |reply_| task. | |
118 // * One ref owned by PostAndReplyTask | |
119 // * One ref owned by the reply TaskQueue | |
120 // * ReplyTaskOwner has a flag |run_task_| initially set to false. | |
121 // * ReplyTaskOwner has a method: HasOneRef() (provided by RefCountedObject). | |
122 // * After successfully running the original |task_|, PostAndReplyTask() calls | |
123 // set_should_run_task(). This sets |run_task_| to true. | |
124 // * In PostAndReplyTask's dtor: | |
125 // * It releases its reference to ReplyTaskOwner (important to do this first). | |
126 // * Sends (write()) a kRunReplyTask message to the reply queue's pipe. | |
127 // * PostAndReplyTask doesn't care if write() fails, but when it does: | |
128 // * The reply queue is gone. | |
129 // * ReplyTaskOwner has already been deleted and the reply task too. | |
130 // * If write() succeeds: | |
131 // * ReplyQueue receives the kRunReplyTask message | |
132 // * Goes through all pending tasks, finding the first that HasOneRef() | |
133 // * Calls ReplyTaskOwner::Run() | |
134 // * if set_should_run_task() was called, the reply task will be run | |
135 // * Release the reference to ReplyTaskOwner | |
136 // * ReplyTaskOwner and associated |reply_| are deleted. | |
137 class TaskQueue::ReplyTaskOwner { | |
138 public: | |
139 ReplyTaskOwner(std::unique_ptr<QueuedTask> reply) | |
140 : reply_(std::move(reply)) {} | |
141 | |
142 void Run() { | |
143 RTC_DCHECK(reply_); | |
144 if (run_task_) { | |
145 if (!reply_->Run()) | |
146 reply_.release(); | |
147 } | |
148 reply_.reset(); | |
149 } | |
150 | |
151 void set_should_run_task() { | |
152 RTC_DCHECK(!run_task_); | |
153 run_task_ = true; | |
154 } | |
155 | |
156 private: | |
157 std::unique_ptr<QueuedTask> reply_; | |
158 bool run_task_ = false; | |
159 }; | |
160 | |
161 class TaskQueue::PostAndReplyTask : public QueuedTask { | |
162 public: | |
163 PostAndReplyTask(std::unique_ptr<QueuedTask> task, | |
164 std::unique_ptr<QueuedTask> reply, | |
165 TaskQueue* reply_queue, | |
166 int reply_pipe) | |
167 : task_(std::move(task)), | |
168 reply_pipe_(reply_pipe), | |
169 reply_task_owner_( | |
170 new RefCountedObject<ReplyTaskOwner>(std::move(reply))) { | |
171 reply_queue->PrepareReplyTask(reply_task_owner_); | |
172 } | |
173 | |
174 ~PostAndReplyTask() override { | |
175 reply_task_owner_ = nullptr; | |
176 IgnoreSigPipeSignalOnCurrentThread(); | |
177 // Send a signal to the reply queue that the reply task can run now. | |
178 // Depending on whether |set_should_run_task()| was called by the | |
179 // PostAndReplyTask(), the reply task may or may not actually run. | |
180 // In either case, it will be deleted. | |
181 char message = kRunReplyTask; | |
182 write(reply_pipe_, &message, sizeof(message)); | |
183 } | |
184 | |
185 private: | |
186 bool Run() override { | |
187 if (!task_->Run()) | |
188 task_.release(); | |
189 reply_task_owner_->set_should_run_task(); | |
190 return true; | |
191 } | |
192 | |
193 std::unique_ptr<QueuedTask> task_; | |
194 int reply_pipe_; | |
195 scoped_refptr<RefCountedObject<ReplyTaskOwner>> reply_task_owner_; | |
196 }; | |
197 | |
198 class TaskQueue::SetTimerTask : public QueuedTask { | |
199 public: | |
200 SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds) | |
201 : task_(std::move(task)), | |
202 milliseconds_(milliseconds), | |
203 posted_(Time32()) {} | |
204 | |
205 private: | |
206 bool Run() override { | |
207 // Compensate for the time that has passed since construction | |
208 // and until we got here. | |
209 uint32_t post_time = Time32() - posted_; | |
210 TaskQueue::Current()->PostDelayedTask( | |
211 std::move(task_), | |
212 post_time > milliseconds_ ? 0 : milliseconds_ - post_time); | |
213 return true; | |
214 } | |
215 | |
216 std::unique_ptr<QueuedTask> task_; | |
217 const uint32_t milliseconds_; | |
218 const uint32_t posted_; | |
219 }; | |
220 | |
221 TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/) | |
222 : event_base_(event_base_new()), | |
223 wakeup_event_(new event()), | |
224 thread_(&TaskQueue::ThreadMain, | |
225 this, | |
226 queue_name, | |
227 TaskQueuePriorityToThreadPriority(priority)) { | |
228 RTC_DCHECK(queue_name); | |
229 int fds[2]; | |
230 RTC_CHECK(pipe(fds) == 0); | |
231 SetNonBlocking(fds[0]); | |
232 SetNonBlocking(fds[1]); | |
233 wakeup_pipe_out_ = fds[0]; | |
234 wakeup_pipe_in_ = fds[1]; | |
235 | |
236 EventAssign(wakeup_event_.get(), event_base_, wakeup_pipe_out_, | |
237 EV_READ | EV_PERSIST, OnWakeup, this); | |
238 event_add(wakeup_event_.get(), 0); | |
239 thread_.Start(); | |
240 } | |
241 | |
242 TaskQueue::~TaskQueue() { | |
243 RTC_DCHECK(!IsCurrent()); | |
244 struct timespec ts; | |
245 char message = kQuit; | |
246 while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) { | |
247 // The queue is full, so we have no choice but to wait and retry. | |
248 RTC_CHECK_EQ(EAGAIN, errno); | |
249 ts.tv_sec = 0; | |
250 ts.tv_nsec = 1000000; | |
251 nanosleep(&ts, nullptr); | |
252 } | |
253 | |
254 thread_.Stop(); | |
255 | |
256 event_del(wakeup_event_.get()); | |
257 | |
258 IgnoreSigPipeSignalOnCurrentThread(); | |
259 | |
260 close(wakeup_pipe_in_); | |
261 close(wakeup_pipe_out_); | |
262 wakeup_pipe_in_ = -1; | |
263 wakeup_pipe_out_ = -1; | |
264 | |
265 event_base_free(event_base_); | |
266 } | |
267 | |
268 // static | |
269 TaskQueue* TaskQueue::Current() { | |
270 QueueContext* ctx = | |
271 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); | |
272 return ctx ? ctx->queue : nullptr; | |
273 } | |
274 | |
275 // static | |
276 bool TaskQueue::IsCurrent(const char* queue_name) { | |
277 TaskQueue* current = Current(); | |
278 return current && current->thread_.name().compare(queue_name) == 0; | |
279 } | |
280 | |
281 bool TaskQueue::IsCurrent() const { | |
282 return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef()); | |
283 } | |
284 | |
285 void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) { | |
286 RTC_DCHECK(task.get()); | |
287 // libevent isn't thread safe. This means that we can't use methods such | |
288 // as event_base_once to post tasks to the worker thread from a different | |
289 // thread. However, we can use it when posting from the worker thread itself. | |
290 if (IsCurrent()) { | |
291 if (event_base_once(event_base_, -1, EV_TIMEOUT, &TaskQueue::RunTask, | |
292 task.get(), nullptr) == 0) { | |
293 task.release(); | |
294 } | |
295 } else { | |
296 QueuedTask* task_id = task.get(); // Only used for comparison. | |
297 { | |
298 CritScope lock(&pending_lock_); | |
299 pending_.push_back(std::move(task)); | |
300 } | |
301 char message = kRunTask; | |
302 if (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) { | |
303 LOG(WARNING) << "Failed to queue task."; | |
304 CritScope lock(&pending_lock_); | |
305 pending_.remove_if([task_id](std::unique_ptr<QueuedTask>& t) { | |
306 return t.get() == task_id; | |
307 }); | |
308 } | |
309 } | |
310 } | |
311 | |
312 void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task, | |
313 uint32_t milliseconds) { | |
314 if (IsCurrent()) { | |
315 TimerEvent* timer = new TimerEvent(std::move(task)); | |
316 EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueue::RunTimer, timer); | |
317 QueueContext* ctx = | |
318 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); | |
319 ctx->pending_timers_.push_back(timer); | |
320 timeval tv = {milliseconds / 1000, (milliseconds % 1000) * 1000}; | |
321 event_add(&timer->ev, &tv); | |
322 } else { | |
323 PostTask(std::unique_ptr<QueuedTask>( | |
324 new SetTimerTask(std::move(task), milliseconds))); | |
325 } | |
326 } | |
327 | |
328 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, | |
329 std::unique_ptr<QueuedTask> reply, | |
330 TaskQueue* reply_queue) { | |
331 std::unique_ptr<QueuedTask> wrapper_task( | |
332 new PostAndReplyTask(std::move(task), std::move(reply), reply_queue, | |
333 reply_queue->wakeup_pipe_in_)); | |
334 PostTask(std::move(wrapper_task)); | |
335 } | |
336 | |
337 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, | |
338 std::unique_ptr<QueuedTask> reply) { | |
339 return PostTaskAndReply(std::move(task), std::move(reply), Current()); | |
340 } | |
341 | |
342 // static | |
343 void TaskQueue::ThreadMain(void* context) { | |
344 TaskQueue* me = static_cast<TaskQueue*>(context); | |
345 | |
346 QueueContext queue_context(me); | |
347 pthread_setspecific(GetQueuePtrTls(), &queue_context); | |
348 | |
349 while (queue_context.is_active) | |
350 event_base_loop(me->event_base_, 0); | |
351 | |
352 pthread_setspecific(GetQueuePtrTls(), nullptr); | |
353 | |
354 for (TimerEvent* timer : queue_context.pending_timers_) | |
355 delete timer; | |
356 } | |
357 | |
358 // static | |
359 void TaskQueue::OnWakeup(int socket, short flags, void* context) { // NOLINT | |
360 QueueContext* ctx = | |
361 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); | |
362 RTC_DCHECK(ctx->queue->wakeup_pipe_out_ == socket); | |
363 char buf; | |
364 RTC_CHECK(sizeof(buf) == read(socket, &buf, sizeof(buf))); | |
365 switch (buf) { | |
366 case kQuit: | |
367 ctx->is_active = false; | |
368 event_base_loopbreak(ctx->queue->event_base_); | |
369 break; | |
370 case kRunTask: { | |
371 std::unique_ptr<QueuedTask> task; | |
372 { | |
373 CritScope lock(&ctx->queue->pending_lock_); | |
374 RTC_DCHECK(!ctx->queue->pending_.empty()); | |
375 task = std::move(ctx->queue->pending_.front()); | |
376 ctx->queue->pending_.pop_front(); | |
377 RTC_DCHECK(task.get()); | |
378 } | |
379 if (!task->Run()) | |
380 task.release(); | |
381 break; | |
382 } | |
383 case kRunReplyTask: { | |
384 scoped_refptr<ReplyTaskOwnerRef> reply_task; | |
385 { | |
386 CritScope lock(&ctx->queue->pending_lock_); | |
387 for (auto it = ctx->queue->pending_replies_.begin(); | |
388 it != ctx->queue->pending_replies_.end(); ++it) { | |
389 if ((*it)->HasOneRef()) { | |
390 reply_task = std::move(*it); | |
391 ctx->queue->pending_replies_.erase(it); | |
392 break; | |
393 } | |
394 } | |
395 } | |
396 reply_task->Run(); | |
397 break; | |
398 } | |
399 default: | |
400 RTC_NOTREACHED(); | |
401 break; | |
402 } | |
403 } | |
404 | |
405 // static | |
406 void TaskQueue::RunTask(int fd, short flags, void* context) { // NOLINT | |
407 auto* task = static_cast<QueuedTask*>(context); | |
408 if (task->Run()) | |
409 delete task; | |
410 } | |
411 | |
412 // static | |
413 void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT | |
414 TimerEvent* timer = static_cast<TimerEvent*>(context); | |
415 if (!timer->task->Run()) | |
416 timer->task.release(); | |
417 QueueContext* ctx = | |
418 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); | |
419 ctx->pending_timers_.remove(timer); | |
420 delete timer; | |
421 } | |
422 | |
423 void TaskQueue::PrepareReplyTask(scoped_refptr<ReplyTaskOwnerRef> reply_task) { | |
424 RTC_DCHECK(reply_task); | |
425 CritScope lock(&pending_lock_); | |
426 pending_replies_.push_back(std::move(reply_task)); | |
427 } | |
428 | |
429 } // namespace rtc | |
OLD | NEW |