| OLD | NEW |
| 1 /* | 1 /* |
| 2 * Copyright 2016 The WebRTC Project Authors. All rights reserved. | 2 * Copyright 2016 The WebRTC Project Authors. All rights reserved. |
| 3 * | 3 * |
| 4 * Use of this source code is governed by a BSD-style license | 4 * Use of this source code is governed by a BSD-style license |
| 5 * that can be found in the LICENSE file in the root of the source | 5 * that can be found in the LICENSE file in the root of the source |
| 6 * tree. An additional intellectual property rights grant can be found | 6 * tree. An additional intellectual property rights grant can be found |
| 7 * in the file PATENTS. All contributing project authors may | 7 * in the file PATENTS. All contributing project authors may |
| 8 * be found in the AUTHORS file in the root of the source tree. | 8 * be found in the AUTHORS file in the root of the source tree. |
| 9 */ | 9 */ |
| 10 | 10 |
| 11 #include "webrtc/base/task_queue.h" | 11 #include "webrtc/base/task_queue.h" |
| 12 | 12 |
| 13 #include <fcntl.h> | 13 #include <fcntl.h> |
| 14 #include <string.h> | 14 #include <string.h> |
| 15 #include <unistd.h> | 15 #include <unistd.h> |
| 16 #include <time.h> |
| 16 | 17 |
| 17 #include "base/third_party/libevent/event.h" | 18 #include "base/third_party/libevent/event.h" |
| 18 #include "webrtc/base/checks.h" | 19 #include "webrtc/base/checks.h" |
| 19 #include "webrtc/base/logging.h" | 20 #include "webrtc/base/logging.h" |
| 20 #include "webrtc/base/task_queue_posix.h" | 21 #include "webrtc/base/task_queue_posix.h" |
| 21 #include "webrtc/base/timeutils.h" | 22 #include "webrtc/base/timeutils.h" |
| 22 | 23 |
| 23 namespace rtc { | 24 namespace rtc { |
| 24 using internal::GetQueuePtrTls; | 25 using internal::GetQueuePtrTls; |
| 25 using internal::AutoSetCurrentQueuePtr; | 26 using internal::AutoSetCurrentQueuePtr; |
| (...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 64 } // namespace | 65 } // namespace |
| 65 | 66 |
| 66 struct TaskQueue::QueueContext { | 67 struct TaskQueue::QueueContext { |
| 67 explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {} | 68 explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {} |
| 68 TaskQueue* queue; | 69 TaskQueue* queue; |
| 69 bool is_active; | 70 bool is_active; |
| 70 // Holds a list of events pending timers for cleanup when the loop exits. | 71 // Holds a list of events pending timers for cleanup when the loop exits. |
| 71 std::list<TimerEvent*> pending_timers_; | 72 std::list<TimerEvent*> pending_timers_; |
| 72 }; | 73 }; |
| 73 | 74 |
| 74 class TaskQueue::PostAndReplyTask : public QueuedTask { | 75 class TaskQueue::PostAndReplyTask { |
| 75 public: | 76 public: |
| 76 PostAndReplyTask(std::unique_ptr<QueuedTask> task, | 77 PostAndReplyTask(std::unique_ptr<QueuedTask> task, |
| 77 std::unique_ptr<QueuedTask> reply, | 78 std::unique_ptr<QueuedTask> reply, |
| 78 TaskQueue* reply_queue) | 79 TaskQueue* reply_queue) |
| 79 : task_(std::move(task)), | 80 : task_(std::move(task)), |
| 80 reply_(std::move(reply)), | 81 reply_(std::move(reply)), |
| 81 reply_queue_(reply_queue) { | 82 reply_queue_(reply_queue) { |
| 82 reply_queue->PrepareReplyTask(this); | |
| 83 } | |
| 84 | |
| 85 ~PostAndReplyTask() override { | |
| 86 CritScope lock(&lock_); | |
| 87 if (reply_queue_) | |
| 88 reply_queue_->ReplyTaskDone(this); | |
| 89 } | 83 } |
| 90 | 84 |
| 91 void OnReplyQueueGone() { | 85 void OnReplyQueueGone() { |
| 92 CritScope lock(&lock_); | 86 CritScope lock(&lock_); |
| 93 reply_queue_ = nullptr; | 87 reply_queue_ = nullptr; |
| 94 } | 88 } |
| 95 | 89 |
| 96 private: | 90 private: |
| 97 bool Run() override { | 91 bool Run() { |
| 98 if (!task_->Run()) | 92 if (!task_->Run()) |
| 99 task_.release(); | 93 task_.release(); |
| 100 | 94 |
| 101 CritScope lock(&lock_); | 95 CritScope lock(&lock_); |
| 102 if (reply_queue_) | 96 if (reply_queue_) { |
| 103 reply_queue_->PostTask(std::move(reply_)); | 97 reply_queue_->PostTask(std::move(reply_)); |
| 98 reply_queue_->PostAndReplyTaskDone(this); |
| 99 } |
| 104 return true; | 100 return true; |
| 105 } | 101 } |
| 106 | 102 |
| 107 CriticalSection lock_; | 103 CriticalSection lock_; |
| 108 std::unique_ptr<QueuedTask> task_; | 104 std::unique_ptr<QueuedTask> task_; |
| 109 std::unique_ptr<QueuedTask> reply_; | 105 std::unique_ptr<QueuedTask> reply_; |
| 110 TaskQueue* reply_queue_ GUARDED_BY(lock_); | 106 TaskQueue* reply_queue_ GUARDED_BY(lock_); |
| 107 friend class PostAndReplyTaskRefOwner; |
| 108 }; |
| 109 |
| 110 // Inherits from QueuedTask and owns a ref to PostAndReplyTaskRef. |
| 111 class TaskQueue::PostAndReplyTaskRefOwner : public QueuedTask { |
| 112 public: |
| 113 PostAndReplyTaskRefOwner(scoped_refptr<PostAndReplyTaskRef> task) |
| 114 : task_(task) {} |
| 115 |
| 116 private: |
| 117 bool Run() override { return task_->Run(); } |
| 118 |
| 119 scoped_refptr<PostAndReplyTaskRef> task_; |
| 111 }; | 120 }; |
| 112 | 121 |
| 113 class TaskQueue::SetTimerTask : public QueuedTask { | 122 class TaskQueue::SetTimerTask : public QueuedTask { |
| 114 public: | 123 public: |
| 115 SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds) | 124 SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds) |
| 116 : task_(std::move(task)), | 125 : task_(std::move(task)), |
| 117 milliseconds_(milliseconds), | 126 milliseconds_(milliseconds), |
| 118 posted_(Time32()) {} | 127 posted_(Time32()) {} |
| 119 | 128 |
| 120 private: | 129 private: |
| (...skipping 24 matching lines...) Expand all Loading... |
| 145 wakeup_pipe_out_ = fds[0]; | 154 wakeup_pipe_out_ = fds[0]; |
| 146 wakeup_pipe_in_ = fds[1]; | 155 wakeup_pipe_in_ = fds[1]; |
| 147 EventAssign(wakeup_event_.get(), event_base_, wakeup_pipe_out_, | 156 EventAssign(wakeup_event_.get(), event_base_, wakeup_pipe_out_, |
| 148 EV_READ | EV_PERSIST, OnWakeup, this); | 157 EV_READ | EV_PERSIST, OnWakeup, this); |
| 149 event_add(wakeup_event_.get(), 0); | 158 event_add(wakeup_event_.get(), 0); |
| 150 thread_.Start(); | 159 thread_.Start(); |
| 151 } | 160 } |
| 152 | 161 |
| 153 TaskQueue::~TaskQueue() { | 162 TaskQueue::~TaskQueue() { |
| 154 RTC_DCHECK(!IsCurrent()); | 163 RTC_DCHECK(!IsCurrent()); |
| 164 destroying_ = true; |
| 155 struct timespec ts; | 165 struct timespec ts; |
| 156 char message = kQuit; | 166 char message = kQuit; |
| 157 while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) { | 167 while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) { |
| 158 // The queue is full, so we have no choice but to wait and retry. | 168 // The queue is full, so we have no choice but to wait and retry. |
| 159 RTC_CHECK_EQ(EAGAIN, errno); | 169 RTC_CHECK_EQ(EAGAIN, errno); |
| 160 ts.tv_sec = 0; | 170 ts.tv_sec = 0; |
| 161 ts.tv_nsec = 1000000; | 171 ts.tv_nsec = 1000000; |
| 162 nanosleep(&ts, nullptr); | 172 nanosleep(&ts, nullptr); |
| 163 } | 173 } |
| 164 | 174 |
| 165 thread_.Stop(); | 175 thread_.Stop(); |
| 166 | 176 |
| 167 event_del(wakeup_event_.get()); | 177 event_del(wakeup_event_.get()); |
| 168 close(wakeup_pipe_in_); | 178 close(wakeup_pipe_in_); |
| 169 close(wakeup_pipe_out_); | 179 close(wakeup_pipe_out_); |
| 170 wakeup_pipe_in_ = -1; | 180 wakeup_pipe_in_ = -1; |
| 171 wakeup_pipe_out_ = -1; | 181 wakeup_pipe_out_ = -1; |
| 172 | 182 |
| 183 std::list<scoped_refptr<PostAndReplyTaskRef>> pending_replies; |
| 173 { | 184 { |
| 174 // Synchronize against any pending reply tasks that might be running on | 185 // Synchronize against any pending reply tasks that might be running on |
| 175 // other queues. | 186 // other queues. Note that, since |destroying_| was set to true above, it |
| 187 // will be impossible for |pending_replies_| to grow after this. |
| 176 CritScope lock(&pending_lock_); | 188 CritScope lock(&pending_lock_); |
| 177 for (auto* reply : pending_replies_) | 189 pending_replies.swap(pending_replies_); |
| 178 reply->OnReplyQueueGone(); | |
| 179 pending_replies_.clear(); | |
| 180 } | 190 } |
| 191 for (auto reply : pending_replies) { |
| 192 reply->OnReplyQueueGone(); |
| 193 } |
| 194 pending_replies.clear(); |
| 181 | 195 |
| 182 event_base_free(event_base_); | 196 event_base_free(event_base_); |
| 183 } | 197 } |
| 184 | 198 |
| 185 // static | 199 // static |
| 186 TaskQueue* TaskQueue::Current() { | 200 TaskQueue* TaskQueue::Current() { |
| 187 QueueContext* ctx = | 201 QueueContext* ctx = |
| 188 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); | 202 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); |
| 189 return ctx ? ctx->queue : nullptr; | 203 return ctx ? ctx->queue : nullptr; |
| 190 } | 204 } |
| (...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 238 event_add(&timer->ev, &tv); | 252 event_add(&timer->ev, &tv); |
| 239 } else { | 253 } else { |
| 240 PostTask(std::unique_ptr<QueuedTask>( | 254 PostTask(std::unique_ptr<QueuedTask>( |
| 241 new SetTimerTask(std::move(task), milliseconds))); | 255 new SetTimerTask(std::move(task), milliseconds))); |
| 242 } | 256 } |
| 243 } | 257 } |
| 244 | 258 |
| 245 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, | 259 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
| 246 std::unique_ptr<QueuedTask> reply, | 260 std::unique_ptr<QueuedTask> reply, |
| 247 TaskQueue* reply_queue) { | 261 TaskQueue* reply_queue) { |
| 248 std::unique_ptr<QueuedTask> wrapper_task( | 262 scoped_refptr<PostAndReplyTaskRef> ref( |
| 249 new PostAndReplyTask(std::move(task), std::move(reply), reply_queue)); | 263 new PostAndReplyTaskRef(std::move(task), std::move(reply), reply_queue)); |
| 250 PostTask(std::move(wrapper_task)); | 264 reply_queue->PrepareReplyTask(ref); |
| 265 PostTask(std::unique_ptr<QueuedTask>(new PostAndReplyTaskRefOwner(ref))); |
| 251 } | 266 } |
| 252 | 267 |
| 253 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, | 268 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
| 254 std::unique_ptr<QueuedTask> reply) { | 269 std::unique_ptr<QueuedTask> reply) { |
| 255 return PostTaskAndReply(std::move(task), std::move(reply), Current()); | 270 return PostTaskAndReply(std::move(task), std::move(reply), Current()); |
| 256 } | 271 } |
| 257 | 272 |
| 258 // static | 273 // static |
| 259 bool TaskQueue::ThreadMain(void* context) { | 274 bool TaskQueue::ThreadMain(void* context) { |
| 260 TaskQueue* me = static_cast<TaskQueue*>(context); | 275 TaskQueue* me = static_cast<TaskQueue*>(context); |
| (...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 315 void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT | 330 void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT |
| 316 TimerEvent* timer = static_cast<TimerEvent*>(context); | 331 TimerEvent* timer = static_cast<TimerEvent*>(context); |
| 317 if (!timer->task->Run()) | 332 if (!timer->task->Run()) |
| 318 timer->task.release(); | 333 timer->task.release(); |
| 319 QueueContext* ctx = | 334 QueueContext* ctx = |
| 320 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); | 335 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); |
| 321 ctx->pending_timers_.remove(timer); | 336 ctx->pending_timers_.remove(timer); |
| 322 delete timer; | 337 delete timer; |
| 323 } | 338 } |
| 324 | 339 |
| 325 void TaskQueue::PrepareReplyTask(PostAndReplyTask* reply_task) { | 340 void TaskQueue::PrepareReplyTask( |
| 341 scoped_refptr<PostAndReplyTaskRef> reply_task) { |
| 326 RTC_DCHECK(reply_task); | 342 RTC_DCHECK(reply_task); |
| 327 CritScope lock(&pending_lock_); | 343 CritScope lock(&pending_lock_); |
| 344 if (destroying_) { |
| 345 return; |
| 346 } |
| 328 pending_replies_.push_back(reply_task); | 347 pending_replies_.push_back(reply_task); |
| 329 } | 348 } |
| 330 | 349 |
| 331 void TaskQueue::ReplyTaskDone(PostAndReplyTask* reply_task) { | 350 void TaskQueue::PostAndReplyTaskDone(PostAndReplyTask* reply_task) { |
| 332 CritScope lock(&pending_lock_); | 351 CritScope lock(&pending_lock_); |
| 333 pending_replies_.remove(reply_task); | 352 // Note: If the destructor is currently being run, and "pending_replies_" is |
| 353 // already swapped out, this may fail to find |reply_task|, which is ok. |
| 354 pending_replies_.remove_if( |
| 355 [reply_task](scoped_refptr<PostAndReplyTaskRef> task) { |
| 356 return task.get() == reply_task; |
| 357 }); |
| 334 } | 358 } |
| 335 | 359 |
| 336 } // namespace rtc | 360 } // namespace rtc |
| OLD | NEW |