| Index: webrtc/rtc_base/asyncinvoker.cc
|
| diff --git a/webrtc/rtc_base/asyncinvoker.cc b/webrtc/rtc_base/asyncinvoker.cc
|
| index 94abfd5d087a59c66cb121270107b9be973282cd..89e4e778ea833b2a7986e656f340c40fee300966 100644
|
| --- a/webrtc/rtc_base/asyncinvoker.cc
|
| +++ b/webrtc/rtc_base/asyncinvoker.cc
|
| @@ -10,27 +10,30 @@
|
|
|
| #include "webrtc/rtc_base/asyncinvoker.h"
|
|
|
| -#include "webrtc/rtc_base/atomicops.h"
|
| #include "webrtc/rtc_base/checks.h"
|
| #include "webrtc/rtc_base/logging.h"
|
|
|
| namespace rtc {
|
|
|
| -AsyncInvoker::AsyncInvoker() : invocation_complete_(false, false) {}
|
| +AsyncInvoker::AsyncInvoker()
|
| + : pending_invocations_(0),
|
| + invocation_complete_(new RefCountedObject<Event>(false, false)),
|
| + destroying_(false) {}
|
|
|
| AsyncInvoker::~AsyncInvoker() {
|
| - destroying_ = true;
|
| + destroying_.store(true, std::memory_order_relaxed);
|
| // Messages for this need to be cleared *before* our destructor is complete.
|
| MessageQueueManager::Clear(this);
|
| // And we need to wait for any invocations that are still in progress on
|
| - // other threads.
|
| - while (AtomicOps::AcquireLoad(&pending_invocations_)) {
|
| + // other threads. Using memory_order_acquire for synchronization with
|
| + // AsyncClosure destructors.
|
| + while (pending_invocations_.load(std::memory_order_acquire) > 0) {
|
| // If the destructor was called while AsyncInvoke was being called by
|
| // another thread, WITHIN an AsyncInvoked functor, it may do another
|
| // Thread::Post even after we called MessageQueueManager::Clear(this). So
|
| // we need to keep calling Clear to discard these posts.
|
| - MessageQueueManager::Clear(this);
|
| - invocation_complete_.Wait(Event::kForever);
|
| + Thread::Current()->Clear(this);
|
| + invocation_complete_->Wait(Event::kForever);
|
| }
|
| }
|
|
|
| @@ -44,7 +47,10 @@ void AsyncInvoker::OnMessage(Message* msg) {
|
| }
|
|
|
| void AsyncInvoker::Flush(Thread* thread, uint32_t id /*= MQID_ANY*/) {
|
| - if (destroying_) return;
|
| + // If the destructor is waiting for invocations to finish, don't start
|
| + // running even more tasks.
|
| + if (destroying_.load(std::memory_order_relaxed))
|
| + return;
|
|
|
| // Run this on |thread| to reduce the number of context switches.
|
| if (Thread::Current() != thread) {
|
| @@ -65,11 +71,14 @@ void AsyncInvoker::DoInvoke(const Location& posted_from,
|
| Thread* thread,
|
| std::unique_ptr<AsyncClosure> closure,
|
| uint32_t id) {
|
| - if (destroying_) {
|
| + if (destroying_.load(std::memory_order_relaxed)) {
|
| + // Note that this may be expected, if the application is AsyncInvoking
|
| + // tasks that AsyncInvoke other tasks. But otherwise it indicates a race
|
| + // between a thread destroying the AsyncInvoker and a thread still trying
|
| + // to use it.
|
| LOG(LS_WARNING) << "Tried to invoke while destroying the invoker.";
|
| return;
|
| }
|
| - AtomicOps::Increment(&pending_invocations_);
|
| thread->Post(posted_from, this, id,
|
| new ScopedMessageData<AsyncClosure>(std::move(closure)));
|
| }
|
| @@ -79,11 +88,11 @@ void AsyncInvoker::DoInvokeDelayed(const Location& posted_from,
|
| std::unique_ptr<AsyncClosure> closure,
|
| uint32_t delay_ms,
|
| uint32_t id) {
|
| - if (destroying_) {
|
| + if (destroying_.load(std::memory_order_relaxed)) {
|
| + // See above comment.
|
| LOG(LS_WARNING) << "Tried to invoke while destroying the invoker.";
|
| return;
|
| }
|
| - AtomicOps::Increment(&pending_invocations_);
|
| thread->PostDelayed(posted_from, delay_ms, this, id,
|
| new ScopedMessageData<AsyncClosure>(std::move(closure)));
|
| }
|
| @@ -97,7 +106,7 @@ GuardedAsyncInvoker::~GuardedAsyncInvoker() {
|
| }
|
|
|
| bool GuardedAsyncInvoker::Flush(uint32_t id) {
|
| - rtc::CritScope cs(&crit_);
|
| + CritScope cs(&crit_);
|
| if (thread_ == nullptr)
|
| return false;
|
| invoker_.Flush(thread_, id);
|
| @@ -105,15 +114,30 @@ bool GuardedAsyncInvoker::Flush(uint32_t id) {
|
| }
|
|
|
| void GuardedAsyncInvoker::ThreadDestroyed() {
|
| - rtc::CritScope cs(&crit_);
|
| + CritScope cs(&crit_);
|
| // We should never get more than one notification about the thread dying.
|
| RTC_DCHECK(thread_ != nullptr);
|
| thread_ = nullptr;
|
| }
|
|
|
| +AsyncClosure::AsyncClosure(AsyncInvoker* invoker)
|
| + : invoker_(invoker), invocation_complete_(invoker_->invocation_complete_) {
|
| + invoker_->pending_invocations_.fetch_add(1, std::memory_order_relaxed);
|
| +}
|
| +
|
| AsyncClosure::~AsyncClosure() {
|
| - AtomicOps::Decrement(&invoker_->pending_invocations_);
|
| - invoker_->invocation_complete_.Set();
|
| + // Using memory_order_release for synchronization with the AsyncInvoker
|
| + // destructor.
|
| + invoker_->pending_invocations_.fetch_sub(1, std::memory_order_release);
|
| +
|
| + // After |pending_invocations_| is decremented, we may need to signal
|
| + // |invocation_complete_| in case the AsyncInvoker is being destroyed and
|
| + // waiting for pending tasks to complete.
|
| + //
|
| + // It's also possible that the destructor finishes before "Set()" is called,
|
| + // which is safe because the event is reference counted (and in a thread-safe
|
| + // way).
|
| + invocation_complete_->Set();
|
| }
|
|
|
| } // namespace rtc
|
|
|