Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(630)

Unified Diff: webrtc/base/task_queue.h

Issue 1919733002: New task queueing primitive for async tasks: TaskQueue. (Closed) Base URL: https://chromium.googlesource.com/external/webrtc.git@master
Patch Set: Fix variable destruction order in PostALot Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: webrtc/base/task_queue.h
diff --git a/webrtc/base/task_queue.h b/webrtc/base/task_queue.h
new file mode 100644
index 0000000000000000000000000000000000000000..162088b9cbd7a31ae86bf201fe44ed9a339426de
--- /dev/null
+++ b/webrtc/base/task_queue.h
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2016 The WebRTC Project Authors. All rights reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#ifndef WEBRTC_BASE_TASK_QUEUE_H_
+#define WEBRTC_BASE_TASK_QUEUE_H_
+
+#if defined(WEBRTC_POSIX) && !defined(WEBRTC_MAC)
+#define LIBEVENT_TASK_QUEUE
+#endif
+
+#include <list>
+#include <memory>
+
+#if defined(WEBRTC_MAC)
+#include <dispatch/dispatch.h>
+#endif
+
+#include "webrtc/base/constructormagic.h"
+#include "webrtc/base/criticalsection.h"
+
+#if !defined(WEBRTC_MAC)
+#include "webrtc/base/platform_thread.h"
+#endif
+
+#if defined(LIBEVENT_TASK_QUEUE)
+struct event_base;
+struct event;
+#endif
+
+namespace rtc {
+
+class QueuedTask {
+ public:
+ QueuedTask() {}
+ virtual ~QueuedTask() {}
+
+ // Main routine that will run when the task is executed on the desired queue.
+ // The task should return |true| to indicate that it should be deleted or
+ // |false| to indicate that the queue should consider ownership of the task
+ // having been transferred. Returning |false| can be useful if a task has
+ // re-posted itself to a different queue or is otherwise being re-used.
+ virtual bool Run() = 0;
+
+ private:
+ RTC_DISALLOW_COPY_AND_ASSIGN(QueuedTask);
+};
+
+// Simple implementation of QueuedTask for use with rtc::Bind and lambdas.
+template <class Closure>
+class ClosureTask : public QueuedTask {
+ public:
+ explicit ClosureTask(const Closure& closure) : closure_(closure) {}
+
+ private:
+ bool Run() override {
+ closure_();
+ return true;
+ }
+
+ Closure closure_;
+};
+
+// Implements a task queue that asynchronously executes tasks in a way that
+// guarantees that they're executed in FIFO order and tasks never overlap.
+// Tasks may always execute on the same worker thread and they may not.
+// To DCHECK that tasks are executing on a known task queue, use IsCurrent().
+class TaskQueue {
perkj_webrtc 2016/04/26 14:30:37 please add usage exampled in the description.
tommi 2016/04/28 12:04:01 Done.
+ public:
+ explicit TaskQueue(const char* queue_name);
+ // TODO(tommi): Implement move semantics?
+ ~TaskQueue();
+
+ static TaskQueue* Current();
+
+ // Used for DCHECKing the current queue.
+ static bool IsCurrent(const char* queue_name);
+ bool IsCurrent() const;
+
+ // TODO(tommi): For better debuggability, implement FROM_HERE.
+
+ // Ownership of the task is passed to PostTask.
+ void PostTask(std::unique_ptr<QueuedTask> task);
+ // TODO(tommi): Should we expose this variant publicly
+ // (or only the other one)?
+ void PostTaskAndReply(std::unique_ptr<QueuedTask> task,
perkj_webrtc 2016/04/26 14:30:37 I vote for only the simple one until it is needed.
tommi 2016/04/28 12:04:01 Done.
+ std::unique_ptr<QueuedTask> reply,
+ TaskQueue* reply_queue);
+ void PostTaskAndReply(std::unique_ptr<QueuedTask> task,
+ std::unique_ptr<QueuedTask> reply);
+
+ void PostDelayedTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds);
+
+ template <class Closure>
+ void PostTask(const Closure& closure) {
+ PostTask(std::unique_ptr<QueuedTask>(new ClosureTask<Closure>(closure)));
+ }
+
+ template <class Closure>
+ void PostDelayedTask(const Closure& closure, uint32_t milliseconds) {
+ PostDelayedTask(
+ std::unique_ptr<QueuedTask>(new ClosureTask<Closure>(closure)),
+ milliseconds);
+ }
+
+ template <class Closure1, class Closure2>
+ void PostTaskAndReply(const Closure1& task,
+ const Closure2& reply,
+ TaskQueue* reply_queue) {
+ PostTaskAndReply(
+ std::unique_ptr<QueuedTask>(new ClosureTask<Closure1>(task)),
+ std::unique_ptr<QueuedTask>(new ClosureTask<Closure2>(reply)),
+ reply_queue);
+ }
+
+ template <class Closure1, class Closure2>
+ void PostTaskAndReply(const Closure1& task, const Closure2& reply) {
+ PostTaskAndReply(
+ std::unique_ptr<QueuedTask>(new ClosureTask<Closure1>(task)),
+ std::unique_ptr<QueuedTask>(new ClosureTask<Closure2>(reply)));
+ }
+
+ private:
+#if defined(LIBEVENT_TASK_QUEUE)
+ static bool ThreadMain(void* context);
+ static void OnWakeup(int socket, short flags, void* context); // NOLINT
+ static void RunTask(int fd, short flags, void* context); // NOLINT
+ static void RunTimer(int fd, short flags, void* context); // NOLINT
+
+ class PostAndReplyTask;
+ class SetTimerTask;
+
+ void PrepareReplyTask(PostAndReplyTask* reply_task);
+ void ReplyTaskDone(PostAndReplyTask* reply_task);
+
+ struct QueueContext;
+
+ int wakeup_pipe_in_ = -1;
+ int wakeup_pipe_out_ = -1;
+ event_base* event_base_;
+ std::unique_ptr<event> wakeup_event_;
+ PlatformThread thread_;
+ rtc::CriticalSection pending_lock_;
+ std::list<std::unique_ptr<QueuedTask>> pending_ GUARDED_BY(pending_lock_);
+ std::list<PostAndReplyTask*> pending_replies_ GUARDED_BY(pending_lock_);
+#elif defined(WEBRTC_MAC)
+ struct QueueContext;
+ dispatch_queue_t queue_;
+ QueueContext* const context_;
+#elif defined(WEBRTC_WIN)
+ static bool ThreadMain(void* context);
+
+ class WorkerThread : public PlatformThread {
+ public:
+ WorkerThread(ThreadRunFunction func, void* obj, const char* thread_name)
+ : PlatformThread(func, obj, thread_name) {}
+
+ bool QueueAPC(PAPCFUNC apc_function, ULONG_PTR data) {
+ return PlatformThread::QueueAPC(apc_function, data);
+ }
+ };
+ WorkerThread thread_;
+#else
+#error not supported.
+#endif
+
+ RTC_DISALLOW_COPY_AND_ASSIGN(TaskQueue);
+};
+
+} // namespace rtc
+
+#endif // WEBRTC_BASE_TASK_QUEUE_H_

Powered by Google App Engine
This is Rietveld 408576698