Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(194)

Side by Side Diff: webrtc/base/asyncinvoker.cc

Issue 2876273002: Fixing race between ~AsyncInvoker and ~AsyncClosure. (Closed)
Patch Set: Created 3 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « webrtc/base/asyncinvoker.h ('k') | webrtc/base/asyncinvoker-inl.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 /* 1 /*
2 * Copyright 2014 The WebRTC Project Authors. All rights reserved. 2 * Copyright 2014 The WebRTC Project Authors. All rights reserved.
3 * 3 *
4 * Use of this source code is governed by a BSD-style license 4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source 5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found 6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may 7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree. 8 * be found in the AUTHORS file in the root of the source tree.
9 */ 9 */
10 10
11 #include "webrtc/base/asyncinvoker.h" 11 #include "webrtc/base/asyncinvoker.h"
12 12
13 #include "webrtc/base/atomicops.h" 13 #include "webrtc/base/atomicops.h"
14 #include "webrtc/base/checks.h" 14 #include "webrtc/base/checks.h"
15 #include "webrtc/base/logging.h" 15 #include "webrtc/base/logging.h"
16 16
17 namespace rtc { 17 namespace rtc {
18 18
19 AsyncInvoker::AsyncInvoker() : invocation_complete_(false, false) {} 19 AsyncInvoker::AsyncInvoker() {}
20 20
21 AsyncInvoker::~AsyncInvoker() { 21 AsyncInvoker::~AsyncInvoker() {
22 destroying_ = true; 22 destroying_ = true;
23 // Messages for this need to be cleared *before* our destructor is complete. 23 // Messages for this need to be cleared *before* our destructor is complete.
24 MessageQueueManager::Clear(this); 24 MessageQueueManager::Clear(this);
25 // And we need to wait for any invocations that are still in progress on 25 // And we need to wait for any invocations that are still in progress on
26 // other threads. 26 // other threads.
27 while (AtomicOps::AcquireLoad(&pending_invocations_)) { 27 while (AtomicOps::AcquireLoad(&pending_invocations_)) {
28 // If the destructor was called while AsyncInvoke was being called by 28 // Release time slice to allow other async-invoked tasks to make progress.
29 // another thread, WITHIN an AsyncInvoked functor, it may do another 29 rtc::Thread::Current()->SleepMs(0);
nisse-webrtc 2017/05/19 07:16:59 I think it's better with some small but non-zero v
30 // Thread::Post even after we called MessageQueueManager::Clear(this). So 30 // One of the async-invoked tasks in progress could have posted an
31 // we need to keep calling Clear to discard these posts. 31 // additional task to the current thread. So we need to keep calling Clear
32 // to discard these posts.
32 MessageQueueManager::Clear(this); 33 MessageQueueManager::Clear(this);
nisse-webrtc 2017/05/19 07:16:59 According to the comment, we only need to clear th
33 invocation_complete_.Wait(Event::kForever);
34 } 34 }
35 } 35 }
36 36
37 void AsyncInvoker::OnMessage(Message* msg) { 37 void AsyncInvoker::OnMessage(Message* msg) {
38 // Get the AsyncClosure shared ptr from this message's data. 38 // Get the AsyncClosure shared ptr from this message's data.
39 ScopedMessageData<AsyncClosure>* data = 39 ScopedMessageData<AsyncClosure>* data =
40 static_cast<ScopedMessageData<AsyncClosure>*>(msg->pdata); 40 static_cast<ScopedMessageData<AsyncClosure>*>(msg->pdata);
41 // Execute the closure and trigger the return message if needed. 41 // Execute the closure and trigger the return message if needed.
42 data->inner_data().Execute(); 42 data->inner_data().Execute();
43 delete data; 43 delete data;
(...skipping 14 matching lines...) Expand all
58 for (MessageList::iterator it = removed.begin(); it != removed.end(); ++it) { 58 for (MessageList::iterator it = removed.begin(); it != removed.end(); ++it) {
59 // This message was pending on this thread, so run it now. 59 // This message was pending on this thread, so run it now.
60 thread->Send(it->posted_from, it->phandler, it->message_id, it->pdata); 60 thread->Send(it->posted_from, it->phandler, it->message_id, it->pdata);
61 } 61 }
62 } 62 }
63 63
64 void AsyncInvoker::DoInvoke(const Location& posted_from, 64 void AsyncInvoker::DoInvoke(const Location& posted_from,
65 Thread* thread, 65 Thread* thread,
66 std::unique_ptr<AsyncClosure> closure, 66 std::unique_ptr<AsyncClosure> closure,
67 uint32_t id) { 67 uint32_t id) {
68 if (destroying_) { 68 if (destroying_) {
nisse-webrtc 2017/05/19 07:16:59 I'm still a bit confused by this flag. My current
69 LOG(LS_WARNING) << "Tried to invoke while destroying the invoker."; 69 LOG(LS_WARNING) << "Tried to invoke while destroying the invoker.";
70 return; 70 return;
71 } 71 }
72 AtomicOps::Increment(&pending_invocations_);
73 thread->Post(posted_from, this, id, 72 thread->Post(posted_from, this, id,
74 new ScopedMessageData<AsyncClosure>(std::move(closure))); 73 new ScopedMessageData<AsyncClosure>(std::move(closure)));
75 } 74 }
76 75
77 void AsyncInvoker::DoInvokeDelayed(const Location& posted_from, 76 void AsyncInvoker::DoInvokeDelayed(const Location& posted_from,
78 Thread* thread, 77 Thread* thread,
79 std::unique_ptr<AsyncClosure> closure, 78 std::unique_ptr<AsyncClosure> closure,
80 uint32_t delay_ms, 79 uint32_t delay_ms,
81 uint32_t id) { 80 uint32_t id) {
82 if (destroying_) { 81 if (destroying_) {
83 LOG(LS_WARNING) << "Tried to invoke while destroying the invoker."; 82 LOG(LS_WARNING) << "Tried to invoke while destroying the invoker.";
84 return; 83 return;
85 } 84 }
86 AtomicOps::Increment(&pending_invocations_);
87 thread->PostDelayed(posted_from, delay_ms, this, id, 85 thread->PostDelayed(posted_from, delay_ms, this, id,
88 new ScopedMessageData<AsyncClosure>(std::move(closure))); 86 new ScopedMessageData<AsyncClosure>(std::move(closure)));
89 } 87 }
90 88
91 GuardedAsyncInvoker::GuardedAsyncInvoker() : thread_(Thread::Current()) { 89 GuardedAsyncInvoker::GuardedAsyncInvoker() : thread_(Thread::Current()) {
92 thread_->SignalQueueDestroyed.connect(this, 90 thread_->SignalQueueDestroyed.connect(this,
93 &GuardedAsyncInvoker::ThreadDestroyed); 91 &GuardedAsyncInvoker::ThreadDestroyed);
94 } 92 }
95 93
96 GuardedAsyncInvoker::~GuardedAsyncInvoker() { 94 GuardedAsyncInvoker::~GuardedAsyncInvoker() {
97 } 95 }
98 96
99 bool GuardedAsyncInvoker::Flush(uint32_t id) { 97 bool GuardedAsyncInvoker::Flush(uint32_t id) {
100 rtc::CritScope cs(&crit_); 98 rtc::CritScope cs(&crit_);
101 if (thread_ == nullptr) 99 if (thread_ == nullptr)
102 return false; 100 return false;
103 invoker_.Flush(thread_, id); 101 invoker_.Flush(thread_, id);
104 return true; 102 return true;
105 } 103 }
106 104
107 void GuardedAsyncInvoker::ThreadDestroyed() { 105 void GuardedAsyncInvoker::ThreadDestroyed() {
108 rtc::CritScope cs(&crit_); 106 rtc::CritScope cs(&crit_);
109 // We should never get more than one notification about the thread dying. 107 // We should never get more than one notification about the thread dying.
110 RTC_DCHECK(thread_ != nullptr); 108 RTC_DCHECK(thread_ != nullptr);
111 thread_ = nullptr; 109 thread_ = nullptr;
112 } 110 }
113 111
112 AsyncClosure::AsyncClosure(AsyncInvoker* invoker) : invoker_(invoker) {
113 AtomicOps::Increment(&invoker_->pending_invocations_);
114 }
115
114 AsyncClosure::~AsyncClosure() { 116 AsyncClosure::~AsyncClosure() {
115 AtomicOps::Decrement(&invoker_->pending_invocations_); 117 AtomicOps::Decrement(&invoker_->pending_invocations_);
116 invoker_->invocation_complete_.Set();
117 } 118 }
118 119
119 NotifyingAsyncClosureBase::NotifyingAsyncClosureBase( 120 NotifyingAsyncClosureBase::NotifyingAsyncClosureBase(
120 AsyncInvoker* invoker, 121 AsyncInvoker* invoker,
121 const Location& callback_posted_from, 122 const Location& callback_posted_from,
122 Thread* calling_thread) 123 Thread* calling_thread)
123 : AsyncClosure(invoker), 124 : AsyncClosure(invoker),
124 callback_posted_from_(callback_posted_from), 125 callback_posted_from_(callback_posted_from),
125 calling_thread_(calling_thread) { 126 calling_thread_(calling_thread) {
126 calling_thread->SignalQueueDestroyed.connect( 127 calling_thread->SignalQueueDestroyed.connect(
(...skipping 18 matching lines...) Expand all
145 void NotifyingAsyncClosureBase::CancelCallback() { 146 void NotifyingAsyncClosureBase::CancelCallback() {
146 // If the callback is triggering when this is called, block the 147 // If the callback is triggering when this is called, block the
147 // destructor of the dying object here by waiting until the callback 148 // destructor of the dying object here by waiting until the callback
148 // is done triggering. 149 // is done triggering.
149 CritScope cs(&crit_); 150 CritScope cs(&crit_);
150 // calling_thread_ == nullptr means do not trigger the callback. 151 // calling_thread_ == nullptr means do not trigger the callback.
151 calling_thread_ = nullptr; 152 calling_thread_ = nullptr;
152 } 153 }
153 154
154 } // namespace rtc 155 } // namespace rtc
OLDNEW
« no previous file with comments | « webrtc/base/asyncinvoker.h ('k') | webrtc/base/asyncinvoker-inl.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698