OLD | NEW |
1 /* | 1 /* |
2 * Copyright 2016 The WebRTC Project Authors. All rights reserved. | 2 * Copyright 2016 The WebRTC Project Authors. All rights reserved. |
3 * | 3 * |
4 * Use of this source code is governed by a BSD-style license | 4 * Use of this source code is governed by a BSD-style license |
5 * that can be found in the LICENSE file in the root of the source | 5 * that can be found in the LICENSE file in the root of the source |
6 * tree. An additional intellectual property rights grant can be found | 6 * tree. An additional intellectual property rights grant can be found |
7 * in the file PATENTS. All contributing project authors may | 7 * in the file PATENTS. All contributing project authors may |
8 * be found in the AUTHORS file in the root of the source tree. | 8 * be found in the AUTHORS file in the root of the source tree. |
9 */ | 9 */ |
10 | 10 |
11 #include "webrtc/base/task_queue.h" | 11 #include "webrtc/base/task_queue.h" |
12 | 12 |
13 #include <fcntl.h> | 13 #include <fcntl.h> |
14 #include <string.h> | 14 #include <string.h> |
15 #include <unistd.h> | 15 #include <unistd.h> |
| 16 #include <time.h> |
16 | 17 |
17 #include "base/third_party/libevent/event.h" | 18 #include "base/third_party/libevent/event.h" |
18 #include "webrtc/base/checks.h" | 19 #include "webrtc/base/checks.h" |
19 #include "webrtc/base/logging.h" | 20 #include "webrtc/base/logging.h" |
20 #include "webrtc/base/task_queue_posix.h" | 21 #include "webrtc/base/task_queue_posix.h" |
21 #include "webrtc/base/timeutils.h" | 22 #include "webrtc/base/timeutils.h" |
22 | 23 |
23 namespace rtc { | 24 namespace rtc { |
24 using internal::GetQueuePtrTls; | 25 using internal::GetQueuePtrTls; |
25 using internal::AutoSetCurrentQueuePtr; | 26 using internal::AutoSetCurrentQueuePtr; |
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
64 } // namespace | 65 } // namespace |
65 | 66 |
66 struct TaskQueue::QueueContext { | 67 struct TaskQueue::QueueContext { |
67 explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {} | 68 explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {} |
68 TaskQueue* queue; | 69 TaskQueue* queue; |
69 bool is_active; | 70 bool is_active; |
70 // Holds a list of events pending timers for cleanup when the loop exits. | 71 // Holds a list of events pending timers for cleanup when the loop exits. |
71 std::list<TimerEvent*> pending_timers_; | 72 std::list<TimerEvent*> pending_timers_; |
72 }; | 73 }; |
73 | 74 |
74 class TaskQueue::PostAndReplyTask : public QueuedTask { | 75 class TaskQueue::PostAndReplyTask { |
75 public: | 76 public: |
76 PostAndReplyTask(std::unique_ptr<QueuedTask> task, | 77 PostAndReplyTask(std::unique_ptr<QueuedTask> task, |
77 std::unique_ptr<QueuedTask> reply, | 78 std::unique_ptr<QueuedTask> reply, |
78 TaskQueue* reply_queue) | 79 TaskQueue* reply_queue) |
79 : task_(std::move(task)), | 80 : task_(std::move(task)), |
80 reply_(std::move(reply)), | 81 reply_(std::move(reply)), |
81 reply_queue_(reply_queue) { | 82 reply_queue_(reply_queue) { |
82 reply_queue->PrepareReplyTask(this); | |
83 } | |
84 | |
85 ~PostAndReplyTask() override { | |
86 CritScope lock(&lock_); | |
87 if (reply_queue_) | |
88 reply_queue_->ReplyTaskDone(this); | |
89 } | 83 } |
90 | 84 |
91 void OnReplyQueueGone() { | 85 void OnReplyQueueGone() { |
92 CritScope lock(&lock_); | 86 CritScope lock(&lock_); |
93 reply_queue_ = nullptr; | 87 reply_queue_ = nullptr; |
94 } | 88 } |
95 | 89 |
96 private: | 90 private: |
97 bool Run() override { | 91 bool Run() { |
98 if (!task_->Run()) | 92 if (!task_->Run()) |
99 task_.release(); | 93 task_.release(); |
100 | 94 |
101 CritScope lock(&lock_); | 95 CritScope lock(&lock_); |
102 if (reply_queue_) | 96 if (reply_queue_) { |
103 reply_queue_->PostTask(std::move(reply_)); | 97 reply_queue_->PostTask(std::move(reply_)); |
| 98 reply_queue_->PostAndReplyTaskDone(this); |
| 99 } |
104 return true; | 100 return true; |
105 } | 101 } |
106 | 102 |
107 CriticalSection lock_; | 103 CriticalSection lock_; |
108 std::unique_ptr<QueuedTask> task_; | 104 std::unique_ptr<QueuedTask> task_; |
109 std::unique_ptr<QueuedTask> reply_; | 105 std::unique_ptr<QueuedTask> reply_; |
110 TaskQueue* reply_queue_ GUARDED_BY(lock_); | 106 TaskQueue* reply_queue_ GUARDED_BY(lock_); |
| 107 friend class PostAndReplyTaskRefOwner; |
| 108 }; |
| 109 |
| 110 // Inherits from QueuedTask and owns a ref to PostAndReplyTaskRef. |
| 111 class TaskQueue::PostAndReplyTaskRefOwner : public QueuedTask { |
| 112 public: |
| 113 PostAndReplyTaskRefOwner(scoped_refptr<PostAndReplyTaskRef> task) |
| 114 : task_(task) {} |
| 115 |
| 116 private: |
| 117 bool Run() override { return task_->Run(); } |
| 118 |
| 119 scoped_refptr<PostAndReplyTaskRef> task_; |
111 }; | 120 }; |
112 | 121 |
113 class TaskQueue::SetTimerTask : public QueuedTask { | 122 class TaskQueue::SetTimerTask : public QueuedTask { |
114 public: | 123 public: |
115 SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds) | 124 SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds) |
116 : task_(std::move(task)), | 125 : task_(std::move(task)), |
117 milliseconds_(milliseconds), | 126 milliseconds_(milliseconds), |
118 posted_(Time32()) {} | 127 posted_(Time32()) {} |
119 | 128 |
120 private: | 129 private: |
(...skipping 24 matching lines...) Expand all Loading... |
145 wakeup_pipe_out_ = fds[0]; | 154 wakeup_pipe_out_ = fds[0]; |
146 wakeup_pipe_in_ = fds[1]; | 155 wakeup_pipe_in_ = fds[1]; |
147 EventAssign(wakeup_event_.get(), event_base_, wakeup_pipe_out_, | 156 EventAssign(wakeup_event_.get(), event_base_, wakeup_pipe_out_, |
148 EV_READ | EV_PERSIST, OnWakeup, this); | 157 EV_READ | EV_PERSIST, OnWakeup, this); |
149 event_add(wakeup_event_.get(), 0); | 158 event_add(wakeup_event_.get(), 0); |
150 thread_.Start(); | 159 thread_.Start(); |
151 } | 160 } |
152 | 161 |
153 TaskQueue::~TaskQueue() { | 162 TaskQueue::~TaskQueue() { |
154 RTC_DCHECK(!IsCurrent()); | 163 RTC_DCHECK(!IsCurrent()); |
| 164 destroying_ = true; |
155 struct timespec ts; | 165 struct timespec ts; |
156 char message = kQuit; | 166 char message = kQuit; |
157 while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) { | 167 while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) { |
158 // The queue is full, so we have no choice but to wait and retry. | 168 // The queue is full, so we have no choice but to wait and retry. |
159 RTC_CHECK_EQ(EAGAIN, errno); | 169 RTC_CHECK_EQ(EAGAIN, errno); |
160 ts.tv_sec = 0; | 170 ts.tv_sec = 0; |
161 ts.tv_nsec = 1000000; | 171 ts.tv_nsec = 1000000; |
162 nanosleep(&ts, nullptr); | 172 nanosleep(&ts, nullptr); |
163 } | 173 } |
164 | 174 |
165 thread_.Stop(); | 175 thread_.Stop(); |
166 | 176 |
167 event_del(wakeup_event_.get()); | 177 event_del(wakeup_event_.get()); |
168 close(wakeup_pipe_in_); | 178 close(wakeup_pipe_in_); |
169 close(wakeup_pipe_out_); | 179 close(wakeup_pipe_out_); |
170 wakeup_pipe_in_ = -1; | 180 wakeup_pipe_in_ = -1; |
171 wakeup_pipe_out_ = -1; | 181 wakeup_pipe_out_ = -1; |
172 | 182 |
| 183 std::list<scoped_refptr<PostAndReplyTaskRef>> pending_replies; |
173 { | 184 { |
174 // Synchronize against any pending reply tasks that might be running on | 185 // Synchronize against any pending reply tasks that might be running on |
175 // other queues. | 186 // other queues. Note that, since |destroying_| was set to true above, it |
| 187 // will be impossible for |pending_replies_| to grow after this. |
176 CritScope lock(&pending_lock_); | 188 CritScope lock(&pending_lock_); |
177 for (auto* reply : pending_replies_) | 189 pending_replies.swap(pending_replies_); |
178 reply->OnReplyQueueGone(); | |
179 pending_replies_.clear(); | |
180 } | 190 } |
| 191 for (auto reply : pending_replies) { |
| 192 reply->OnReplyQueueGone(); |
| 193 } |
| 194 pending_replies.clear(); |
181 | 195 |
182 event_base_free(event_base_); | 196 event_base_free(event_base_); |
183 } | 197 } |
184 | 198 |
185 // static | 199 // static |
186 TaskQueue* TaskQueue::Current() { | 200 TaskQueue* TaskQueue::Current() { |
187 QueueContext* ctx = | 201 QueueContext* ctx = |
188 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); | 202 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); |
189 return ctx ? ctx->queue : nullptr; | 203 return ctx ? ctx->queue : nullptr; |
190 } | 204 } |
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
238 event_add(&timer->ev, &tv); | 252 event_add(&timer->ev, &tv); |
239 } else { | 253 } else { |
240 PostTask(std::unique_ptr<QueuedTask>( | 254 PostTask(std::unique_ptr<QueuedTask>( |
241 new SetTimerTask(std::move(task), milliseconds))); | 255 new SetTimerTask(std::move(task), milliseconds))); |
242 } | 256 } |
243 } | 257 } |
244 | 258 |
245 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, | 259 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
246 std::unique_ptr<QueuedTask> reply, | 260 std::unique_ptr<QueuedTask> reply, |
247 TaskQueue* reply_queue) { | 261 TaskQueue* reply_queue) { |
248 std::unique_ptr<QueuedTask> wrapper_task( | 262 scoped_refptr<PostAndReplyTaskRef> ref( |
249 new PostAndReplyTask(std::move(task), std::move(reply), reply_queue)); | 263 new PostAndReplyTaskRef(std::move(task), std::move(reply), reply_queue)); |
250 PostTask(std::move(wrapper_task)); | 264 reply_queue->PrepareReplyTask(ref); |
| 265 PostTask(std::unique_ptr<QueuedTask>(new PostAndReplyTaskRefOwner(ref))); |
251 } | 266 } |
252 | 267 |
253 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, | 268 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
254 std::unique_ptr<QueuedTask> reply) { | 269 std::unique_ptr<QueuedTask> reply) { |
255 return PostTaskAndReply(std::move(task), std::move(reply), Current()); | 270 return PostTaskAndReply(std::move(task), std::move(reply), Current()); |
256 } | 271 } |
257 | 272 |
258 // static | 273 // static |
259 bool TaskQueue::ThreadMain(void* context) { | 274 bool TaskQueue::ThreadMain(void* context) { |
260 TaskQueue* me = static_cast<TaskQueue*>(context); | 275 TaskQueue* me = static_cast<TaskQueue*>(context); |
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
315 void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT | 330 void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT |
316 TimerEvent* timer = static_cast<TimerEvent*>(context); | 331 TimerEvent* timer = static_cast<TimerEvent*>(context); |
317 if (!timer->task->Run()) | 332 if (!timer->task->Run()) |
318 timer->task.release(); | 333 timer->task.release(); |
319 QueueContext* ctx = | 334 QueueContext* ctx = |
320 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); | 335 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); |
321 ctx->pending_timers_.remove(timer); | 336 ctx->pending_timers_.remove(timer); |
322 delete timer; | 337 delete timer; |
323 } | 338 } |
324 | 339 |
325 void TaskQueue::PrepareReplyTask(PostAndReplyTask* reply_task) { | 340 void TaskQueue::PrepareReplyTask( |
| 341 scoped_refptr<PostAndReplyTaskRef> reply_task) { |
326 RTC_DCHECK(reply_task); | 342 RTC_DCHECK(reply_task); |
327 CritScope lock(&pending_lock_); | 343 CritScope lock(&pending_lock_); |
| 344 if (destroying_) { |
| 345 return; |
| 346 } |
328 pending_replies_.push_back(reply_task); | 347 pending_replies_.push_back(reply_task); |
329 } | 348 } |
330 | 349 |
331 void TaskQueue::ReplyTaskDone(PostAndReplyTask* reply_task) { | 350 void TaskQueue::PostAndReplyTaskDone(PostAndReplyTask* reply_task) { |
332 CritScope lock(&pending_lock_); | 351 CritScope lock(&pending_lock_); |
333 pending_replies_.remove(reply_task); | 352 // Note: If the destructor is currently being run, and "pending_replies_" is |
| 353 // already swapped out, this may fail to find |reply_task|, which is ok. |
| 354 pending_replies_.remove_if( |
| 355 [reply_task](scoped_refptr<PostAndReplyTaskRef> task) { |
| 356 return task.get() == reply_task; |
| 357 }); |
334 } | 358 } |
335 | 359 |
336 } // namespace rtc | 360 } // namespace rtc |
OLD | NEW |