Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 /* | |
| 2 * Copyright 2016 The WebRTC Project Authors. All rights reserved. | |
| 3 * | |
| 4 * Use of this source code is governed by a BSD-style license | |
| 5 * that can be found in the LICENSE file in the root of the source | |
| 6 * tree. An additional intellectual property rights grant can be found | |
| 7 * in the file PATENTS. All contributing project authors may | |
| 8 * be found in the AUTHORS file in the root of the source tree. | |
| 9 */ | |
| 10 | |
| 11 #include "webrtc/base/task_queue.h" | |
| 12 | |
| 13 #include <fcntl.h> | |
| 14 #include <string.h> | |
| 15 #include <unistd.h> | |
| 16 | |
| 17 #include "base/third_party/libevent/event.h" | |
| 18 #include "webrtc/base/checks.h" | |
| 19 #include "webrtc/base/logging.h" | |
| 20 #include "webrtc/base/task_queue_posix.h" | |
| 21 #include "webrtc/base/timeutils.h" | |
| 22 | |
| 23 namespace rtc { | |
| 24 using internal::GetQueuePtrTls; | |
| 25 using internal::AutoSetCurrentQueuePtr; | |
| 26 | |
| 27 namespace { | |
| 28 static const char kQuit = 1; | |
| 29 static const char kRunTask = 2; | |
| 30 | |
| 31 struct TimerEvent { | |
| 32 explicit TimerEvent(std::unique_ptr<QueuedTask> task) | |
| 33 : task(std::move(task)) {} | |
| 34 ~TimerEvent() { event_del(&ev); } | |
| 35 event ev; | |
| 36 std::unique_ptr<QueuedTask> task; | |
| 37 }; | |
| 38 | |
| 39 bool SetNonBlocking(int fd) { | |
| 40 const int flags = fcntl(fd, F_GETFL); | |
| 41 RTC_CHECK(flags != -1); | |
| 42 return (flags & O_NONBLOCK) || fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1; | |
| 43 } | |
| 44 } // namespace | |
| 45 | |
| 46 struct TaskQueue::QueueContext { | |
| 47 explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {} | |
| 48 TaskQueue* queue; | |
| 49 bool is_active; | |
| 50 // Holds a list of events pending timers for cleanup when the loop exits. | |
| 51 std::list<TimerEvent*> pending_timers_; | |
| 52 }; | |
| 53 | |
| 54 class TaskQueue::PostAndReplyTask : public QueuedTask { | |
| 55 public: | |
| 56 PostAndReplyTask(std::unique_ptr<QueuedTask> task, | |
| 57 std::unique_ptr<QueuedTask> reply, | |
| 58 TaskQueue* reply_queue) | |
| 59 : task_(std::move(task)), | |
| 60 reply_(std::move(reply)), | |
| 61 reply_queue_(reply_queue) { | |
| 62 reply_queue->PrepareReplyTask(this); | |
| 63 } | |
| 64 | |
| 65 ~PostAndReplyTask() override { | |
| 66 CritScope lock(&lock_); | |
| 67 if (reply_queue_) | |
| 68 reply_queue_->ReplyTaskDone(this); | |
| 69 } | |
| 70 | |
| 71 void OnReplyQueueGone() { | |
| 72 CritScope lock(&lock_); | |
| 73 reply_queue_ = nullptr; | |
| 74 } | |
| 75 | |
| 76 private: | |
| 77 bool Run() override { | |
| 78 if (!task_->Run()) | |
| 79 task_.release(); | |
| 80 | |
| 81 CritScope lock(&lock_); | |
| 82 if (reply_queue_) | |
| 83 reply_queue_->PostTask(std::move(reply_)); | |
| 84 return true; | |
| 85 } | |
| 86 | |
| 87 CriticalSection lock_; | |
| 88 std::unique_ptr<QueuedTask> task_, reply_; | |
|
perkj_webrtc
2016/04/26 14:30:38
reply_ on separate line please
tommi
2016/04/28 12:04:02
Done.
| |
| 89 TaskQueue* reply_queue_ GUARDED_BY(lock_); | |
| 90 }; | |
| 91 | |
| 92 class TaskQueue::SetTimerTask : public QueuedTask { | |
| 93 public: | |
| 94 SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds) | |
| 95 : task_(std::move(task)), | |
| 96 milliseconds_(milliseconds), | |
| 97 posted_(Time32()) {} | |
| 98 | |
| 99 private: | |
| 100 bool Run() override { | |
| 101 // Compensate for the time that has passed since construction | |
| 102 // and until we got here. | |
| 103 uint32_t post_time = Time32() - posted_; | |
|
perkj_webrtc
2016/04/26 14:30:38
Please check with nisse@ that we use the same cloc
tommi
2016/04/28 12:04:02
Yes, I'm following those changes. I might switch
| |
| 104 TaskQueue::Current()->PostDelayedTask( | |
| 105 std::move(task_), | |
| 106 post_time > milliseconds_ ? 0 : milliseconds_ - post_time); | |
| 107 return true; | |
| 108 } | |
| 109 | |
| 110 std::unique_ptr<QueuedTask> task_; | |
| 111 const uint32_t milliseconds_; | |
| 112 const uint32_t posted_; | |
| 113 }; | |
| 114 | |
| 115 TaskQueue::TaskQueue(const char* queue_name) | |
| 116 : event_base_(event_base_new()), | |
| 117 wakeup_event_(new event()), | |
| 118 thread_(&TaskQueue::ThreadMain, this, queue_name) { | |
| 119 RTC_DCHECK(queue_name); | |
| 120 int fds[2]; | |
| 121 RTC_CHECK(pipe(fds) == 0); | |
| 122 SetNonBlocking(fds[0]); | |
| 123 SetNonBlocking(fds[1]); | |
| 124 wakeup_pipe_out_ = fds[0]; | |
| 125 wakeup_pipe_in_ = fds[1]; | |
| 126 event_set(wakeup_event_.get(), wakeup_pipe_out_, EV_READ | EV_PERSIST, | |
| 127 OnWakeup, this); | |
| 128 event_base_set(event_base_, wakeup_event_.get()); | |
| 129 event_add(wakeup_event_.get(), 0); | |
| 130 thread_.Start(); | |
| 131 } | |
| 132 | |
| 133 TaskQueue::~TaskQueue() { | |
| 134 RTC_DCHECK(!IsCurrent()); | |
| 135 struct timespec ts; | |
| 136 char message = kQuit; | |
| 137 while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) { | |
| 138 // The queue is full, so we have no choice but to wait and retry. | |
| 139 RTC_CHECK_EQ(EAGAIN, errno); | |
| 140 ts.tv_sec = 0; | |
| 141 ts.tv_nsec = 1000000; | |
| 142 nanosleep(&ts, nullptr); | |
| 143 } | |
| 144 | |
| 145 thread_.Stop(); | |
| 146 | |
| 147 event_del(wakeup_event_.get()); | |
| 148 close(wakeup_pipe_in_); | |
| 149 close(wakeup_pipe_out_); | |
| 150 wakeup_pipe_in_ = -1; | |
| 151 wakeup_pipe_out_ = -1; | |
| 152 | |
| 153 for (auto* reply : pending_replies_) | |
| 154 reply->OnReplyQueueGone(); | |
| 155 pending_replies_.clear(); | |
|
perkj_webrtc
2016/04/26 14:30:38
Who owns the PostAndReplyTasks?
tommi
2016/04/28 12:04:02
They are stored in |pending_| or, if they can't be
| |
| 156 | |
| 157 event_base_free(event_base_); | |
| 158 } | |
| 159 | |
| 160 // static | |
| 161 TaskQueue* TaskQueue::Current() { | |
| 162 QueueContext* ctx = | |
| 163 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); | |
| 164 return ctx ? ctx->queue : nullptr; | |
| 165 } | |
| 166 | |
| 167 // static | |
| 168 bool TaskQueue::IsCurrent(const char* queue_name) { | |
| 169 TaskQueue* current = Current(); | |
| 170 return current && current->thread_.name().compare(queue_name) == 0; | |
| 171 } | |
| 172 | |
| 173 bool TaskQueue::IsCurrent() const { | |
| 174 return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef()); | |
| 175 } | |
| 176 | |
| 177 void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) { | |
| 178 RTC_DCHECK(task.get()); | |
| 179 // libevent isn't thread safe. This means that we can't use methods such | |
|
perkj_webrtc
2016/04/26 14:30:38
? Didn't you say you fixed that in libevent?
tommi
2016/04/28 12:04:02
I didn't make libevent thread safe but I did remov
| |
| 180 // as event_base_once to post tasks to the worker thread from a different | |
| 181 // thread. However, we can use it when posting from the worker thread itself. | |
| 182 if (IsCurrent()) { | |
| 183 if (event_base_once(event_base_, -1, EV_TIMEOUT, &TaskQueue::RunTask, | |
| 184 task.get(), nullptr) == 0) { | |
| 185 task.release(); | |
| 186 } | |
| 187 } else { | |
| 188 QueuedTask* task_id = task.get(); // Only used for comparison. | |
| 189 { | |
| 190 CritScope lock(&pending_lock_); | |
| 191 pending_.push_back(std::move(task)); | |
| 192 } | |
| 193 char message = kRunTask; | |
| 194 if (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) { | |
| 195 LOG(WARNING) << "Failed to queue task."; | |
| 196 CritScope lock(&pending_lock_); | |
| 197 pending_.remove_if([task_id](std::unique_ptr<QueuedTask>& t) { | |
| 198 return t.get() == task_id; | |
| 199 }); | |
| 200 } | |
| 201 } | |
| 202 } | |
| 203 | |
| 204 void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task, | |
| 205 uint32_t milliseconds) { | |
| 206 if (IsCurrent()) { | |
| 207 TimerEvent* timer = new TimerEvent(std::move(task)); | |
| 208 evtimer_set(&timer->ev, &TaskQueue::RunTimer, timer); | |
| 209 event_base_set(event_base_, &timer->ev); | |
| 210 QueueContext* ctx = | |
| 211 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); | |
| 212 ctx->pending_timers_.push_back(timer); | |
| 213 timeval tv = {milliseconds / 1000, (milliseconds % 1000) * 1000}; | |
| 214 event_add(&timer->ev, &tv); | |
| 215 } else { | |
| 216 PostTask(std::unique_ptr<QueuedTask>( | |
| 217 new SetTimerTask(std::move(task), milliseconds))); | |
| 218 } | |
| 219 } | |
| 220 | |
| 221 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, | |
| 222 std::unique_ptr<QueuedTask> reply, | |
| 223 TaskQueue* reply_queue) { | |
| 224 std::unique_ptr<QueuedTask> wrapper_task( | |
| 225 new PostAndReplyTask(std::move(task), std::move(reply), reply_queue)); | |
| 226 PostTask(std::move(wrapper_task)); | |
|
tommi
2016/04/28 12:04:02
Here is where ownership of PostAndReplyTasks is ha
| |
| 227 } | |
| 228 | |
| 229 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, | |
| 230 std::unique_ptr<QueuedTask> reply) { | |
| 231 return PostTaskAndReply(std::move(task), std::move(reply), Current()); | |
| 232 } | |
| 233 | |
| 234 // static | |
| 235 bool TaskQueue::ThreadMain(void* context) { | |
| 236 TaskQueue* me = static_cast<TaskQueue*>(context); | |
| 237 | |
| 238 QueueContext queue_context(me); | |
| 239 pthread_setspecific(GetQueuePtrTls(), &queue_context); | |
| 240 | |
| 241 while (queue_context.is_active) | |
| 242 event_base_loop(me->event_base_, 0); | |
| 243 | |
| 244 pthread_setspecific(GetQueuePtrTls(), nullptr); | |
| 245 | |
| 246 for (TimerEvent* timer : queue_context.pending_timers_) | |
| 247 delete timer; | |
| 248 | |
| 249 return false; | |
| 250 } | |
| 251 | |
| 252 // static | |
| 253 void TaskQueue::OnWakeup(int socket, short flags, void* context) { // NOLINT | |
| 254 QueueContext* ctx = | |
| 255 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); | |
| 256 RTC_DCHECK(ctx->queue->wakeup_pipe_out_ == socket); | |
| 257 char buf; | |
| 258 RTC_CHECK(sizeof(buf) == read(socket, &buf, sizeof(buf))); | |
| 259 switch (buf) { | |
| 260 case kQuit: | |
| 261 ctx->is_active = false; | |
| 262 event_base_loopbreak(ctx->queue->event_base_); | |
| 263 break; | |
| 264 case kRunTask: { | |
| 265 std::unique_ptr<QueuedTask> task; | |
| 266 { | |
| 267 CritScope lock(&ctx->queue->pending_lock_); | |
| 268 RTC_DCHECK(!ctx->queue->pending_.empty()); | |
| 269 task = std::move(ctx->queue->pending_.front()); | |
| 270 ctx->queue->pending_.pop_front(); | |
| 271 RTC_DCHECK(task.get()); | |
| 272 } | |
| 273 if (!task->Run()) | |
| 274 task.release(); | |
| 275 break; | |
| 276 } | |
| 277 default: | |
| 278 RTC_NOTREACHED(); | |
| 279 break; | |
| 280 } | |
| 281 } | |
| 282 | |
| 283 // static | |
| 284 void TaskQueue::RunTask(int fd, short flags, void* context) { // NOLINT | |
| 285 auto* task = static_cast<QueuedTask*>(context); | |
| 286 if (task->Run()) | |
| 287 delete task; | |
| 288 } | |
| 289 | |
| 290 // static | |
| 291 void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT | |
| 292 TimerEvent* timer = static_cast<TimerEvent*>(context); | |
| 293 if (!timer->task->Run()) | |
| 294 timer->task.release(); | |
| 295 QueueContext* ctx = | |
| 296 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); | |
| 297 ctx->pending_timers_.remove(timer); | |
| 298 delete timer; | |
| 299 } | |
| 300 | |
| 301 void TaskQueue::PrepareReplyTask(PostAndReplyTask* reply_task) { | |
| 302 RTC_DCHECK(reply_task); | |
| 303 CritScope lock(&pending_lock_); | |
| 304 pending_replies_.push_back(reply_task); | |
| 305 } | |
| 306 | |
| 307 void TaskQueue::ReplyTaskDone(PostAndReplyTask* reply_task) { | |
| 308 CritScope lock(&pending_lock_); | |
| 309 pending_replies_.remove(reply_task); | |
| 310 } | |
| 311 | |
| 312 } // namespace rtc | |
| OLD | NEW |