OLD | NEW |
| (Empty) |
1 /* | |
2 * Copyright 2016 The WebRTC Project Authors. All rights reserved. | |
3 * | |
4 * Use of this source code is governed by a BSD-style license | |
5 * that can be found in the LICENSE file in the root of the source | |
6 * tree. An additional intellectual property rights grant can be found | |
7 * in the file PATENTS. All contributing project authors may | |
8 * be found in the AUTHORS file in the root of the source tree. | |
9 */ | |
10 | |
11 #include "webrtc/base/task_queue.h" | |
12 | |
13 #include <fcntl.h> | |
14 #include <string.h> | |
15 #include <unistd.h> | |
16 | |
17 #include "base/third_party/libevent/event.h" | |
18 #include "webrtc/base/checks.h" | |
19 #include "webrtc/base/logging.h" | |
20 #include "webrtc/base/task_queue_posix.h" | |
21 #include "webrtc/base/timeutils.h" | |
22 | |
23 namespace rtc { | |
24 using internal::GetQueuePtrTls; | |
25 using internal::AutoSetCurrentQueuePtr; | |
26 | |
27 namespace { | |
28 static const char kQuit = 1; | |
29 static const char kRunTask = 2; | |
30 | |
31 struct TimerEvent { | |
32 explicit TimerEvent(std::unique_ptr<QueuedTask> task) | |
33 : task(std::move(task)) {} | |
34 ~TimerEvent() { event_del(&ev); } | |
35 event ev; | |
36 std::unique_ptr<QueuedTask> task; | |
37 }; | |
38 | |
39 bool SetNonBlocking(int fd) { | |
40 const int flags = fcntl(fd, F_GETFL); | |
41 RTC_CHECK(flags != -1); | |
42 return (flags & O_NONBLOCK) || fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1; | |
43 } | |
44 } // namespace | |
45 | |
46 struct TaskQueue::QueueContext { | |
47 explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {} | |
48 TaskQueue* queue; | |
49 bool is_active; | |
50 // Holds a list of events pending timers for cleanup when the loop exits. | |
51 std::list<TimerEvent*> pending_timers_; | |
52 }; | |
53 | |
54 class TaskQueue::PostAndReplyTask : public QueuedTask { | |
55 public: | |
56 PostAndReplyTask(std::unique_ptr<QueuedTask> task, | |
57 std::unique_ptr<QueuedTask> reply, | |
58 TaskQueue* reply_queue) | |
59 : task_(std::move(task)), | |
60 reply_(std::move(reply)), | |
61 reply_queue_(reply_queue) { | |
62 reply_queue->PrepareReplyTask(this); | |
63 } | |
64 | |
65 ~PostAndReplyTask() override { | |
66 CritScope lock(&lock_); | |
67 if (reply_queue_) | |
68 reply_queue_->ReplyTaskDone(this); | |
69 } | |
70 | |
71 void OnReplyQueueGone() { | |
72 CritScope lock(&lock_); | |
73 reply_queue_ = nullptr; | |
74 } | |
75 | |
76 private: | |
77 bool Run() override { | |
78 if (!task_->Run()) | |
79 task_.release(); | |
80 | |
81 CritScope lock(&lock_); | |
82 if (reply_queue_) | |
83 reply_queue_->PostTask(std::move(reply_)); | |
84 return true; | |
85 } | |
86 | |
87 CriticalSection lock_; | |
88 std::unique_ptr<QueuedTask> task_; | |
89 std::unique_ptr<QueuedTask> reply_; | |
90 TaskQueue* reply_queue_ GUARDED_BY(lock_); | |
91 }; | |
92 | |
93 class TaskQueue::SetTimerTask : public QueuedTask { | |
94 public: | |
95 SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds) | |
96 : task_(std::move(task)), | |
97 milliseconds_(milliseconds), | |
98 posted_(Time32()) {} | |
99 | |
100 private: | |
101 bool Run() override { | |
102 // Compensate for the time that has passed since construction | |
103 // and until we got here. | |
104 uint32_t post_time = Time32() - posted_; | |
105 TaskQueue::Current()->PostDelayedTask( | |
106 std::move(task_), | |
107 post_time > milliseconds_ ? 0 : milliseconds_ - post_time); | |
108 return true; | |
109 } | |
110 | |
111 std::unique_ptr<QueuedTask> task_; | |
112 const uint32_t milliseconds_; | |
113 const uint32_t posted_; | |
114 }; | |
115 | |
116 TaskQueue::TaskQueue(const char* queue_name) | |
117 : event_base_(event_base_new()), | |
118 wakeup_event_(new event()), | |
119 thread_(&TaskQueue::ThreadMain, this, queue_name) { | |
120 RTC_DCHECK(queue_name); | |
121 int fds[2]; | |
122 RTC_CHECK(pipe(fds) == 0); | |
123 SetNonBlocking(fds[0]); | |
124 SetNonBlocking(fds[1]); | |
125 wakeup_pipe_out_ = fds[0]; | |
126 wakeup_pipe_in_ = fds[1]; | |
127 event_set(wakeup_event_.get(), wakeup_pipe_out_, EV_READ | EV_PERSIST, | |
128 OnWakeup, this); | |
129 event_base_set(event_base_, wakeup_event_.get()); | |
130 event_add(wakeup_event_.get(), 0); | |
131 thread_.Start(); | |
132 } | |
133 | |
134 TaskQueue::~TaskQueue() { | |
135 RTC_DCHECK(!IsCurrent()); | |
136 struct timespec ts; | |
137 char message = kQuit; | |
138 while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) { | |
139 // The queue is full, so we have no choice but to wait and retry. | |
140 RTC_CHECK_EQ(EAGAIN, errno); | |
141 ts.tv_sec = 0; | |
142 ts.tv_nsec = 1000000; | |
143 nanosleep(&ts, nullptr); | |
144 } | |
145 | |
146 thread_.Stop(); | |
147 | |
148 event_del(wakeup_event_.get()); | |
149 close(wakeup_pipe_in_); | |
150 close(wakeup_pipe_out_); | |
151 wakeup_pipe_in_ = -1; | |
152 wakeup_pipe_out_ = -1; | |
153 | |
154 { | |
155 // Synchronize against any pending reply tasks that might be running on | |
156 // other queues. | |
157 CritScope lock(&pending_lock_); | |
158 for (auto* reply : pending_replies_) | |
159 reply->OnReplyQueueGone(); | |
160 pending_replies_.clear(); | |
161 } | |
162 | |
163 event_base_free(event_base_); | |
164 } | |
165 | |
166 // static | |
167 TaskQueue* TaskQueue::Current() { | |
168 QueueContext* ctx = | |
169 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); | |
170 return ctx ? ctx->queue : nullptr; | |
171 } | |
172 | |
173 // static | |
174 bool TaskQueue::IsCurrent(const char* queue_name) { | |
175 TaskQueue* current = Current(); | |
176 return current && current->thread_.name().compare(queue_name) == 0; | |
177 } | |
178 | |
179 bool TaskQueue::IsCurrent() const { | |
180 return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef()); | |
181 } | |
182 | |
183 void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) { | |
184 RTC_DCHECK(task.get()); | |
185 // libevent isn't thread safe. This means that we can't use methods such | |
186 // as event_base_once to post tasks to the worker thread from a different | |
187 // thread. However, we can use it when posting from the worker thread itself. | |
188 if (IsCurrent()) { | |
189 if (event_base_once(event_base_, -1, EV_TIMEOUT, &TaskQueue::RunTask, | |
190 task.get(), nullptr) == 0) { | |
191 task.release(); | |
192 } | |
193 } else { | |
194 QueuedTask* task_id = task.get(); // Only used for comparison. | |
195 { | |
196 CritScope lock(&pending_lock_); | |
197 pending_.push_back(std::move(task)); | |
198 } | |
199 char message = kRunTask; | |
200 if (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) { | |
201 LOG(WARNING) << "Failed to queue task."; | |
202 CritScope lock(&pending_lock_); | |
203 pending_.remove_if([task_id](std::unique_ptr<QueuedTask>& t) { | |
204 return t.get() == task_id; | |
205 }); | |
206 } | |
207 } | |
208 } | |
209 | |
210 void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task, | |
211 uint32_t milliseconds) { | |
212 if (IsCurrent()) { | |
213 TimerEvent* timer = new TimerEvent(std::move(task)); | |
214 evtimer_set(&timer->ev, &TaskQueue::RunTimer, timer); | |
215 event_base_set(event_base_, &timer->ev); | |
216 QueueContext* ctx = | |
217 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); | |
218 ctx->pending_timers_.push_back(timer); | |
219 timeval tv = {milliseconds / 1000, (milliseconds % 1000) * 1000}; | |
220 event_add(&timer->ev, &tv); | |
221 } else { | |
222 PostTask(std::unique_ptr<QueuedTask>( | |
223 new SetTimerTask(std::move(task), milliseconds))); | |
224 } | |
225 } | |
226 | |
227 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, | |
228 std::unique_ptr<QueuedTask> reply, | |
229 TaskQueue* reply_queue) { | |
230 std::unique_ptr<QueuedTask> wrapper_task( | |
231 new PostAndReplyTask(std::move(task), std::move(reply), reply_queue)); | |
232 PostTask(std::move(wrapper_task)); | |
233 } | |
234 | |
235 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, | |
236 std::unique_ptr<QueuedTask> reply) { | |
237 return PostTaskAndReply(std::move(task), std::move(reply), Current()); | |
238 } | |
239 | |
240 // static | |
241 bool TaskQueue::ThreadMain(void* context) { | |
242 TaskQueue* me = static_cast<TaskQueue*>(context); | |
243 | |
244 QueueContext queue_context(me); | |
245 pthread_setspecific(GetQueuePtrTls(), &queue_context); | |
246 | |
247 while (queue_context.is_active) | |
248 event_base_loop(me->event_base_, 0); | |
249 | |
250 pthread_setspecific(GetQueuePtrTls(), nullptr); | |
251 | |
252 for (TimerEvent* timer : queue_context.pending_timers_) | |
253 delete timer; | |
254 | |
255 return false; | |
256 } | |
257 | |
258 // static | |
259 void TaskQueue::OnWakeup(int socket, short flags, void* context) { // NOLINT | |
260 QueueContext* ctx = | |
261 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); | |
262 RTC_DCHECK(ctx->queue->wakeup_pipe_out_ == socket); | |
263 char buf; | |
264 RTC_CHECK(sizeof(buf) == read(socket, &buf, sizeof(buf))); | |
265 switch (buf) { | |
266 case kQuit: | |
267 ctx->is_active = false; | |
268 event_base_loopbreak(ctx->queue->event_base_); | |
269 break; | |
270 case kRunTask: { | |
271 std::unique_ptr<QueuedTask> task; | |
272 { | |
273 CritScope lock(&ctx->queue->pending_lock_); | |
274 RTC_DCHECK(!ctx->queue->pending_.empty()); | |
275 task = std::move(ctx->queue->pending_.front()); | |
276 ctx->queue->pending_.pop_front(); | |
277 RTC_DCHECK(task.get()); | |
278 } | |
279 if (!task->Run()) | |
280 task.release(); | |
281 break; | |
282 } | |
283 default: | |
284 RTC_NOTREACHED(); | |
285 break; | |
286 } | |
287 } | |
288 | |
289 // static | |
290 void TaskQueue::RunTask(int fd, short flags, void* context) { // NOLINT | |
291 auto* task = static_cast<QueuedTask*>(context); | |
292 if (task->Run()) | |
293 delete task; | |
294 } | |
295 | |
296 // static | |
297 void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT | |
298 TimerEvent* timer = static_cast<TimerEvent*>(context); | |
299 if (!timer->task->Run()) | |
300 timer->task.release(); | |
301 QueueContext* ctx = | |
302 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); | |
303 ctx->pending_timers_.remove(timer); | |
304 delete timer; | |
305 } | |
306 | |
307 void TaskQueue::PrepareReplyTask(PostAndReplyTask* reply_task) { | |
308 RTC_DCHECK(reply_task); | |
309 CritScope lock(&pending_lock_); | |
310 pending_replies_.push_back(reply_task); | |
311 } | |
312 | |
313 void TaskQueue::ReplyTaskDone(PostAndReplyTask* reply_task) { | |
314 CritScope lock(&pending_lock_); | |
315 pending_replies_.remove(reply_task); | |
316 } | |
317 | |
318 } // namespace rtc | |
OLD | NEW |