| Index: webrtc/base/messagequeue.cc
|
| diff --git a/webrtc/base/messagequeue.cc b/webrtc/base/messagequeue.cc
|
| index bbdb941ffab4941107b347023f5ab23e0b762b42..61aa61192bbad69d87ddf5e79aab3a5af3ce290f 100644
|
| --- a/webrtc/base/messagequeue.cc
|
| +++ b/webrtc/base/messagequeue.cc
|
| @@ -117,8 +117,8 @@ void MessageQueueManager::ClearInternal(MessageHandler *handler) {
|
| // MessageQueue
|
|
|
| MessageQueue::MessageQueue(SocketServer* ss, bool init_queue)
|
| - : ss_(ss), fStop_(false), fPeekKeep_(false),
|
| - dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false) {
|
| + : fStop_(false), fPeekKeep_(false),
|
| + dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false), ss_(ss) {
|
| if (!ss_) {
|
| // Currently, MessageQueue holds a socket server, and is the base class for
|
| // Thread. It seems like it makes more sense for Thread to hold the socket
|
| @@ -159,19 +159,37 @@ void MessageQueue::DoDestroy() {
|
| SignalQueueDestroyed();
|
| MessageQueueManager::Remove(this);
|
| Clear(NULL);
|
| +
|
| + SharedScope ss(&ss_lock_);
|
| if (ss_) {
|
| ss_->SetMessageQueue(NULL);
|
| }
|
| }
|
|
|
| +SocketServer* MessageQueue::socketserver() {
|
| + SharedScope ss(&ss_lock_);
|
| + return ss_;
|
| +}
|
| +
|
| void MessageQueue::set_socketserver(SocketServer* ss) {
|
| + // Need to lock exclusively here to prevent simultaneous modifications from
|
| + // other threads. Can't be a shared lock to prevent races with other reading
|
| + // threads.
|
| + // Other places that only read "ss_" can use a shared lock as simultaneous
|
| + // read access is allowed.
|
| + ExclusiveScope es(&ss_lock_);
|
| ss_ = ss ? ss : default_ss_.get();
|
| ss_->SetMessageQueue(this);
|
| }
|
|
|
| +void MessageQueue::WakeUpSocketServer() {
|
| + SharedScope ss(&ss_lock_);
|
| + ss_->WakeUp();
|
| +}
|
| +
|
| void MessageQueue::Quit() {
|
| fStop_ = true;
|
| - ss_->WakeUp();
|
| + WakeUpSocketServer();
|
| }
|
|
|
| bool MessageQueue::IsQuitting() {
|
| @@ -277,9 +295,12 @@ bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) {
|
| cmsNext = cmsDelayNext;
|
| }
|
|
|
| - // Wait and multiplex in the meantime
|
| - if (!ss_->Wait(cmsNext, process_io))
|
| - return false;
|
| + {
|
| + // Wait and multiplex in the meantime
|
| + SharedScope ss(&ss_lock_);
|
| + if (!ss_->Wait(cmsNext, process_io))
|
| + return false;
|
| + }
|
|
|
| // If the specified timeout expired, return
|
|
|
| @@ -307,16 +328,18 @@ void MessageQueue::Post(MessageHandler* phandler,
|
| // Add the message to the end of the queue
|
| // Signal for the multiplexer to return
|
|
|
| - CritScope cs(&crit_);
|
| - Message msg;
|
| - msg.phandler = phandler;
|
| - msg.message_id = id;
|
| - msg.pdata = pdata;
|
| - if (time_sensitive) {
|
| - msg.ts_sensitive = Time() + kMaxMsgLatency;
|
| + {
|
| + CritScope cs(&crit_);
|
| + Message msg;
|
| + msg.phandler = phandler;
|
| + msg.message_id = id;
|
| + msg.pdata = pdata;
|
| + if (time_sensitive) {
|
| + msg.ts_sensitive = Time() + kMaxMsgLatency;
|
| + }
|
| + msgq_.push_back(msg);
|
| }
|
| - msgq_.push_back(msg);
|
| - ss_->WakeUp();
|
| + WakeUpSocketServer();
|
| }
|
|
|
| void MessageQueue::PostDelayed(int cmsDelay,
|
| @@ -345,18 +368,20 @@ void MessageQueue::DoDelayPost(int cmsDelay,
|
| // Add to the priority queue. Gets sorted soonest first.
|
| // Signal for the multiplexer to return.
|
|
|
| - CritScope cs(&crit_);
|
| - Message msg;
|
| - msg.phandler = phandler;
|
| - msg.message_id = id;
|
| - msg.pdata = pdata;
|
| - DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
|
| - dmsgq_.push(dmsg);
|
| - // If this message queue processes 1 message every millisecond for 50 days,
|
| - // we will wrap this number. Even then, only messages with identical times
|
| - // will be misordered, and then only briefly. This is probably ok.
|
| - VERIFY(0 != ++dmsgq_next_num_);
|
| - ss_->WakeUp();
|
| + {
|
| + CritScope cs(&crit_);
|
| + Message msg;
|
| + msg.phandler = phandler;
|
| + msg.message_id = id;
|
| + msg.pdata = pdata;
|
| + DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
|
| + dmsgq_.push(dmsg);
|
| + // If this message queue processes 1 message every millisecond for 50 days,
|
| + // we will wrap this number. Even then, only messages with identical times
|
| + // will be misordered, and then only briefly. This is probably ok.
|
| + VERIFY(0 != ++dmsgq_next_num_);
|
| + }
|
| + WakeUpSocketServer();
|
| }
|
|
|
| int MessageQueue::GetDelay() {
|
|
|