Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(3)

Side by Side Diff: webrtc/rtc_base/asyncinvoker.cc

Issue 2885143005: Fixing race between ~AsyncInvoker and ~AsyncClosure, using ref-counting. (Closed)
Patch Set: Only clear current thread's message queue in destructor loop Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 /* 1 /*
2 * Copyright 2014 The WebRTC Project Authors. All rights reserved. 2 * Copyright 2014 The WebRTC Project Authors. All rights reserved.
3 * 3 *
4 * Use of this source code is governed by a BSD-style license 4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source 5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found 6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may 7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree. 8 * be found in the AUTHORS file in the root of the source tree.
9 */ 9 */
10 10
11 #include "webrtc/rtc_base/asyncinvoker.h" 11 #include "webrtc/rtc_base/asyncinvoker.h"
12 12
13 #include "webrtc/rtc_base/atomicops.h" 13 #include "webrtc/rtc_base/atomicops.h"
14 #include "webrtc/rtc_base/checks.h" 14 #include "webrtc/rtc_base/checks.h"
15 #include "webrtc/rtc_base/logging.h" 15 #include "webrtc/rtc_base/logging.h"
16 16
17 namespace rtc { 17 namespace rtc {
18 18
19 AsyncInvoker::AsyncInvoker() : invocation_complete_(false, false) {} 19 AsyncInvoker::AsyncInvoker()
20 : invocation_complete_(new RefCountedObject<Event>(false, false)) {}
20 21
21 AsyncInvoker::~AsyncInvoker() { 22 AsyncInvoker::~AsyncInvoker() {
22 destroying_ = true; 23 AtomicOps::Increment(&destroying_);
23 // Messages for this need to be cleared *before* our destructor is complete. 24 // Messages for this need to be cleared *before* our destructor is complete.
24 MessageQueueManager::Clear(this); 25 MessageQueueManager::Clear(this);
25 // And we need to wait for any invocations that are still in progress on 26 // And we need to wait for any invocations that are still in progress on
26 // other threads. 27 // other threads.
27 while (AtomicOps::AcquireLoad(&pending_invocations_)) { 28 while (AtomicOps::AcquireLoad(&pending_invocations_)) {
28 // If the destructor was called while AsyncInvoke was being called by 29 // If the destructor was called while AsyncInvoke was being called by
29 // another thread, WITHIN an AsyncInvoked functor, it may do another 30 // another thread, WITHIN an AsyncInvoked functor, it may do another
30 // Thread::Post even after we called MessageQueueManager::Clear(this). So 31 // Thread::Post even after we called MessageQueueManager::Clear(this). So
31 // we need to keep calling Clear to discard these posts. 32 // we need to keep calling Clear to discard these posts.
32 MessageQueueManager::Clear(this); 33 Thread::Current()->Clear(this);
kwiberg-webrtc 2017/08/04 09:28:09 What does the change on this line do?
Taylor Brandstetter 2017/08/04 19:14:22 It only clears events on the current thread, which
33 invocation_complete_.Wait(Event::kForever); 34 invocation_complete_->Wait(Event::kForever);
34 } 35 }
35 } 36 }
36 37
37 void AsyncInvoker::OnMessage(Message* msg) { 38 void AsyncInvoker::OnMessage(Message* msg) {
38 // Get the AsyncClosure shared ptr from this message's data. 39 // Get the AsyncClosure shared ptr from this message's data.
39 ScopedMessageData<AsyncClosure>* data = 40 ScopedMessageData<AsyncClosure>* data =
40 static_cast<ScopedMessageData<AsyncClosure>*>(msg->pdata); 41 static_cast<ScopedMessageData<AsyncClosure>*>(msg->pdata);
41 // Execute the closure and trigger the return message if needed. 42 // Execute the closure and trigger the return message if needed.
42 data->inner_data().Execute(); 43 data->inner_data().Execute();
43 delete data; 44 delete data;
44 } 45 }
45 46
46 void AsyncInvoker::Flush(Thread* thread, uint32_t id /*= MQID_ANY*/) { 47 void AsyncInvoker::Flush(Thread* thread, uint32_t id /*= MQID_ANY*/) {
47 if (destroying_) return; 48 if (AtomicOps::AcquireLoad(&destroying_))
49 return;
kwiberg-webrtc 2017/08/04 09:28:09 A comment explaining why this is a no-op if we're
Taylor Brandstetter 2017/08/04 19:14:22 I don't know exactly. I went back to the CL where
48 50
49 // Run this on |thread| to reduce the number of context switches. 51 // Run this on |thread| to reduce the number of context switches.
50 if (Thread::Current() != thread) { 52 if (Thread::Current() != thread) {
51 thread->Invoke<void>(RTC_FROM_HERE, 53 thread->Invoke<void>(RTC_FROM_HERE,
52 Bind(&AsyncInvoker::Flush, this, thread, id)); 54 Bind(&AsyncInvoker::Flush, this, thread, id));
53 return; 55 return;
54 } 56 }
55 57
56 MessageList removed; 58 MessageList removed;
57 thread->Clear(this, id, &removed); 59 thread->Clear(this, id, &removed);
58 for (MessageList::iterator it = removed.begin(); it != removed.end(); ++it) { 60 for (MessageList::iterator it = removed.begin(); it != removed.end(); ++it) {
59 // This message was pending on this thread, so run it now. 61 // This message was pending on this thread, so run it now.
60 thread->Send(it->posted_from, it->phandler, it->message_id, it->pdata); 62 thread->Send(it->posted_from, it->phandler, it->message_id, it->pdata);
61 } 63 }
62 } 64 }
63 65
64 void AsyncInvoker::DoInvoke(const Location& posted_from, 66 void AsyncInvoker::DoInvoke(const Location& posted_from,
65 Thread* thread, 67 Thread* thread,
66 std::unique_ptr<AsyncClosure> closure, 68 std::unique_ptr<AsyncClosure> closure,
67 uint32_t id) { 69 uint32_t id) {
68 if (destroying_) { 70 if (AtomicOps::AcquireLoad(&destroying_)) {
71 // Note that this may be expected, if the application is AsyncInvoking
72 // tasks that AsyncInvoke other tasks. But otherwise it indicates a race
73 // between a thread destroying the AsyncInvoker and a thread still trying
74 // to use it.
kwiberg-webrtc 2017/08/04 09:28:09 Doesn't that mean that the warning message is flak
Taylor Brandstetter 2017/08/04 19:14:22 My thinking was "I'd rather have a flaky warning t
kwiberg-webrtc 2017/08/06 04:17:12 Hmm, OK.
69 LOG(LS_WARNING) << "Tried to invoke while destroying the invoker."; 75 LOG(LS_WARNING) << "Tried to invoke while destroying the invoker.";
70 return; 76 return;
71 } 77 }
72 AtomicOps::Increment(&pending_invocations_);
73 thread->Post(posted_from, this, id, 78 thread->Post(posted_from, this, id,
74 new ScopedMessageData<AsyncClosure>(std::move(closure))); 79 new ScopedMessageData<AsyncClosure>(std::move(closure)));
75 } 80 }
76 81
77 void AsyncInvoker::DoInvokeDelayed(const Location& posted_from, 82 void AsyncInvoker::DoInvokeDelayed(const Location& posted_from,
78 Thread* thread, 83 Thread* thread,
79 std::unique_ptr<AsyncClosure> closure, 84 std::unique_ptr<AsyncClosure> closure,
80 uint32_t delay_ms, 85 uint32_t delay_ms,
81 uint32_t id) { 86 uint32_t id) {
82 if (destroying_) { 87 if (AtomicOps::AcquireLoad(&destroying_)) {
88 // See above comment.
83 LOG(LS_WARNING) << "Tried to invoke while destroying the invoker."; 89 LOG(LS_WARNING) << "Tried to invoke while destroying the invoker.";
84 return; 90 return;
85 } 91 }
86 AtomicOps::Increment(&pending_invocations_);
87 thread->PostDelayed(posted_from, delay_ms, this, id, 92 thread->PostDelayed(posted_from, delay_ms, this, id,
88 new ScopedMessageData<AsyncClosure>(std::move(closure))); 93 new ScopedMessageData<AsyncClosure>(std::move(closure)));
89 } 94 }
90 95
91 GuardedAsyncInvoker::GuardedAsyncInvoker() : thread_(Thread::Current()) { 96 GuardedAsyncInvoker::GuardedAsyncInvoker() : thread_(Thread::Current()) {
92 thread_->SignalQueueDestroyed.connect(this, 97 thread_->SignalQueueDestroyed.connect(this,
93 &GuardedAsyncInvoker::ThreadDestroyed); 98 &GuardedAsyncInvoker::ThreadDestroyed);
94 } 99 }
95 100
96 GuardedAsyncInvoker::~GuardedAsyncInvoker() { 101 GuardedAsyncInvoker::~GuardedAsyncInvoker() {
97 } 102 }
98 103
99 bool GuardedAsyncInvoker::Flush(uint32_t id) { 104 bool GuardedAsyncInvoker::Flush(uint32_t id) {
100 rtc::CritScope cs(&crit_); 105 CritScope cs(&crit_);
101 if (thread_ == nullptr) 106 if (thread_ == nullptr)
102 return false; 107 return false;
103 invoker_.Flush(thread_, id); 108 invoker_.Flush(thread_, id);
104 return true; 109 return true;
105 } 110 }
106 111
107 void GuardedAsyncInvoker::ThreadDestroyed() { 112 void GuardedAsyncInvoker::ThreadDestroyed() {
108 rtc::CritScope cs(&crit_); 113 CritScope cs(&crit_);
109 // We should never get more than one notification about the thread dying. 114 // We should never get more than one notification about the thread dying.
110 RTC_DCHECK(thread_ != nullptr); 115 RTC_DCHECK(thread_ != nullptr);
111 thread_ = nullptr; 116 thread_ = nullptr;
112 } 117 }
113 118
119 AsyncClosure::AsyncClosure(AsyncInvoker* invoker)
120 : invoker_(invoker), invocation_complete_(invoker_->invocation_complete_) {
121 AtomicOps::Increment(&invoker_->pending_invocations_);
122 }
123
114 AsyncClosure::~AsyncClosure() { 124 AsyncClosure::~AsyncClosure() {
115 AtomicOps::Decrement(&invoker_->pending_invocations_); 125 AtomicOps::Decrement(&invoker_->pending_invocations_);
116 invoker_->invocation_complete_.Set(); 126 // After |pending_invocations_| is decremented, we may need to signal
127 // |invocation_complete_| in case the AsyncInvoker is being destroyed and
128 // waiting for pending tasks to complete.
129 //
130 // It's also possible that the destructor finishes before "Set()" is called,
131 // which is safe because the event is reference counted (and in a thread-safe
132 // way).
133 invocation_complete_->Set();
117 } 134 }
118 135
119 } // namespace rtc 136 } // namespace rtc
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698