Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(158)

Side by Side Diff: webrtc/base/task_queue_libevent.cc

Issue 2708353003: Add support for priorities to TaskQueue. (Closed)
Patch Set: Address comments Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « webrtc/base/task_queue_gcd.cc ('k') | webrtc/base/task_queue_win.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 /* 1 /*
2 * Copyright 2016 The WebRTC Project Authors. All rights reserved. 2 * Copyright 2016 The WebRTC Project Authors. All rights reserved.
3 * 3 *
4 * Use of this source code is governed by a BSD-style license 4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source 5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found 6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may 7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree. 8 * be found in the AUTHORS file in the root of the source tree.
9 */ 9 */
10 10
(...skipping 12 matching lines...) Expand all
23 23
24 namespace rtc { 24 namespace rtc {
25 using internal::GetQueuePtrTls; 25 using internal::GetQueuePtrTls;
26 using internal::AutoSetCurrentQueuePtr; 26 using internal::AutoSetCurrentQueuePtr;
27 27
28 namespace { 28 namespace {
29 static const char kQuit = 1; 29 static const char kQuit = 1;
30 static const char kRunTask = 2; 30 static const char kRunTask = 2;
31 static const char kRunReplyTask = 3; 31 static const char kRunReplyTask = 3;
32 32
33 using Priority = TaskQueue::Priority;
34
33 // This ignores the SIGPIPE signal on the calling thread. 35 // This ignores the SIGPIPE signal on the calling thread.
34 // This signal can be fired when trying to write() to a pipe that's being 36 // This signal can be fired when trying to write() to a pipe that's being
35 // closed or while closing a pipe that's being written to. 37 // closed or while closing a pipe that's being written to.
36 // We can run into that situation (e.g. reply tasks that don't get a chance to 38 // We can run into that situation (e.g. reply tasks that don't get a chance to
37 // run because the task queue is being deleted) so we ignore this signal and 39 // run because the task queue is being deleted) so we ignore this signal and
38 // continue as normal. 40 // continue as normal.
39 // As a side note for this implementation, it would be great if we could safely 41 // As a side note for this implementation, it would be great if we could safely
40 // restore the sigmask, but unfortunately the operation of restoring it, can 42 // restore the sigmask, but unfortunately the operation of restoring it, can
41 // itself actually cause SIGPIPE to be signaled :-| (e.g. on MacOS) 43 // itself actually cause SIGPIPE to be signaled :-| (e.g. on MacOS)
42 // The SIGPIPE signal by default causes the process to be terminated, so we 44 // The SIGPIPE signal by default causes the process to be terminated, so we
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after
77 short events, 79 short events,
78 void (*callback)(int, short, void*), 80 void (*callback)(int, short, void*),
79 void* arg) { 81 void* arg) {
80 #if defined(_EVENT2_EVENT_H_) 82 #if defined(_EVENT2_EVENT_H_)
81 RTC_CHECK_EQ(0, event_assign(ev, base, fd, events, callback, arg)); 83 RTC_CHECK_EQ(0, event_assign(ev, base, fd, events, callback, arg));
82 #else 84 #else
83 event_set(ev, fd, events, callback, arg); 85 event_set(ev, fd, events, callback, arg);
84 RTC_CHECK_EQ(0, event_base_set(base, ev)); 86 RTC_CHECK_EQ(0, event_base_set(base, ev));
85 #endif 87 #endif
86 } 88 }
89
90 ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) {
91 switch (priority) {
92 case Priority::HIGH:
93 return kRealtimePriority;
94 case Priority::LOW:
95 return kLowPriority;
96 case Priority::NORMAL:
97 return kNormalPriority;
98 default:
99 RTC_NOTREACHED();
100 break;
101 }
102 return kNormalPriority;
103 }
87 } // namespace 104 } // namespace
88 105
89 struct TaskQueue::QueueContext { 106 struct TaskQueue::QueueContext {
90 explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {} 107 explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {}
91 TaskQueue* queue; 108 TaskQueue* queue;
92 bool is_active; 109 bool is_active;
93 // Holds a list of events pending timers for cleanup when the loop exits. 110 // Holds a list of events pending timers for cleanup when the loop exits.
94 std::list<TimerEvent*> pending_timers_; 111 std::list<TimerEvent*> pending_timers_;
95 }; 112 };
96 113
(...skipping 97 matching lines...) Expand 10 before | Expand all | Expand 10 after
194 std::move(task_), 211 std::move(task_),
195 post_time > milliseconds_ ? 0 : milliseconds_ - post_time); 212 post_time > milliseconds_ ? 0 : milliseconds_ - post_time);
196 return true; 213 return true;
197 } 214 }
198 215
199 std::unique_ptr<QueuedTask> task_; 216 std::unique_ptr<QueuedTask> task_;
200 const uint32_t milliseconds_; 217 const uint32_t milliseconds_;
201 const uint32_t posted_; 218 const uint32_t posted_;
202 }; 219 };
203 220
204 TaskQueue::TaskQueue(const char* queue_name) 221 TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/)
205 : event_base_(event_base_new()), 222 : event_base_(event_base_new()),
206 wakeup_event_(new event()), 223 wakeup_event_(new event()),
207 thread_(&TaskQueue::ThreadMain, this, queue_name) { 224 thread_(&TaskQueue::ThreadMain,
225 this,
226 queue_name,
227 TaskQueuePriorityToThreadPriority(priority)) {
208 RTC_DCHECK(queue_name); 228 RTC_DCHECK(queue_name);
209 int fds[2]; 229 int fds[2];
210 RTC_CHECK(pipe(fds) == 0); 230 RTC_CHECK(pipe(fds) == 0);
211 SetNonBlocking(fds[0]); 231 SetNonBlocking(fds[0]);
212 SetNonBlocking(fds[1]); 232 SetNonBlocking(fds[1]);
213 wakeup_pipe_out_ = fds[0]; 233 wakeup_pipe_out_ = fds[0];
214 wakeup_pipe_in_ = fds[1]; 234 wakeup_pipe_in_ = fds[1];
215 235
216 EventAssign(wakeup_event_.get(), event_base_, wakeup_pipe_out_, 236 EventAssign(wakeup_event_.get(), event_base_, wakeup_pipe_out_,
217 EV_READ | EV_PERSIST, OnWakeup, this); 237 EV_READ | EV_PERSIST, OnWakeup, this);
(...skipping 182 matching lines...) Expand 10 before | Expand all | Expand 10 after
400 delete timer; 420 delete timer;
401 } 421 }
402 422
403 void TaskQueue::PrepareReplyTask(scoped_refptr<ReplyTaskOwnerRef> reply_task) { 423 void TaskQueue::PrepareReplyTask(scoped_refptr<ReplyTaskOwnerRef> reply_task) {
404 RTC_DCHECK(reply_task); 424 RTC_DCHECK(reply_task);
405 CritScope lock(&pending_lock_); 425 CritScope lock(&pending_lock_);
406 pending_replies_.push_back(std::move(reply_task)); 426 pending_replies_.push_back(std::move(reply_task));
407 } 427 }
408 428
409 } // namespace rtc 429 } // namespace rtc
OLDNEW
« no previous file with comments | « webrtc/base/task_queue_gcd.cc ('k') | webrtc/base/task_queue_win.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698