Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(137)

Side by Side Diff: webrtc/base/messagequeue.cc

Issue 2024813004: Improving the fake clock and using it to fix a flaky STUN timeout test. (Closed) Base URL: https://chromium.googlesource.com/external/webrtc.git@master
Patch Set: Fixing another TSan warning. WebRtcSession wasn't completely shut down. Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 /* 1 /*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 * 3 *
4 * Use of this source code is governed by a BSD-style license 4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source 5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found 6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may 7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree. 8 * be found in the AUTHORS file in the root of the source tree.
9 */ 9 */
10 #include <algorithm> 10 #include <algorithm>
11 11
12 #include "webrtc/base/atomicops.h"
12 #include "webrtc/base/checks.h" 13 #include "webrtc/base/checks.h"
13 #include "webrtc/base/common.h" 14 #include "webrtc/base/common.h"
14 #include "webrtc/base/logging.h" 15 #include "webrtc/base/logging.h"
15 #include "webrtc/base/messagequeue.h" 16 #include "webrtc/base/messagequeue.h"
17 #include "webrtc/base/thread.h"
16 #include "webrtc/base/trace_event.h" 18 #include "webrtc/base/trace_event.h"
17 19
18 namespace {
19
20 enum { MSG_WAKE_MESSAGE_QUEUE = 1 };
21 }
22
23 namespace rtc { 20 namespace rtc {
24 21
25 const int kMaxMsgLatency = 150; // 150 ms 22 const int kMaxMsgLatency = 150; // 150 ms
26 23
27 //------------------------------------------------------------------ 24 //------------------------------------------------------------------
28 // MessageQueueManager 25 // MessageQueueManager
29 26
30 MessageQueueManager* MessageQueueManager::instance_ = NULL; 27 MessageQueueManager* MessageQueueManager::instance_ = NULL;
31 28
32 MessageQueueManager* MessageQueueManager::Instance() { 29 MessageQueueManager* MessageQueueManager::Instance() {
33 // Note: This is not thread safe, but it is first called before threads are 30 // Note: This is not thread safe, but it is first called before threads are
34 // spawned. 31 // spawned.
35 if (!instance_) 32 if (!instance_)
36 instance_ = new MessageQueueManager; 33 instance_ = new MessageQueueManager;
37 return instance_; 34 return instance_;
38 } 35 }
39 36
40 bool MessageQueueManager::IsInitialized() { 37 bool MessageQueueManager::IsInitialized() {
41 return instance_ != NULL; 38 return instance_ != NULL;
42 } 39 }
43 40
44 MessageQueueManager::MessageQueueManager() { 41 MessageQueueManager::MessageQueueManager() {}
45 }
46 42
47 MessageQueueManager::~MessageQueueManager() { 43 MessageQueueManager::~MessageQueueManager() {
48 } 44 }
49 45
50 void MessageQueueManager::Add(MessageQueue *message_queue) { 46 void MessageQueueManager::Add(MessageQueue *message_queue) {
51 return Instance()->AddInternal(message_queue); 47 return Instance()->AddInternal(message_queue);
52 } 48 }
53 void MessageQueueManager::AddInternal(MessageQueue *message_queue) { 49 void MessageQueueManager::AddInternal(MessageQueue *message_queue) {
54 // MessageQueueManager methods should be non-reentrant, so we 50 // MessageQueueManager methods should be non-reentrant, so we
55 // ASSERT that is the case. If any of these ASSERT, please 51 // ASSERT that is the case. If any of these ASSERT, please
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after
101 void MessageQueueManager::ClearInternal(MessageHandler *handler) { 97 void MessageQueueManager::ClearInternal(MessageHandler *handler) {
102 #if CS_DEBUG_CHECKS // CurrentThreadIsOwner returns true by default. 98 #if CS_DEBUG_CHECKS // CurrentThreadIsOwner returns true by default.
103 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above. 99 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above.
104 #endif 100 #endif
105 CritScope cs(&crit_); 101 CritScope cs(&crit_);
106 std::vector<MessageQueue *>::iterator iter; 102 std::vector<MessageQueue *>::iterator iter;
107 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++) 103 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++)
108 (*iter)->Clear(handler); 104 (*iter)->Clear(handler);
109 } 105 }
110 106
111 void MessageQueueManager::WakeAllMessageQueues() { 107 void MessageQueueManager::ProcessAllMessageQueues() {
112 if (!instance_) { 108 if (!instance_) {
113 return; 109 return;
114 } 110 }
115 return Instance()->WakeAllMessageQueuesInternal(); 111 return Instance()->ProcessAllMessageQueuesInternal();
116 } 112 }
117 113
118 void MessageQueueManager::WakeAllMessageQueuesInternal() { 114 void MessageQueueManager::ProcessAllMessageQueuesInternal() {
119 #if CS_DEBUG_CHECKS // CurrentThreadIsOwner returns true by default. 115 #if CS_DEBUG_CHECKS // CurrentThreadIsOwner returns true by default.
120 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above. 116 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above.
121 #endif 117 #endif
122 CritScope cs(&crit_); 118 // Post a delayed message at the current time and wait for it to be dispatched
123 for (MessageQueue* queue : message_queues_) { 119 // on all queues, which will ensure that all messages that came before it were
124 // Posting an arbitrary message will force the message queue to wake up. 120 // also dispatched.
125 queue->Post(this, MSG_WAKE_MESSAGE_QUEUE); 121 volatile int queues_not_done;
122 auto functor = [&queues_not_done] { AtomicOps::Decrement(&queues_not_done); };
123 FunctorMessageHandler<void, decltype(functor)> handler(functor);
124 {
125 CritScope cs(&crit_);
126 queues_not_done = static_cast<int>(message_queues_.size());
127 for (MessageQueue* queue : message_queues_) {
128 queue->PostDelayed(0, &handler);
129 }
130 }
131 // Note: One of the message queues may have been on this thread, which is why
132 // we can't synchronously wait for queues_not_done to go to 0; we need to
133 // process messages as well.
134 while (AtomicOps::AcquireLoad(&queues_not_done) > 0) {
135 rtc::Thread::Current()->ProcessMessages(0);
126 } 136 }
127 } 137 }
128 138
129 void MessageQueueManager::OnMessage(Message* pmsg) {
130 RTC_DCHECK(pmsg->message_id == MSG_WAKE_MESSAGE_QUEUE);
131 }
132
133 //------------------------------------------------------------------ 139 //------------------------------------------------------------------
134 // MessageQueue 140 // MessageQueue
135 MessageQueue::MessageQueue(SocketServer* ss, bool init_queue) 141 MessageQueue::MessageQueue(SocketServer* ss, bool init_queue)
136 : fStop_(false), fPeekKeep_(false), 142 : fStop_(false), fPeekKeep_(false),
137 dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false), ss_(ss) { 143 dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false), ss_(ss) {
138 RTC_DCHECK(ss); 144 RTC_DCHECK(ss);
139 // Currently, MessageQueue holds a socket server, and is the base class for 145 // Currently, MessageQueue holds a socket server, and is the base class for
140 // Thread. It seems like it makes more sense for Thread to hold the socket 146 // Thread. It seems like it makes more sense for Thread to hold the socket
141 // server, and provide it to the MessageQueue, since the Thread controls 147 // server, and provide it to the MessageQueue, since the Thread controls
142 // the I/O model, and MQ is agnostic to those details. Anyway, this causes 148 // the I/O model, and MQ is agnostic to those details. Anyway, this causes
(...skipping 334 matching lines...) Expand 10 before | Expand all | Expand 10 after
477 dmsgq_.container().erase(new_end, dmsgq_.container().end()); 483 dmsgq_.container().erase(new_end, dmsgq_.container().end());
478 dmsgq_.reheap(); 484 dmsgq_.reheap();
479 } 485 }
480 486
481 void MessageQueue::Dispatch(Message *pmsg) { 487 void MessageQueue::Dispatch(Message *pmsg) {
482 TRACE_EVENT0("webrtc", "MessageQueue::Dispatch"); 488 TRACE_EVENT0("webrtc", "MessageQueue::Dispatch");
483 pmsg->phandler->OnMessage(pmsg); 489 pmsg->phandler->OnMessage(pmsg);
484 } 490 }
485 491
486 } // namespace rtc 492 } // namespace rtc
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698