Index: webrtc/base/messagequeue.cc |
diff --git a/webrtc/base/messagequeue.cc b/webrtc/base/messagequeue.cc |
index bbdb941ffab4941107b347023f5ab23e0b762b42..61aa61192bbad69d87ddf5e79aab3a5af3ce290f 100644 |
--- a/webrtc/base/messagequeue.cc |
+++ b/webrtc/base/messagequeue.cc |
@@ -117,8 +117,8 @@ void MessageQueueManager::ClearInternal(MessageHandler *handler) { |
// MessageQueue |
MessageQueue::MessageQueue(SocketServer* ss, bool init_queue) |
- : ss_(ss), fStop_(false), fPeekKeep_(false), |
- dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false) { |
+ : fStop_(false), fPeekKeep_(false), |
+ dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false), ss_(ss) { |
if (!ss_) { |
// Currently, MessageQueue holds a socket server, and is the base class for |
// Thread. It seems like it makes more sense for Thread to hold the socket |
@@ -159,19 +159,37 @@ void MessageQueue::DoDestroy() { |
SignalQueueDestroyed(); |
MessageQueueManager::Remove(this); |
Clear(NULL); |
+ |
+ SharedScope ss(&ss_lock_); |
if (ss_) { |
ss_->SetMessageQueue(NULL); |
} |
} |
+SocketServer* MessageQueue::socketserver() { |
+ SharedScope ss(&ss_lock_); |
+ return ss_; |
+} |
+ |
void MessageQueue::set_socketserver(SocketServer* ss) { |
+ // Need to lock exclusively here to prevent simultaneous modifications from |
+ // other threads. Can't be a shared lock to prevent races with other reading |
+ // threads. |
+ // Other places that only read "ss_" can use a shared lock as simultaneous |
+ // read access is allowed. |
+ ExclusiveScope es(&ss_lock_); |
ss_ = ss ? ss : default_ss_.get(); |
ss_->SetMessageQueue(this); |
} |
+void MessageQueue::WakeUpSocketServer() { |
+ SharedScope ss(&ss_lock_); |
+ ss_->WakeUp(); |
+} |
+ |
void MessageQueue::Quit() { |
fStop_ = true; |
- ss_->WakeUp(); |
+ WakeUpSocketServer(); |
} |
bool MessageQueue::IsQuitting() { |
@@ -277,9 +295,12 @@ bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) { |
cmsNext = cmsDelayNext; |
} |
- // Wait and multiplex in the meantime |
- if (!ss_->Wait(cmsNext, process_io)) |
- return false; |
+ { |
+ // Wait and multiplex in the meantime |
+ SharedScope ss(&ss_lock_); |
+ if (!ss_->Wait(cmsNext, process_io)) |
+ return false; |
+ } |
// If the specified timeout expired, return |
@@ -307,16 +328,18 @@ void MessageQueue::Post(MessageHandler* phandler, |
// Add the message to the end of the queue |
// Signal for the multiplexer to return |
- CritScope cs(&crit_); |
- Message msg; |
- msg.phandler = phandler; |
- msg.message_id = id; |
- msg.pdata = pdata; |
- if (time_sensitive) { |
- msg.ts_sensitive = Time() + kMaxMsgLatency; |
+ { |
+ CritScope cs(&crit_); |
+ Message msg; |
+ msg.phandler = phandler; |
+ msg.message_id = id; |
+ msg.pdata = pdata; |
+ if (time_sensitive) { |
+ msg.ts_sensitive = Time() + kMaxMsgLatency; |
+ } |
+ msgq_.push_back(msg); |
} |
- msgq_.push_back(msg); |
- ss_->WakeUp(); |
+ WakeUpSocketServer(); |
} |
void MessageQueue::PostDelayed(int cmsDelay, |
@@ -345,18 +368,20 @@ void MessageQueue::DoDelayPost(int cmsDelay, |
// Add to the priority queue. Gets sorted soonest first. |
// Signal for the multiplexer to return. |
- CritScope cs(&crit_); |
- Message msg; |
- msg.phandler = phandler; |
- msg.message_id = id; |
- msg.pdata = pdata; |
- DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg); |
- dmsgq_.push(dmsg); |
- // If this message queue processes 1 message every millisecond for 50 days, |
- // we will wrap this number. Even then, only messages with identical times |
- // will be misordered, and then only briefly. This is probably ok. |
- VERIFY(0 != ++dmsgq_next_num_); |
- ss_->WakeUp(); |
+ { |
+ CritScope cs(&crit_); |
+ Message msg; |
+ msg.phandler = phandler; |
+ msg.message_id = id; |
+ msg.pdata = pdata; |
+ DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg); |
+ dmsgq_.push(dmsg); |
+ // If this message queue processes 1 message every millisecond for 50 days, |
+ // we will wrap this number. Even then, only messages with identical times |
+ // will be misordered, and then only briefly. This is probably ok. |
+ VERIFY(0 != ++dmsgq_next_num_); |
+ } |
+ WakeUpSocketServer(); |
} |
int MessageQueue::GetDelay() { |