OLD | NEW |
---|---|
(Empty) | |
1 /* | |
2 * Copyright 2016 The WebRTC Project Authors. All rights reserved. | |
3 * | |
4 * Use of this source code is governed by a BSD-style license | |
5 * that can be found in the LICENSE file in the root of the source | |
6 * tree. An additional intellectual property rights grant can be found | |
7 * in the file PATENTS. All contributing project authors may | |
8 * be found in the AUTHORS file in the root of the source tree. | |
9 */ | |
10 | |
11 #include "webrtc/base/task_queue.h" | |
12 | |
13 #include <fcntl.h> | |
14 #include <string.h> | |
15 #include <unistd.h> | |
16 | |
17 #include "base/third_party/libevent/event.h" | |
18 #include "webrtc/base/checks.h" | |
19 #include "webrtc/base/logging.h" | |
20 #include "webrtc/base/task_queue_posix.h" | |
21 #include "webrtc/base/timeutils.h" | |
22 | |
23 namespace rtc { | |
24 using internal::GetQueuePtrTls; | |
25 using internal::AutoSetCurrentQueuePtr; | |
26 | |
27 namespace { | |
28 static const char kQuit = 1; | |
29 static const char kRunTask = 2; | |
30 | |
31 struct TimerEvent { | |
32 explicit TimerEvent(std::unique_ptr<QueuedTask> task) | |
33 : task(std::move(task)) {} | |
34 ~TimerEvent() { event_del(&ev); } | |
35 event ev; | |
36 std::unique_ptr<QueuedTask> task; | |
37 }; | |
38 | |
39 bool SetNonBlocking(int fd) { | |
40 const int flags = fcntl(fd, F_GETFL); | |
41 RTC_CHECK(flags != -1); | |
42 return (flags & O_NONBLOCK) || fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1; | |
43 } | |
44 } // namespace | |
45 | |
46 struct TaskQueue::QueueContext { | |
47 explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {} | |
48 TaskQueue* queue; | |
49 bool is_active; | |
50 // Holds a list of events pending timers for cleanup when the loop exits. | |
51 std::list<TimerEvent*> pending_timers_; | |
52 }; | |
53 | |
54 class TaskQueue::PostAndReplyTask : public QueuedTask { | |
55 public: | |
56 PostAndReplyTask(std::unique_ptr<QueuedTask> task, | |
57 std::unique_ptr<QueuedTask> reply, | |
58 TaskQueue* reply_queue) | |
59 : task_(std::move(task)), | |
60 reply_(std::move(reply)), | |
61 reply_queue_(reply_queue) { | |
62 reply_queue->PrepareReplyTask(this); | |
63 } | |
64 | |
65 ~PostAndReplyTask() override { | |
66 CritScope lock(&lock_); | |
67 if (reply_queue_) | |
68 reply_queue_->ReplyTaskDone(this); | |
69 } | |
70 | |
71 void OnReplyQueueGone() { | |
72 CritScope lock(&lock_); | |
73 reply_queue_ = nullptr; | |
74 } | |
75 | |
76 private: | |
77 bool Run() override { | |
78 if (!task_->Run()) | |
79 task_.release(); | |
80 | |
81 CritScope lock(&lock_); | |
82 if (reply_queue_) | |
83 reply_queue_->PostTask(std::move(reply_)); | |
84 return true; | |
85 } | |
86 | |
87 CriticalSection lock_; | |
88 std::unique_ptr<QueuedTask> task_, reply_; | |
perkj_webrtc
2016/04/26 14:30:38
reply_ on separate line please
tommi
2016/04/28 12:04:02
Done.
| |
89 TaskQueue* reply_queue_ GUARDED_BY(lock_); | |
90 }; | |
91 | |
92 class TaskQueue::SetTimerTask : public QueuedTask { | |
93 public: | |
94 SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds) | |
95 : task_(std::move(task)), | |
96 milliseconds_(milliseconds), | |
97 posted_(Time32()) {} | |
98 | |
99 private: | |
100 bool Run() override { | |
101 // Compensate for the time that has passed since construction | |
102 // and until we got here. | |
103 uint32_t post_time = Time32() - posted_; | |
perkj_webrtc
2016/04/26 14:30:38
Please check with nisse@ that we use the same cloc
tommi
2016/04/28 12:04:02
Yes, I'm following those changes. I might switch
| |
104 TaskQueue::Current()->PostDelayedTask( | |
105 std::move(task_), | |
106 post_time > milliseconds_ ? 0 : milliseconds_ - post_time); | |
107 return true; | |
108 } | |
109 | |
110 std::unique_ptr<QueuedTask> task_; | |
111 const uint32_t milliseconds_; | |
112 const uint32_t posted_; | |
113 }; | |
114 | |
115 TaskQueue::TaskQueue(const char* queue_name) | |
116 : event_base_(event_base_new()), | |
117 wakeup_event_(new event()), | |
118 thread_(&TaskQueue::ThreadMain, this, queue_name) { | |
119 RTC_DCHECK(queue_name); | |
120 int fds[2]; | |
121 RTC_CHECK(pipe(fds) == 0); | |
122 SetNonBlocking(fds[0]); | |
123 SetNonBlocking(fds[1]); | |
124 wakeup_pipe_out_ = fds[0]; | |
125 wakeup_pipe_in_ = fds[1]; | |
126 event_set(wakeup_event_.get(), wakeup_pipe_out_, EV_READ | EV_PERSIST, | |
127 OnWakeup, this); | |
128 event_base_set(event_base_, wakeup_event_.get()); | |
129 event_add(wakeup_event_.get(), 0); | |
130 thread_.Start(); | |
131 } | |
132 | |
133 TaskQueue::~TaskQueue() { | |
134 RTC_DCHECK(!IsCurrent()); | |
135 struct timespec ts; | |
136 char message = kQuit; | |
137 while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) { | |
138 // The queue is full, so we have no choice but to wait and retry. | |
139 RTC_CHECK_EQ(EAGAIN, errno); | |
140 ts.tv_sec = 0; | |
141 ts.tv_nsec = 1000000; | |
142 nanosleep(&ts, nullptr); | |
143 } | |
144 | |
145 thread_.Stop(); | |
146 | |
147 event_del(wakeup_event_.get()); | |
148 close(wakeup_pipe_in_); | |
149 close(wakeup_pipe_out_); | |
150 wakeup_pipe_in_ = -1; | |
151 wakeup_pipe_out_ = -1; | |
152 | |
153 for (auto* reply : pending_replies_) | |
154 reply->OnReplyQueueGone(); | |
155 pending_replies_.clear(); | |
perkj_webrtc
2016/04/26 14:30:38
Who owns the PostAndReplyTasks?
tommi
2016/04/28 12:04:02
They are stored in |pending_| or, if they can't be
| |
156 | |
157 event_base_free(event_base_); | |
158 } | |
159 | |
160 // static | |
161 TaskQueue* TaskQueue::Current() { | |
162 QueueContext* ctx = | |
163 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); | |
164 return ctx ? ctx->queue : nullptr; | |
165 } | |
166 | |
167 // static | |
168 bool TaskQueue::IsCurrent(const char* queue_name) { | |
169 TaskQueue* current = Current(); | |
170 return current && current->thread_.name().compare(queue_name) == 0; | |
171 } | |
172 | |
173 bool TaskQueue::IsCurrent() const { | |
174 return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef()); | |
175 } | |
176 | |
177 void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) { | |
178 RTC_DCHECK(task.get()); | |
179 // libevent isn't thread safe. This means that we can't use methods such | |
perkj_webrtc
2016/04/26 14:30:38
? Didn't you say you fixed that in libevent?
tommi
2016/04/28 12:04:02
I didn't make libevent thread safe but I did remov
| |
180 // as event_base_once to post tasks to the worker thread from a different | |
181 // thread. However, we can use it when posting from the worker thread itself. | |
182 if (IsCurrent()) { | |
183 if (event_base_once(event_base_, -1, EV_TIMEOUT, &TaskQueue::RunTask, | |
184 task.get(), nullptr) == 0) { | |
185 task.release(); | |
186 } | |
187 } else { | |
188 QueuedTask* task_id = task.get(); // Only used for comparison. | |
189 { | |
190 CritScope lock(&pending_lock_); | |
191 pending_.push_back(std::move(task)); | |
192 } | |
193 char message = kRunTask; | |
194 if (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) { | |
195 LOG(WARNING) << "Failed to queue task."; | |
196 CritScope lock(&pending_lock_); | |
197 pending_.remove_if([task_id](std::unique_ptr<QueuedTask>& t) { | |
198 return t.get() == task_id; | |
199 }); | |
200 } | |
201 } | |
202 } | |
203 | |
204 void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task, | |
205 uint32_t milliseconds) { | |
206 if (IsCurrent()) { | |
207 TimerEvent* timer = new TimerEvent(std::move(task)); | |
208 evtimer_set(&timer->ev, &TaskQueue::RunTimer, timer); | |
209 event_base_set(event_base_, &timer->ev); | |
210 QueueContext* ctx = | |
211 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); | |
212 ctx->pending_timers_.push_back(timer); | |
213 timeval tv = {milliseconds / 1000, (milliseconds % 1000) * 1000}; | |
214 event_add(&timer->ev, &tv); | |
215 } else { | |
216 PostTask(std::unique_ptr<QueuedTask>( | |
217 new SetTimerTask(std::move(task), milliseconds))); | |
218 } | |
219 } | |
220 | |
221 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, | |
222 std::unique_ptr<QueuedTask> reply, | |
223 TaskQueue* reply_queue) { | |
224 std::unique_ptr<QueuedTask> wrapper_task( | |
225 new PostAndReplyTask(std::move(task), std::move(reply), reply_queue)); | |
226 PostTask(std::move(wrapper_task)); | |
tommi
2016/04/28 12:04:02
Here is where ownership of PostAndReplyTasks is ha
| |
227 } | |
228 | |
229 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, | |
230 std::unique_ptr<QueuedTask> reply) { | |
231 return PostTaskAndReply(std::move(task), std::move(reply), Current()); | |
232 } | |
233 | |
234 // static | |
235 bool TaskQueue::ThreadMain(void* context) { | |
236 TaskQueue* me = static_cast<TaskQueue*>(context); | |
237 | |
238 QueueContext queue_context(me); | |
239 pthread_setspecific(GetQueuePtrTls(), &queue_context); | |
240 | |
241 while (queue_context.is_active) | |
242 event_base_loop(me->event_base_, 0); | |
243 | |
244 pthread_setspecific(GetQueuePtrTls(), nullptr); | |
245 | |
246 for (TimerEvent* timer : queue_context.pending_timers_) | |
247 delete timer; | |
248 | |
249 return false; | |
250 } | |
251 | |
252 // static | |
253 void TaskQueue::OnWakeup(int socket, short flags, void* context) { // NOLINT | |
254 QueueContext* ctx = | |
255 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); | |
256 RTC_DCHECK(ctx->queue->wakeup_pipe_out_ == socket); | |
257 char buf; | |
258 RTC_CHECK(sizeof(buf) == read(socket, &buf, sizeof(buf))); | |
259 switch (buf) { | |
260 case kQuit: | |
261 ctx->is_active = false; | |
262 event_base_loopbreak(ctx->queue->event_base_); | |
263 break; | |
264 case kRunTask: { | |
265 std::unique_ptr<QueuedTask> task; | |
266 { | |
267 CritScope lock(&ctx->queue->pending_lock_); | |
268 RTC_DCHECK(!ctx->queue->pending_.empty()); | |
269 task = std::move(ctx->queue->pending_.front()); | |
270 ctx->queue->pending_.pop_front(); | |
271 RTC_DCHECK(task.get()); | |
272 } | |
273 if (!task->Run()) | |
274 task.release(); | |
275 break; | |
276 } | |
277 default: | |
278 RTC_NOTREACHED(); | |
279 break; | |
280 } | |
281 } | |
282 | |
283 // static | |
284 void TaskQueue::RunTask(int fd, short flags, void* context) { // NOLINT | |
285 auto* task = static_cast<QueuedTask*>(context); | |
286 if (task->Run()) | |
287 delete task; | |
288 } | |
289 | |
290 // static | |
291 void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT | |
292 TimerEvent* timer = static_cast<TimerEvent*>(context); | |
293 if (!timer->task->Run()) | |
294 timer->task.release(); | |
295 QueueContext* ctx = | |
296 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); | |
297 ctx->pending_timers_.remove(timer); | |
298 delete timer; | |
299 } | |
300 | |
301 void TaskQueue::PrepareReplyTask(PostAndReplyTask* reply_task) { | |
302 RTC_DCHECK(reply_task); | |
303 CritScope lock(&pending_lock_); | |
304 pending_replies_.push_back(reply_task); | |
305 } | |
306 | |
307 void TaskQueue::ReplyTaskDone(PostAndReplyTask* reply_task) { | |
308 CritScope lock(&pending_lock_); | |
309 pending_replies_.remove(reply_task); | |
310 } | |
311 | |
312 } // namespace rtc | |
OLD | NEW |