OLD | NEW |
1 /* | 1 /* |
2 * Copyright 2016 The WebRTC Project Authors. All rights reserved. | 2 * Copyright 2016 The WebRTC Project Authors. All rights reserved. |
3 * | 3 * |
4 * Use of this source code is governed by a BSD-style license | 4 * Use of this source code is governed by a BSD-style license |
5 * that can be found in the LICENSE file in the root of the source | 5 * that can be found in the LICENSE file in the root of the source |
6 * tree. An additional intellectual property rights grant can be found | 6 * tree. An additional intellectual property rights grant can be found |
7 * in the file PATENTS. All contributing project authors may | 7 * in the file PATENTS. All contributing project authors may |
8 * be found in the AUTHORS file in the root of the source tree. | 8 * be found in the AUTHORS file in the root of the source tree. |
9 */ | 9 */ |
10 | 10 |
11 // This file contains the implementation of TaskQueue for Mac and iOS. | 11 // This file contains the implementation of TaskQueue for Mac and iOS. |
12 // The implementation uses Grand Central Dispatch queues (GCD) to | 12 // The implementation uses Grand Central Dispatch queues (GCD) to |
13 // do the actual task queuing. | 13 // do the actual task queuing. |
14 | 14 |
15 #include "webrtc/rtc_base/task_queue.h" | 15 #include "webrtc/rtc_base/task_queue.h" |
16 | 16 |
17 #include <string.h> | 17 #include <string.h> |
18 | 18 |
| 19 #include <dispatch/dispatch.h> |
| 20 |
19 #include "webrtc/rtc_base/checks.h" | 21 #include "webrtc/rtc_base/checks.h" |
20 #include "webrtc/rtc_base/logging.h" | 22 #include "webrtc/rtc_base/logging.h" |
| 23 #include "webrtc/rtc_base/refcount.h" |
| 24 #include "webrtc/rtc_base/refcountedobject.h" |
21 #include "webrtc/rtc_base/task_queue_posix.h" | 25 #include "webrtc/rtc_base/task_queue_posix.h" |
22 | 26 |
23 namespace rtc { | 27 namespace rtc { |
24 namespace { | 28 namespace { |
25 | 29 |
26 using Priority = TaskQueue::Priority; | 30 using Priority = TaskQueue::Priority; |
27 | 31 |
28 int TaskQueuePriorityToGCD(Priority priority) { | 32 int TaskQueuePriorityToGCD(Priority priority) { |
29 switch (priority) { | 33 switch (priority) { |
30 case Priority::NORMAL: | 34 case Priority::NORMAL: |
31 return DISPATCH_QUEUE_PRIORITY_DEFAULT; | 35 return DISPATCH_QUEUE_PRIORITY_DEFAULT; |
32 case Priority::HIGH: | 36 case Priority::HIGH: |
33 return DISPATCH_QUEUE_PRIORITY_HIGH; | 37 return DISPATCH_QUEUE_PRIORITY_HIGH; |
34 case Priority::LOW: | 38 case Priority::LOW: |
35 return DISPATCH_QUEUE_PRIORITY_LOW; | 39 return DISPATCH_QUEUE_PRIORITY_LOW; |
36 } | 40 } |
37 } | 41 } |
38 } | 42 } |
39 | 43 |
40 using internal::GetQueuePtrTls; | 44 using internal::GetQueuePtrTls; |
41 using internal::AutoSetCurrentQueuePtr; | 45 using internal::AutoSetCurrentQueuePtr; |
42 | 46 |
43 struct TaskQueue::QueueContext { | 47 class TaskQueue::Impl : public RefCountInterface { |
44 explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {} | 48 public: |
| 49 Impl(const char* queue_name, TaskQueue* task_queue, Priority priority); |
| 50 ~Impl() override; |
45 | 51 |
46 static void SetNotActive(void* context) { | 52 static TaskQueue* Current(); |
47 QueueContext* qc = static_cast<QueueContext*>(context); | |
48 qc->is_active = false; | |
49 } | |
50 | 53 |
51 static void DeleteContext(void* context) { | 54 // Used for DCHECKing the current queue. |
52 QueueContext* qc = static_cast<QueueContext*>(context); | 55 bool IsCurrent() const; |
53 delete qc; | |
54 } | |
55 | 56 |
56 TaskQueue* const queue; | 57 void PostTask(std::unique_ptr<QueuedTask> task); |
57 bool is_active; | 58 void PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
| 59 std::unique_ptr<QueuedTask> reply, |
| 60 TaskQueue::Impl* reply_queue); |
| 61 |
| 62 void PostDelayedTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds); |
| 63 |
| 64 private: |
| 65 struct QueueContext { |
| 66 explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {} |
| 67 |
| 68 static void SetNotActive(void* context) { |
| 69 QueueContext* qc = static_cast<QueueContext*>(context); |
| 70 qc->is_active = false; |
| 71 } |
| 72 |
| 73 static void DeleteContext(void* context) { |
| 74 QueueContext* qc = static_cast<QueueContext*>(context); |
| 75 delete qc; |
| 76 } |
| 77 |
| 78 TaskQueue* const queue; |
| 79 bool is_active; |
| 80 }; |
| 81 |
| 82 struct TaskContext { |
| 83 TaskContext(QueueContext* queue_ctx, std::unique_ptr<QueuedTask> task) |
| 84 : queue_ctx(queue_ctx), task(std::move(task)) {} |
| 85 virtual ~TaskContext() {} |
| 86 |
| 87 static void RunTask(void* context) { |
| 88 std::unique_ptr<TaskContext> tc(static_cast<TaskContext*>(context)); |
| 89 if (tc->queue_ctx->is_active) { |
| 90 AutoSetCurrentQueuePtr set_current(tc->queue_ctx->queue); |
| 91 if (!tc->task->Run()) |
| 92 tc->task.release(); |
| 93 } |
| 94 } |
| 95 |
| 96 QueueContext* const queue_ctx; |
| 97 std::unique_ptr<QueuedTask> task; |
| 98 }; |
| 99 |
| 100 // Special case context for holding two tasks, a |first_task| + the task |
| 101 // that's owned by the parent struct, TaskContext, that then becomes the |
| 102 // second (i.e. 'reply') task. |
| 103 struct PostTaskAndReplyContext : public TaskContext { |
| 104 explicit PostTaskAndReplyContext(QueueContext* first_queue_ctx, |
| 105 std::unique_ptr<QueuedTask> first_task, |
| 106 QueueContext* second_queue_ctx, |
| 107 std::unique_ptr<QueuedTask> second_task) |
| 108 : TaskContext(second_queue_ctx, std::move(second_task)), |
| 109 first_queue_ctx(first_queue_ctx), |
| 110 first_task(std::move(first_task)), |
| 111 reply_queue_(second_queue_ctx->queue->impl_->queue_) { |
| 112 // Retain the reply queue for as long as this object lives. |
| 113 // If we don't, we may have memory leaks and/or failures. |
| 114 dispatch_retain(reply_queue_); |
| 115 } |
| 116 ~PostTaskAndReplyContext() override { dispatch_release(reply_queue_); } |
| 117 |
| 118 static void RunTask(void* context) { |
| 119 auto* rc = static_cast<PostTaskAndReplyContext*>(context); |
| 120 if (rc->first_queue_ctx->is_active) { |
| 121 AutoSetCurrentQueuePtr set_current(rc->first_queue_ctx->queue); |
| 122 if (!rc->first_task->Run()) |
| 123 rc->first_task.release(); |
| 124 } |
| 125 // Post the reply task. This hands the work over to the parent struct. |
| 126 // This task will eventually delete |this|. |
| 127 dispatch_async_f(rc->reply_queue_, rc, &TaskContext::RunTask); |
| 128 } |
| 129 |
| 130 QueueContext* const first_queue_ctx; |
| 131 std::unique_ptr<QueuedTask> first_task; |
| 132 dispatch_queue_t reply_queue_; |
| 133 }; |
| 134 |
| 135 dispatch_queue_t queue_; |
| 136 QueueContext* const context_; |
58 }; | 137 }; |
59 | 138 |
60 struct TaskQueue::TaskContext { | 139 TaskQueue::Impl::Impl(const char* queue_name, |
61 TaskContext(QueueContext* queue_ctx, std::unique_ptr<QueuedTask> task) | 140 TaskQueue* task_queue, |
62 : queue_ctx(queue_ctx), task(std::move(task)) {} | 141 Priority priority) |
63 virtual ~TaskContext() {} | |
64 | |
65 static void RunTask(void* context) { | |
66 std::unique_ptr<TaskContext> tc(static_cast<TaskContext*>(context)); | |
67 if (tc->queue_ctx->is_active) { | |
68 AutoSetCurrentQueuePtr set_current(tc->queue_ctx->queue); | |
69 if (!tc->task->Run()) | |
70 tc->task.release(); | |
71 } | |
72 } | |
73 | |
74 QueueContext* const queue_ctx; | |
75 std::unique_ptr<QueuedTask> task; | |
76 }; | |
77 | |
78 // Special case context for holding two tasks, a |first_task| + the task | |
79 // that's owned by the parent struct, TaskContext, that then becomes the | |
80 // second (i.e. 'reply') task. | |
81 struct TaskQueue::PostTaskAndReplyContext : public TaskQueue::TaskContext { | |
82 explicit PostTaskAndReplyContext(QueueContext* first_queue_ctx, | |
83 std::unique_ptr<QueuedTask> first_task, | |
84 QueueContext* second_queue_ctx, | |
85 std::unique_ptr<QueuedTask> second_task) | |
86 : TaskContext(second_queue_ctx, std::move(second_task)), | |
87 first_queue_ctx(first_queue_ctx), | |
88 first_task(std::move(first_task)), | |
89 reply_queue_(second_queue_ctx->queue->queue_) { | |
90 // Retain the reply queue for as long as this object lives. | |
91 // If we don't, we may have memory leaks and/or failures. | |
92 dispatch_retain(reply_queue_); | |
93 } | |
94 ~PostTaskAndReplyContext() override { dispatch_release(reply_queue_); } | |
95 | |
96 static void RunTask(void* context) { | |
97 auto* rc = static_cast<PostTaskAndReplyContext*>(context); | |
98 if (rc->first_queue_ctx->is_active) { | |
99 AutoSetCurrentQueuePtr set_current(rc->first_queue_ctx->queue); | |
100 if (!rc->first_task->Run()) | |
101 rc->first_task.release(); | |
102 } | |
103 // Post the reply task. This hands the work over to the parent struct. | |
104 // This task will eventually delete |this|. | |
105 dispatch_async_f(rc->reply_queue_, rc, &TaskContext::RunTask); | |
106 } | |
107 | |
108 QueueContext* const first_queue_ctx; | |
109 std::unique_ptr<QueuedTask> first_task; | |
110 dispatch_queue_t reply_queue_; | |
111 }; | |
112 | |
113 TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/) | |
114 : queue_(dispatch_queue_create(queue_name, DISPATCH_QUEUE_SERIAL)), | 142 : queue_(dispatch_queue_create(queue_name, DISPATCH_QUEUE_SERIAL)), |
115 context_(new QueueContext(this)) { | 143 context_(new QueueContext(task_queue)) { |
116 RTC_DCHECK(queue_name); | 144 RTC_DCHECK(queue_name); |
117 RTC_CHECK(queue_); | 145 RTC_CHECK(queue_); |
118 dispatch_set_context(queue_, context_); | 146 dispatch_set_context(queue_, context_); |
119 // Assign a finalizer that will delete the context when the last reference | 147 // Assign a finalizer that will delete the context when the last reference |
120 // to the queue is released. This may run after the TaskQueue object has | 148 // to the queue is released. This may run after the TaskQueue object has |
121 // been deleted. | 149 // been deleted. |
122 dispatch_set_finalizer_f(queue_, &QueueContext::DeleteContext); | 150 dispatch_set_finalizer_f(queue_, &QueueContext::DeleteContext); |
123 | 151 |
124 dispatch_set_target_queue( | 152 dispatch_set_target_queue( |
125 queue_, dispatch_get_global_queue(TaskQueuePriorityToGCD(priority), 0)); | 153 queue_, dispatch_get_global_queue(TaskQueuePriorityToGCD(priority), 0)); |
126 } | 154 } |
127 | 155 |
128 TaskQueue::~TaskQueue() { | 156 TaskQueue::Impl::~Impl() { |
129 RTC_DCHECK(!IsCurrent()); | 157 RTC_DCHECK(!IsCurrent()); |
130 // Implementation/behavioral note: | 158 // Implementation/behavioral note: |
131 // Dispatch queues are reference counted via calls to dispatch_retain and | 159 // Dispatch queues are reference counted via calls to dispatch_retain and |
132 // dispatch_release. Pending blocks submitted to a queue also hold a | 160 // dispatch_release. Pending blocks submitted to a queue also hold a |
133 // reference to the queue until they have finished. Once all references to a | 161 // reference to the queue until they have finished. Once all references to a |
134 // queue have been released, the queue will be deallocated by the system. | 162 // queue have been released, the queue will be deallocated by the system. |
135 // This is why we check the context before running tasks. | 163 // This is why we check the context before running tasks. |
136 | 164 |
137 // Use dispatch_sync to set the context to null to guarantee that there's not | 165 // Use dispatch_sync to set the context to null to guarantee that there's not |
138 // a race between checking the context and using it from a task. | 166 // a race between checking the context and using it from a task. |
139 dispatch_sync_f(queue_, context_, &QueueContext::SetNotActive); | 167 dispatch_sync_f(queue_, context_, &QueueContext::SetNotActive); |
140 dispatch_release(queue_); | 168 dispatch_release(queue_); |
141 } | 169 } |
142 | 170 |
143 // static | 171 // static |
144 TaskQueue* TaskQueue::Current() { | 172 TaskQueue* TaskQueue::Impl::Current() { |
145 return static_cast<TaskQueue*>(pthread_getspecific(GetQueuePtrTls())); | 173 return static_cast<TaskQueue*>(pthread_getspecific(GetQueuePtrTls())); |
146 } | 174 } |
147 | 175 |
148 bool TaskQueue::IsCurrent() const { | 176 bool TaskQueue::Impl::IsCurrent() const { |
149 RTC_DCHECK(queue_); | 177 RTC_DCHECK(queue_); |
150 return this == Current(); | 178 const TaskQueue* current = Current(); |
| 179 return current && this == current->impl_.get(); |
151 } | 180 } |
152 | 181 |
153 void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) { | 182 void TaskQueue::Impl::PostTask(std::unique_ptr<QueuedTask> task) { |
154 auto* context = new TaskContext(context_, std::move(task)); | 183 auto* context = new TaskContext(context_, std::move(task)); |
155 dispatch_async_f(queue_, context, &TaskContext::RunTask); | 184 dispatch_async_f(queue_, context, &TaskContext::RunTask); |
156 } | 185 } |
157 | 186 |
158 void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task, | 187 void TaskQueue::Impl::PostDelayedTask(std::unique_ptr<QueuedTask> task, |
159 uint32_t milliseconds) { | 188 uint32_t milliseconds) { |
160 auto* context = new TaskContext(context_, std::move(task)); | 189 auto* context = new TaskContext(context_, std::move(task)); |
161 dispatch_after_f( | 190 dispatch_after_f( |
162 dispatch_time(DISPATCH_TIME_NOW, milliseconds * NSEC_PER_MSEC), queue_, | 191 dispatch_time(DISPATCH_TIME_NOW, milliseconds * NSEC_PER_MSEC), queue_, |
163 context, &TaskContext::RunTask); | 192 context, &TaskContext::RunTask); |
164 } | 193 } |
165 | 194 |
166 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, | 195 void TaskQueue::Impl::PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
167 std::unique_ptr<QueuedTask> reply, | 196 std::unique_ptr<QueuedTask> reply, |
168 TaskQueue* reply_queue) { | 197 TaskQueue::Impl* reply_queue) { |
169 auto* context = new PostTaskAndReplyContext( | 198 auto* context = new PostTaskAndReplyContext( |
170 context_, std::move(task), reply_queue->context_, std::move(reply)); | 199 context_, std::move(task), reply_queue->context_, std::move(reply)); |
171 dispatch_async_f(queue_, context, &PostTaskAndReplyContext::RunTask); | 200 dispatch_async_f(queue_, context, &PostTaskAndReplyContext::RunTask); |
172 } | 201 } |
173 | 202 |
| 203 // Boilerplate for the PIMPL pattern. |
| 204 TaskQueue::TaskQueue(const char* queue_name, Priority priority) |
| 205 : impl_(new RefCountedObject<TaskQueue::Impl>(queue_name, this, priority)) { |
| 206 } |
| 207 |
| 208 TaskQueue::~TaskQueue() {} |
| 209 |
| 210 // static |
| 211 TaskQueue* TaskQueue::Current() { |
| 212 return TaskQueue::Impl::Current(); |
| 213 } |
| 214 |
| 215 // Used for DCHECKing the current queue. |
| 216 bool TaskQueue::IsCurrent() const { |
| 217 return impl_->IsCurrent(); |
| 218 } |
| 219 |
| 220 void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) { |
| 221 return TaskQueue::impl_->PostTask(std::move(task)); |
| 222 } |
| 223 |
| 224 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
| 225 std::unique_ptr<QueuedTask> reply, |
| 226 TaskQueue* reply_queue) { |
| 227 return TaskQueue::impl_->PostTaskAndReply(std::move(task), std::move(reply), |
| 228 reply_queue->impl_.get()); |
| 229 } |
| 230 |
174 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, | 231 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
175 std::unique_ptr<QueuedTask> reply) { | 232 std::unique_ptr<QueuedTask> reply) { |
176 return PostTaskAndReply(std::move(task), std::move(reply), Current()); | 233 return TaskQueue::impl_->PostTaskAndReply(std::move(task), std::move(reply), |
| 234 impl_.get()); |
| 235 } |
| 236 |
| 237 void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task, |
| 238 uint32_t milliseconds) { |
| 239 return TaskQueue::impl_->PostDelayedTask(std::move(task), milliseconds); |
177 } | 240 } |
178 | 241 |
179 } // namespace rtc | 242 } // namespace rtc |
OLD | NEW |