Index: webrtc/rtc_base/asyncinvoker.cc
diff --git a/webrtc/rtc_base/asyncinvoker.cc b/webrtc/rtc_base/asyncinvoker.cc
index 94abfd5d087a59c66cb121270107b9be973282cd..89e4e778ea833b2a7986e656f340c40fee300966 100644
--- a/webrtc/rtc_base/asyncinvoker.cc
+++ b/webrtc/rtc_base/asyncinvoker.cc
@@ -10,27 +10,30 @@
 
 #include "webrtc/rtc_base/asyncinvoker.h"
 
-#include "webrtc/rtc_base/atomicops.h"
 #include "webrtc/rtc_base/checks.h"
 #include "webrtc/rtc_base/logging.h"
 
 namespace rtc {
 
-AsyncInvoker::AsyncInvoker() : invocation_complete_(false, false) {}
+AsyncInvoker::AsyncInvoker()
+    : pending_invocations_(0),
+      invocation_complete_(new RefCountedObject<Event>(false, false)),
+      destroying_(false) {}
 
 AsyncInvoker::~AsyncInvoker() {
-  destroying_ = true;
+  destroying_.store(true, std::memory_order_relaxed);
   // Messages for this need to be cleared *before* our destructor is complete.
   MessageQueueManager::Clear(this);
   // And we need to wait for any invocations that are still in progress on
-  // other threads.
-  while (AtomicOps::AcquireLoad(&pending_invocations_)) {
+  // other threads. Using memory_order_acquire for synchronization with
+  // AsyncClosure destructors.
+  while (pending_invocations_.load(std::memory_order_acquire) > 0) {
     // If the destructor was called while AsyncInvoke was being called by
     // another thread, WITHIN an AsyncInvoked functor, it may do another
     // Thread::Post even after we called MessageQueueManager::Clear(this). So
     // we need to keep calling Clear to discard these posts.
-    MessageQueueManager::Clear(this);
-    invocation_complete_.Wait(Event::kForever);
+    Thread::Current()->Clear(this);
+    invocation_complete_->Wait(Event::kForever);
   }
 }
 
@@ -44,7 +47,10 @@ void AsyncInvoker::OnMessage(Message* msg) {
 }
 
 void AsyncInvoker::Flush(Thread* thread, uint32_t id /*= MQID_ANY*/) {
-  if (destroying_) return;
+  // If the destructor is waiting for invocations to finish, don't start
+  // running even more tasks.
+  if (destroying_.load(std::memory_order_relaxed))
+    return;
 
   // Run this on |thread| to reduce the number of context switches.
   if (Thread::Current() != thread) {
@@ -65,11 +71,14 @@ void AsyncInvoker::DoInvoke(const Location& posted_from,
                             Thread* thread,
                             std::unique_ptr<AsyncClosure> closure,
                             uint32_t id) {
-  if (destroying_) {
+  if (destroying_.load(std::memory_order_relaxed)) {
+    // Note that this may be expected, if the application is AsyncInvoking
+    // tasks that AsyncInvoke other tasks. But otherwise it indicates a race
+    // between a thread destroying the AsyncInvoker and a thread still trying
+    // to use it.
     LOG(LS_WARNING) << "Tried to invoke while destroying the invoker.";
     return;
   }
-  AtomicOps::Increment(&pending_invocations_);
   thread->Post(posted_from, this, id,
                new ScopedMessageData<AsyncClosure>(std::move(closure)));
 }
@@ -79,11 +88,11 @@ void AsyncInvoker::DoInvokeDelayed(const Location& posted_from,
                                    std::unique_ptr<AsyncClosure> closure,
                                    uint32_t delay_ms,
                                    uint32_t id) {
-  if (destroying_) {
+  if (destroying_.load(std::memory_order_relaxed)) {
+    // See above comment.
     LOG(LS_WARNING) << "Tried to invoke while destroying the invoker.";
     return;
   }
-  AtomicOps::Increment(&pending_invocations_);
   thread->PostDelayed(posted_from, delay_ms, this, id,
                       new ScopedMessageData<AsyncClosure>(std::move(closure)));
 }
@@ -97,7 +106,7 @@ GuardedAsyncInvoker::~GuardedAsyncInvoker() {
 }
 
 bool GuardedAsyncInvoker::Flush(uint32_t id) {
-  rtc::CritScope cs(&crit_);
+  CritScope cs(&crit_);
   if (thread_ == nullptr)
     return false;
   invoker_.Flush(thread_, id);
@@ -105,15 +114,30 @@ bool GuardedAsyncInvoker::Flush(uint32_t id) {
 }
 
 void GuardedAsyncInvoker::ThreadDestroyed() {
-  rtc::CritScope cs(&crit_);
+  CritScope cs(&crit_);
   // We should never get more than one notification about the thread dying.
   RTC_DCHECK(thread_ != nullptr);
   thread_ = nullptr;
 }
 
+AsyncClosure::AsyncClosure(AsyncInvoker* invoker)
+    : invoker_(invoker), invocation_complete_(invoker_->invocation_complete_) {
+  invoker_->pending_invocations_.fetch_add(1, std::memory_order_relaxed);
+}
+
 AsyncClosure::~AsyncClosure() {
-  AtomicOps::Decrement(&invoker_->pending_invocations_);
-  invoker_->invocation_complete_.Set();
+  // Using memory_order_release for synchronization with the AsyncInvoker
+  // destructor.
+  invoker_->pending_invocations_.fetch_sub(1, std::memory_order_release);
+
+  // After |pending_invocations_| is decremented, we may need to signal
+  // |invocation_complete_| in case the AsyncInvoker is being destroyed and
+  // waiting for pending tasks to complete.
+  //
+  // It's also possible that the destructor finishes before "Set()" is called,
+  // which is safe because the event is reference counted (and in a thread-safe
+  // way).
+  invocation_complete_->Set();
 }
 
 }  // namespace rtc
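
Standalone sketch (not part of the patch) of the shutdown handshake the change introduces: a relaxed destroying flag, a pending-task counter whose release decrement pairs with the acquire load in the destructor's wait loop, and a reference-counted completion event so a late Set() cannot touch a destroyed invoker. The SimpleEvent and Invoker classes, the TaskStarted/TaskDone helpers, and the use of std::shared_ptr in place of rtc::RefCountedObject<Event> are illustrative assumptions, not WebRTC API.

// sketch.cc: minimal model of the AsyncInvoker/AsyncClosure shutdown pattern.
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>

// Auto-reset event, loosely mirroring rtc::Event(false, false).
class SimpleEvent {
 public:
  void Set() {
    std::lock_guard<std::mutex> lock(mutex_);
    signaled_ = true;
    cv_.notify_all();
  }
  void Wait() {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return signaled_; });
    signaled_ = false;
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  bool signaled_ = false;
};

class Invoker {
 public:
  ~Invoker() {
    destroying_.store(true, std::memory_order_relaxed);
    // The acquire load pairs with the release fetch_sub in TaskDone(), so
    // everything a task did happens-before the destructor returns.
    while (pending_.load(std::memory_order_acquire) > 0)
      complete_->Wait();
  }

  // Mirrors the AsyncClosure constructor: count the task before posting it.
  bool TaskStarted() {
    if (destroying_.load(std::memory_order_relaxed))
      return false;  // Analogous to the "invoke while destroying" warning.
    pending_.fetch_add(1, std::memory_order_relaxed);
    return true;
  }

  // Mirrors ~AsyncClosure(): runs on the worker thread. The event is passed
  // as a shared_ptr copy, so Set() is safe even if ~Invoker() has already
  // returned by the time it executes.
  static void TaskDone(Invoker* invoker, std::shared_ptr<SimpleEvent> done) {
    invoker->pending_.fetch_sub(1, std::memory_order_release);
    done->Set();  // |invoker| must not be touched after the decrement.
  }

  std::shared_ptr<SimpleEvent> completion_event() const { return complete_; }

 private:
  std::atomic<int> pending_{0};
  std::shared_ptr<SimpleEvent> complete_ = std::make_shared<SimpleEvent>();
  std::atomic<bool> destroying_{false};
};

int main() {
  auto* invoker = new Invoker();
  std::shared_ptr<SimpleEvent> done = invoker->completion_event();
  std::thread worker;
  if (invoker->TaskStarted()) {
    worker = std::thread([invoker, done] {
      std::this_thread::sleep_for(std::chrono::milliseconds(50));  // The "task".
      Invoker::TaskDone(invoker, done);
    });
  }
  delete invoker;  // Blocks until the worker has run TaskDone().
  if (worker.joinable())
    worker.join();
}

Passing the event by reference count is what lets the closure destructor outlive the invoker destructor: the release decrement is the last access to the invoker itself, while the final Set() only touches the shared event.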