| OLD | NEW |
| 1 /* | 1 /* |
| 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. | 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. |
| 3 * | 3 * |
| 4 * Use of this source code is governed by a BSD-style license | 4 * Use of this source code is governed by a BSD-style license |
| 5 * that can be found in the LICENSE file in the root of the source | 5 * that can be found in the LICENSE file in the root of the source |
| 6 * tree. An additional intellectual property rights grant can be found | 6 * tree. An additional intellectual property rights grant can be found |
| 7 * in the file PATENTS. All contributing project authors may | 7 * in the file PATENTS. All contributing project authors may |
| 8 * be found in the AUTHORS file in the root of the source tree. | 8 * be found in the AUTHORS file in the root of the source tree. |
| 9 */ | 9 */ |
| 10 #include <algorithm> | 10 #include <algorithm> |
| (...skipping 128 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 139 } | 139 } |
| 140 ~ScopedIncrement() override { AtomicOps::Decrement(value_); } | 140 ~ScopedIncrement() override { AtomicOps::Decrement(value_); } |
| 141 | 141 |
| 142 private: | 142 private: |
| 143 volatile int* value_; | 143 volatile int* value_; |
| 144 }; | 144 }; |
| 145 | 145 |
| 146 { | 146 { |
| 147 DebugNonReentrantCritScope cs(&crit_, &locked_); | 147 DebugNonReentrantCritScope cs(&crit_, &locked_); |
| 148 for (MessageQueue* queue : message_queues_) { | 148 for (MessageQueue* queue : message_queues_) { |
| 149 if (!queue->IsProcessingMessages()) { | 149 if (!queue->IsProcessingMessages() || queue->empty()) { |
| 150 // If the queue is not processing messages, it can | 150 // If the queue is empty or not processing messages, it can |
| 151 // be ignored. If we tried to post a message to it, it would be dropped | 151 // be ignored. If we tried to post a message to it, it would be dropped |
| 152 // or ignored. | 152 // or ignored. |
| 153 continue; | 153 continue; |
| 154 } | 154 } |
| 155 queue->PostDelayed(RTC_FROM_HERE, 0, nullptr, MQID_DISPOSE, | 155 queue->PostDelayed(RTC_FROM_HERE, 0, nullptr, MQID_DISPOSE, |
| 156 new ScopedIncrement(&queues_not_done)); | 156 new ScopedIncrement(&queues_not_done)); |
| 157 } | 157 } |
| 158 } | 158 } |
| 159 // Note: One of the message queues may have been on this thread, which is why | 159 // Note: One of the message queues may have been on this thread, which is why |
| 160 // we can't synchronously wait for queues_not_done to go to 0; we need to | 160 // we can't synchronously wait for queues_not_done to go to 0; we need to |
| (...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 209 } | 209 } |
| 210 | 210 |
| 211 fDestroyed_ = true; | 211 fDestroyed_ = true; |
| 212 // The signal is done from here to ensure | 212 // The signal is done from here to ensure |
| 213 // that it always gets called when the queue | 213 // that it always gets called when the queue |
| 214 // is going away. | 214 // is going away. |
| 215 SignalQueueDestroyed(); | 215 SignalQueueDestroyed(); |
| 216 MessageQueueManager::Remove(this); | 216 MessageQueueManager::Remove(this); |
| 217 Clear(nullptr); | 217 Clear(nullptr); |
| 218 | 218 |
| 219 SharedScope ss(&ss_lock_); | |
| 220 if (ss_) { | 219 if (ss_) { |
| 221 ss_->SetMessageQueue(nullptr); | 220 ss_->SetMessageQueue(nullptr); |
| 222 } | 221 } |
| 223 } | 222 } |
| 224 | 223 |
| 225 SocketServer* MessageQueue::socketserver() { | 224 SocketServer* MessageQueue::socketserver() { |
| 226 SharedScope ss(&ss_lock_); | |
| 227 return ss_; | 225 return ss_; |
| 228 } | 226 } |
| 229 | 227 |
| 230 void MessageQueue::set_socketserver(SocketServer* ss) { | |
| 231 // Need to lock exclusively here to prevent simultaneous modifications from | |
| 232 // other threads. Can't be a shared lock to prevent races with other reading | |
| 233 // threads. | |
| 234 // Other places that only read "ss_" can use a shared lock as simultaneous | |
| 235 // read access is allowed. | |
| 236 ExclusiveScope es(&ss_lock_); | |
| 237 ss_ = ss ? ss : own_ss_.get(); | |
| 238 ss_->SetMessageQueue(this); | |
| 239 } | |
| 240 | |
| 241 void MessageQueue::WakeUpSocketServer() { | 228 void MessageQueue::WakeUpSocketServer() { |
| 242 SharedScope ss(&ss_lock_); | |
| 243 ss_->WakeUp(); | 229 ss_->WakeUp(); |
| 244 } | 230 } |
| 245 | 231 |
| 246 void MessageQueue::Quit() { | 232 void MessageQueue::Quit() { |
| 247 AtomicOps::ReleaseStore(&stop_, 1); | 233 AtomicOps::ReleaseStore(&stop_, 1); |
| 248 WakeUpSocketServer(); | 234 WakeUpSocketServer(); |
| 249 } | 235 } |
| 250 | 236 |
| 251 bool MessageQueue::IsQuitting() { | 237 bool MessageQueue::IsQuitting() { |
| 252 return AtomicOps::AcquireLoad(&stop_) != 0; | 238 return AtomicOps::AcquireLoad(&stop_) != 0; |
| (...skipping 97 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 350 if (cmsWait == kForever) { | 336 if (cmsWait == kForever) { |
| 351 cmsNext = cmsDelayNext; | 337 cmsNext = cmsDelayNext; |
| 352 } else { | 338 } else { |
| 353 cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed); | 339 cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed); |
| 354 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) | 340 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) |
| 355 cmsNext = cmsDelayNext; | 341 cmsNext = cmsDelayNext; |
| 356 } | 342 } |
| 357 | 343 |
| 358 { | 344 { |
| 359 // Wait and multiplex in the meantime | 345 // Wait and multiplex in the meantime |
| 360 SharedScope ss(&ss_lock_); | |
| 361 if (!ss_->Wait(static_cast<int>(cmsNext), process_io)) | 346 if (!ss_->Wait(static_cast<int>(cmsNext), process_io)) |
| 362 return false; | 347 return false; |
| 363 } | 348 } |
| 364 | 349 |
| 365 // If the specified timeout expired, return | 350 // If the specified timeout expired, return |
| 366 | 351 |
| 367 msCurrent = TimeMillis(); | 352 msCurrent = TimeMillis(); |
| 368 cmsElapsed = TimeDiff(msCurrent, msStart); | 353 cmsElapsed = TimeDiff(msCurrent, msStart); |
| 369 if (cmsWait != kForever) { | 354 if (cmsWait != kForever) { |
| 370 if (cmsElapsed >= cmsWait) | 355 if (cmsElapsed >= cmsWait) |
| (...skipping 167 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 538 pmsg->phandler->OnMessage(pmsg); | 523 pmsg->phandler->OnMessage(pmsg); |
| 539 int64_t end_time = TimeMillis(); | 524 int64_t end_time = TimeMillis(); |
| 540 int64_t diff = TimeDiff(end_time, start_time); | 525 int64_t diff = TimeDiff(end_time, start_time); |
| 541 if (diff >= kSlowDispatchLoggingThreshold) { | 526 if (diff >= kSlowDispatchLoggingThreshold) { |
| 542 LOG(LS_INFO) << "Message took " << diff << "ms to dispatch. Posted from: " | 527 LOG(LS_INFO) << "Message took " << diff << "ms to dispatch. Posted from: " |
| 543 << pmsg->posted_from.ToString(); | 528 << pmsg->posted_from.ToString(); |
| 544 } | 529 } |
| 545 } | 530 } |
| 546 | 531 |
| 547 } // namespace rtc | 532 } // namespace rtc |
| OLD | NEW |