| OLD | NEW | 
|---|---|
| 1 /* | 1 /* | 
| 2  *  Copyright 2014 The WebRTC Project Authors. All rights reserved. | 2  *  Copyright 2014 The WebRTC Project Authors. All rights reserved. | 
| 3  * | 3  * | 
| 4  *  Use of this source code is governed by a BSD-style license | 4  *  Use of this source code is governed by a BSD-style license | 
| 5  *  that can be found in the LICENSE file in the root of the source | 5  *  that can be found in the LICENSE file in the root of the source | 
| 6  *  tree. An additional intellectual property rights grant can be found | 6  *  tree. An additional intellectual property rights grant can be found | 
| 7  *  in the file PATENTS.  All contributing project authors may | 7  *  in the file PATENTS.  All contributing project authors may | 
| 8  *  be found in the AUTHORS file in the root of the source tree. | 8  *  be found in the AUTHORS file in the root of the source tree. | 
| 9  */ | 9  */ | 
| 10 | 10 | 
| 11 #include "webrtc/rtc_base/asyncinvoker.h" | 11 #include "webrtc/rtc_base/asyncinvoker.h" | 
| 12 | 12 | 
| 13 #include "webrtc/rtc_base/atomicops.h" |  | 
| 14 #include "webrtc/rtc_base/checks.h" | 13 #include "webrtc/rtc_base/checks.h" | 
| 15 #include "webrtc/rtc_base/logging.h" | 14 #include "webrtc/rtc_base/logging.h" | 
| 16 | 15 | 
| 17 namespace rtc { | 16 namespace rtc { | 
| 18 | 17 | 
| 19 AsyncInvoker::AsyncInvoker() : invocation_complete_(false, false) {} | 18 AsyncInvoker::AsyncInvoker() | 
|  | 19     : pending_invocations_(0), | 
|  | 20       invocation_complete_(new RefCountedObject<Event>(false, false)), | 
|  | 21       destroying_(false) {} | 
| 20 | 22 | 
| 21 AsyncInvoker::~AsyncInvoker() { | 23 AsyncInvoker::~AsyncInvoker() { | 
| 22   destroying_ = true; | 24   destroying_.store(true, std::memory_order_relaxed); | 
| 23   // Messages for this need to be cleared *before* our destructor is complete. | 25   // Messages for this need to be cleared *before* our destructor is complete. | 
| 24   MessageQueueManager::Clear(this); | 26   MessageQueueManager::Clear(this); | 
| 25   // And we need to wait for any invocations that are still in progress on | 27   // And we need to wait for any invocations that are still in progress on | 
| 26   // other threads. | 28   // other threads. Using memory_order_acquire for synchronization with | 
| 27   while (AtomicOps::AcquireLoad(&pending_invocations_)) { | 29   // AsyncClosure destructors. | 
|  | 30   while (pending_invocations_.load(std::memory_order_acquire) > 0) { | 
| 28     // If the destructor was called while AsyncInvoke was being called by | 31     // If the destructor was called while AsyncInvoke was being called by | 
| 29     // another thread, WITHIN an AsyncInvoked functor, it may do another | 32     // another thread, WITHIN an AsyncInvoked functor, it may do another | 
| 30     // Thread::Post even after we called MessageQueueManager::Clear(this). So | 33     // Thread::Post even after we called MessageQueueManager::Clear(this). So | 
| 31     // we need to keep calling Clear to discard these posts. | 34     // we need to keep calling Clear to discard these posts. | 
| 32     MessageQueueManager::Clear(this); | 35     Thread::Current()->Clear(this); | 
| 33     invocation_complete_.Wait(Event::kForever); | 36     invocation_complete_->Wait(Event::kForever); | 
| 34   } | 37   } | 
| 35 } | 38 } | 
| 36 | 39 | 
| 37 void AsyncInvoker::OnMessage(Message* msg) { | 40 void AsyncInvoker::OnMessage(Message* msg) { | 
| 38   // Get the AsyncClosure shared ptr from this message's data. | 41   // Get the AsyncClosure shared ptr from this message's data. | 
| 39   ScopedMessageData<AsyncClosure>* data = | 42   ScopedMessageData<AsyncClosure>* data = | 
| 40       static_cast<ScopedMessageData<AsyncClosure>*>(msg->pdata); | 43       static_cast<ScopedMessageData<AsyncClosure>*>(msg->pdata); | 
| 41   // Execute the closure and trigger the return message if needed. | 44   // Execute the closure and trigger the return message if needed. | 
| 42   data->inner_data().Execute(); | 45   data->inner_data().Execute(); | 
| 43   delete data; | 46   delete data; | 
| 44 } | 47 } | 
| 45 | 48 | 
| 46 void AsyncInvoker::Flush(Thread* thread, uint32_t id /*= MQID_ANY*/) { | 49 void AsyncInvoker::Flush(Thread* thread, uint32_t id /*= MQID_ANY*/) { | 
| 47   if (destroying_) return; | 50   // If the destructor is waiting for invocations to finish, don't start | 
|  | 51   // running even more tasks. | 
|  | 52   if (destroying_.load(std::memory_order_relaxed)) | 
|  | 53     return; | 
| 48 | 54 | 
| 49   // Run this on |thread| to reduce the number of context switches. | 55   // Run this on |thread| to reduce the number of context switches. | 
| 50   if (Thread::Current() != thread) { | 56   if (Thread::Current() != thread) { | 
| 51     thread->Invoke<void>(RTC_FROM_HERE, | 57     thread->Invoke<void>(RTC_FROM_HERE, | 
| 52                          Bind(&AsyncInvoker::Flush, this, thread, id)); | 58                          Bind(&AsyncInvoker::Flush, this, thread, id)); | 
| 53     return; | 59     return; | 
| 54   } | 60   } | 
| 55 | 61 | 
| 56   MessageList removed; | 62   MessageList removed; | 
| 57   thread->Clear(this, id, &removed); | 63   thread->Clear(this, id, &removed); | 
| 58   for (MessageList::iterator it = removed.begin(); it != removed.end(); ++it) { | 64   for (MessageList::iterator it = removed.begin(); it != removed.end(); ++it) { | 
| 59     // This message was pending on this thread, so run it now. | 65     // This message was pending on this thread, so run it now. | 
| 60     thread->Send(it->posted_from, it->phandler, it->message_id, it->pdata); | 66     thread->Send(it->posted_from, it->phandler, it->message_id, it->pdata); | 
| 61   } | 67   } | 
| 62 } | 68 } | 
| 63 | 69 | 
| 64 void AsyncInvoker::DoInvoke(const Location& posted_from, | 70 void AsyncInvoker::DoInvoke(const Location& posted_from, | 
| 65                             Thread* thread, | 71                             Thread* thread, | 
| 66                             std::unique_ptr<AsyncClosure> closure, | 72                             std::unique_ptr<AsyncClosure> closure, | 
| 67                             uint32_t id) { | 73                             uint32_t id) { | 
| 68   if (destroying_) { | 74   if (destroying_.load(std::memory_order_relaxed)) { | 
|  | 75     // Note that this may be expected, if the application is AsyncInvoking | 
|  | 76     // tasks that AsyncInvoke other tasks. But otherwise it indicates a race | 
|  | 77     // between a thread destroying the AsyncInvoker and a thread still trying | 
|  | 78     // to use it. | 
| 69     LOG(LS_WARNING) << "Tried to invoke while destroying the invoker."; | 79     LOG(LS_WARNING) << "Tried to invoke while destroying the invoker."; | 
| 70     return; | 80     return; | 
| 71   } | 81   } | 
| 72   AtomicOps::Increment(&pending_invocations_); |  | 
| 73   thread->Post(posted_from, this, id, | 82   thread->Post(posted_from, this, id, | 
| 74                new ScopedMessageData<AsyncClosure>(std::move(closure))); | 83                new ScopedMessageData<AsyncClosure>(std::move(closure))); | 
| 75 } | 84 } | 
| 76 | 85 | 
| 77 void AsyncInvoker::DoInvokeDelayed(const Location& posted_from, | 86 void AsyncInvoker::DoInvokeDelayed(const Location& posted_from, | 
| 78                                    Thread* thread, | 87                                    Thread* thread, | 
| 79                                    std::unique_ptr<AsyncClosure> closure, | 88                                    std::unique_ptr<AsyncClosure> closure, | 
| 80                                    uint32_t delay_ms, | 89                                    uint32_t delay_ms, | 
| 81                                    uint32_t id) { | 90                                    uint32_t id) { | 
| 82   if (destroying_) { | 91   if (destroying_.load(std::memory_order_relaxed)) { | 
|  | 92     // See above comment. | 
| 83     LOG(LS_WARNING) << "Tried to invoke while destroying the invoker."; | 93     LOG(LS_WARNING) << "Tried to invoke while destroying the invoker."; | 
| 84     return; | 94     return; | 
| 85   } | 95   } | 
| 86   AtomicOps::Increment(&pending_invocations_); |  | 
| 87   thread->PostDelayed(posted_from, delay_ms, this, id, | 96   thread->PostDelayed(posted_from, delay_ms, this, id, | 
| 88                       new ScopedMessageData<AsyncClosure>(std::move(closure))); | 97                       new ScopedMessageData<AsyncClosure>(std::move(closure))); | 
| 89 } | 98 } | 
| 90 | 99 | 
| 91 GuardedAsyncInvoker::GuardedAsyncInvoker() : thread_(Thread::Current()) { | 100 GuardedAsyncInvoker::GuardedAsyncInvoker() : thread_(Thread::Current()) { | 
| 92   thread_->SignalQueueDestroyed.connect(this, | 101   thread_->SignalQueueDestroyed.connect(this, | 
| 93                                         &GuardedAsyncInvoker::ThreadDestroyed); | 102                                         &GuardedAsyncInvoker::ThreadDestroyed); | 
| 94 } | 103 } | 
| 95 | 104 | 
| 96 GuardedAsyncInvoker::~GuardedAsyncInvoker() { | 105 GuardedAsyncInvoker::~GuardedAsyncInvoker() { | 
| 97 } | 106 } | 
| 98 | 107 | 
| 99 bool GuardedAsyncInvoker::Flush(uint32_t id) { | 108 bool GuardedAsyncInvoker::Flush(uint32_t id) { | 
| 100   rtc::CritScope cs(&crit_); | 109   CritScope cs(&crit_); | 
| 101   if (thread_ == nullptr) | 110   if (thread_ == nullptr) | 
| 102     return false; | 111     return false; | 
| 103   invoker_.Flush(thread_, id); | 112   invoker_.Flush(thread_, id); | 
| 104   return true; | 113   return true; | 
| 105 } | 114 } | 
| 106 | 115 | 
| 107 void GuardedAsyncInvoker::ThreadDestroyed() { | 116 void GuardedAsyncInvoker::ThreadDestroyed() { | 
| 108   rtc::CritScope cs(&crit_); | 117   CritScope cs(&crit_); | 
| 109   // We should never get more than one notification about the thread dying. | 118   // We should never get more than one notification about the thread dying. | 
| 110   RTC_DCHECK(thread_ != nullptr); | 119   RTC_DCHECK(thread_ != nullptr); | 
| 111   thread_ = nullptr; | 120   thread_ = nullptr; | 
| 112 } | 121 } | 
| 113 | 122 | 
|  | 123 AsyncClosure::AsyncClosure(AsyncInvoker* invoker) | 
|  | 124     : invoker_(invoker), invocation_complete_(invoker_->invocation_complete_) { | 
|  | 125   invoker_->pending_invocations_.fetch_add(1, std::memory_order_relaxed); | 
|  | 126 } | 
|  | 127 | 
| 114 AsyncClosure::~AsyncClosure() { | 128 AsyncClosure::~AsyncClosure() { | 
| 115   AtomicOps::Decrement(&invoker_->pending_invocations_); | 129   // Using memory_order_release for synchronization with the AsyncInvoker | 
| 116   invoker_->invocation_complete_.Set(); | 130   // destructor. | 
|  | 131   invoker_->pending_invocations_.fetch_sub(1, std::memory_order_release); | 
|  | 132 | 
|  | 133   // After |pending_invocations_| is decremented, we may need to signal | 
|  | 134   // |invocation_complete_| in case the AsyncInvoker is being destroyed and | 
|  | 135   // waiting for pending tasks to complete. | 
|  | 136   // | 
|  | 137   // It's also possible that the destructor finishes before "Set()" is called, | 
|  | 138   // which is safe because the event is reference counted (and in a thread-safe | 
|  | 139   // way). | 
|  | 140   invocation_complete_->Set(); | 
| 117 } | 141 } | 
| 118 | 142 | 
| 119 }  // namespace rtc | 143 }  // namespace rtc | 
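
The key lifetime trick in the patch is that each `AsyncClosure` keeps its own reference to the completion event, so signaling in the closure's destructor stays safe even if `~AsyncInvoker` has already returned on another thread, while the release/acquire pair on the pending-invocation counter orders the closure's work before the destructor observes the count. Below is a minimal, self-contained sketch of that pattern, assuming `std::shared_ptr` and a condition-variable event in place of `scoped_refptr<RefCountedObject<Event>>`; `SimpleEvent`, `Owner`, and `PendingTask` are hypothetical names used only for illustration, not part of the WebRTC API.

```cpp
#include <atomic>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>

// Stand-in for rtc::Event(false, false): auto-reset, initially unsignaled.
class SimpleEvent {
 public:
  void Set() {
    std::lock_guard<std::mutex> lock(mutex_);
    signaled_ = true;
    cv_.notify_all();
  }
  void Wait() {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return signaled_; });
    signaled_ = false;  // Auto-reset.
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  bool signaled_ = false;
};

class Owner;

// Plays the role of AsyncClosure: bumps the pending count on construction,
// drops it and signals the event on destruction.
class PendingTask {
 public:
  explicit PendingTask(Owner* owner);
  ~PendingTask();

 private:
  Owner* owner_;
  // A copy of the shared_ptr keeps the event alive even if ~Owner has
  // already finished on another thread, mirroring the ref-counted Event.
  std::shared_ptr<SimpleEvent> done_;
};

// Plays the role of AsyncInvoker.
class Owner {
 public:
  ~Owner() {
    // Acquire pairs with the release in ~PendingTask, so everything a task
    // did happens-before we observe the count reaching zero.
    while (pending_.load(std::memory_order_acquire) > 0)
      done_->Wait();
  }

 private:
  friend class PendingTask;
  std::atomic<int> pending_{0};
  std::shared_ptr<SimpleEvent> done_ = std::make_shared<SimpleEvent>();
};

PendingTask::PendingTask(Owner* owner) : owner_(owner), done_(owner->done_) {
  owner_->pending_.fetch_add(1, std::memory_order_relaxed);
}

PendingTask::~PendingTask() {
  owner_->pending_.fetch_sub(1, std::memory_order_release);
  // ~Owner may return between the decrement above and this call; done_ is a
  // separate reference, so signaling the event is still safe.
  done_->Set();
}

int main() {
  auto owner = std::make_unique<Owner>();
  // Simulate one posted closure that finishes on a worker thread.
  auto task = std::make_unique<PendingTask>(owner.get());
  std::thread worker([t = std::move(task)]() mutable { t.reset(); });
  owner.reset();  // Blocks until the task's destructor has signaled.
  worker.join();
}
```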