| OLD | NEW |
| (Empty) |
| 1 /* | |
| 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. | |
| 3 * | |
| 4 * Use of this source code is governed by a BSD-style license | |
| 5 * that can be found in the LICENSE file in the root of the source | |
| 6 * tree. An additional intellectual property rights grant can be found | |
| 7 * in the file PATENTS. All contributing project authors may | |
| 8 * be found in the AUTHORS file in the root of the source tree. | |
| 9 */ | |
| 10 | |
| 11 #include "webrtc/base/thread.h" | |
| 12 | |
| 13 #if defined(WEBRTC_WIN) | |
| 14 #include <comdef.h> | |
| 15 #elif defined(WEBRTC_POSIX) | |
| 16 #include <time.h> | |
| 17 #endif | |
| 18 | |
| 19 #include "webrtc/base/checks.h" | |
| 20 #include "webrtc/base/logging.h" | |
| 21 #include "webrtc/base/nullsocketserver.h" | |
| 22 #include "webrtc/base/platform_thread.h" | |
| 23 #include "webrtc/base/stringutils.h" | |
| 24 #include "webrtc/base/timeutils.h" | |
| 25 #include "webrtc/base/trace_event.h" | |
| 26 | |
| 27 namespace rtc { | |
| 28 | |
| 29 ThreadManager* ThreadManager::Instance() { | |
| 30 RTC_DEFINE_STATIC_LOCAL(ThreadManager, thread_manager, ()); | |
| 31 return &thread_manager; | |
| 32 } | |
| 33 | |
| 34 ThreadManager::~ThreadManager() { | |
| 35 // By above RTC_DEFINE_STATIC_LOCAL. | |
| 36 RTC_NOTREACHED() << "ThreadManager should never be destructed."; | |
| 37 } | |
| 38 | |
| 39 // static | |
| 40 Thread* Thread::Current() { | |
| 41 ThreadManager* manager = ThreadManager::Instance(); | |
| 42 Thread* thread = manager->CurrentThread(); | |
| 43 | |
| 44 #ifndef NO_MAIN_THREAD_WRAPPING | |
| 45 // Only autowrap the thread which instantiated the ThreadManager. | |
| 46 if (!thread && manager->IsMainThread()) { | |
| 47 thread = new Thread(); | |
| 48 thread->WrapCurrentWithThreadManager(manager, true); | |
| 49 } | |
| 50 #endif | |
| 51 | |
| 52 return thread; | |
| 53 } | |
| 54 | |
| 55 #if defined(WEBRTC_POSIX) | |
| 56 #if !defined(WEBRTC_MAC) | |
| 57 ThreadManager::ThreadManager() { | |
| 58 main_thread_ref_ = CurrentThreadRef(); | |
| 59 pthread_key_create(&key_, nullptr); | |
| 60 } | |
| 61 #endif | |
| 62 | |
| 63 Thread *ThreadManager::CurrentThread() { | |
| 64 return static_cast<Thread *>(pthread_getspecific(key_)); | |
| 65 } | |
| 66 | |
| 67 void ThreadManager::SetCurrentThread(Thread *thread) { | |
| 68 pthread_setspecific(key_, thread); | |
| 69 } | |
| 70 #endif | |
| 71 | |
| 72 #if defined(WEBRTC_WIN) | |
| 73 ThreadManager::ThreadManager() { | |
| 74 main_thread_ref_ = CurrentThreadRef(); | |
| 75 key_ = TlsAlloc(); | |
| 76 } | |
| 77 | |
| 78 Thread *ThreadManager::CurrentThread() { | |
| 79 return static_cast<Thread *>(TlsGetValue(key_)); | |
| 80 } | |
| 81 | |
| 82 void ThreadManager::SetCurrentThread(Thread *thread) { | |
| 83 TlsSetValue(key_, thread); | |
| 84 } | |
| 85 #endif | |
| 86 | |
| 87 Thread *ThreadManager::WrapCurrentThread() { | |
| 88 Thread* result = CurrentThread(); | |
| 89 if (nullptr == result) { | |
| 90 result = new Thread(); | |
| 91 result->WrapCurrentWithThreadManager(this, true); | |
| 92 } | |
| 93 return result; | |
| 94 } | |
| 95 | |
| 96 void ThreadManager::UnwrapCurrentThread() { | |
| 97 Thread* t = CurrentThread(); | |
| 98 if (t && !(t->IsOwned())) { | |
| 99 t->UnwrapCurrent(); | |
| 100 delete t; | |
| 101 } | |
| 102 } | |
| 103 | |
| 104 bool ThreadManager::IsMainThread() { | |
| 105 return IsThreadRefEqual(CurrentThreadRef(), main_thread_ref_); | |
| 106 } | |
| 107 | |
| 108 Thread::ScopedDisallowBlockingCalls::ScopedDisallowBlockingCalls() | |
| 109 : thread_(Thread::Current()), | |
| 110 previous_state_(thread_->SetAllowBlockingCalls(false)) { | |
| 111 } | |
| 112 | |
| 113 Thread::ScopedDisallowBlockingCalls::~ScopedDisallowBlockingCalls() { | |
| 114 RTC_DCHECK(thread_->IsCurrent()); | |
| 115 thread_->SetAllowBlockingCalls(previous_state_); | |
| 116 } | |
| 117 | |
| 118 Thread::Thread() : Thread(SocketServer::CreateDefault()) {} | |
| 119 | |
| 120 Thread::Thread(SocketServer* ss) | |
| 121 : MessageQueue(ss, false), | |
| 122 running_(true, false), | |
| 123 #if defined(WEBRTC_WIN) | |
| 124 thread_(nullptr), | |
| 125 thread_id_(0), | |
| 126 #endif | |
| 127 owned_(true), | |
| 128 blocking_calls_allowed_(true) { | |
| 129 SetName("Thread", this); // default name | |
| 130 DoInit(); | |
| 131 } | |
| 132 | |
| 133 Thread::Thread(std::unique_ptr<SocketServer> ss) | |
| 134 : MessageQueue(std::move(ss), false), | |
| 135 running_(true, false), | |
| 136 #if defined(WEBRTC_WIN) | |
| 137 thread_(nullptr), | |
| 138 thread_id_(0), | |
| 139 #endif | |
| 140 owned_(true), | |
| 141 blocking_calls_allowed_(true) { | |
| 142 SetName("Thread", this); // default name | |
| 143 DoInit(); | |
| 144 } | |
| 145 | |
| 146 Thread::~Thread() { | |
| 147 Stop(); | |
| 148 DoDestroy(); | |
| 149 } | |
| 150 | |
| 151 bool Thread::IsCurrent() const { | |
| 152 return ThreadManager::Instance()->CurrentThread() == this; | |
| 153 } | |
| 154 | |
| 155 std::unique_ptr<Thread> Thread::CreateWithSocketServer() { | |
| 156 return std::unique_ptr<Thread>(new Thread(SocketServer::CreateDefault())); | |
| 157 } | |
| 158 | |
| 159 std::unique_ptr<Thread> Thread::Create() { | |
| 160 return std::unique_ptr<Thread>( | |
| 161 new Thread(std::unique_ptr<SocketServer>(new NullSocketServer()))); | |
| 162 } | |
| 163 | |
| 164 bool Thread::SleepMs(int milliseconds) { | |
| 165 AssertBlockingIsAllowedOnCurrentThread(); | |
| 166 | |
| 167 #if defined(WEBRTC_WIN) | |
| 168 ::Sleep(milliseconds); | |
| 169 return true; | |
| 170 #else | |
| 171 // POSIX has both a usleep() and a nanosleep(), but the former is deprecated, | |
| 172 // so we use nanosleep() even though it has greater precision than necessary. | |
| 173 struct timespec ts; | |
| 174 ts.tv_sec = milliseconds / 1000; | |
| 175 ts.tv_nsec = (milliseconds % 1000) * 1000000; | |
| 176 int ret = nanosleep(&ts, nullptr); | |
| 177 if (ret != 0) { | |
| 178 LOG_ERR(LS_WARNING) << "nanosleep() returning early"; | |
| 179 return false; | |
| 180 } | |
| 181 return true; | |
| 182 #endif | |
| 183 } | |
| 184 | |
| 185 bool Thread::SetName(const std::string& name, const void* obj) { | |
| 186 if (running()) return false; | |
| 187 name_ = name; | |
| 188 if (obj) { | |
| 189 char buf[16]; | |
| 190 sprintfn(buf, sizeof(buf), " 0x%p", obj); | |
| 191 name_ += buf; | |
| 192 } | |
| 193 return true; | |
| 194 } | |
| 195 | |
| 196 bool Thread::Start(Runnable* runnable) { | |
| 197 RTC_DCHECK(owned_); | |
| 198 if (!owned_) return false; | |
| 199 RTC_DCHECK(!running()); | |
| 200 if (running()) return false; | |
| 201 | |
| 202 Restart(); // reset IsQuitting() if the thread is being restarted | |
| 203 | |
| 204 // Make sure that ThreadManager is created on the main thread before | |
| 205 // we start a new thread. | |
| 206 ThreadManager::Instance(); | |
| 207 | |
| 208 ThreadInit* init = new ThreadInit; | |
| 209 init->thread = this; | |
| 210 init->runnable = runnable; | |
| 211 #if defined(WEBRTC_WIN) | |
| 212 thread_ = CreateThread(nullptr, 0, PreRun, init, 0, &thread_id_); | |
| 213 if (thread_) { | |
| 214 running_.Set(); | |
| 215 } else { | |
| 216 return false; | |
| 217 } | |
| 218 #elif defined(WEBRTC_POSIX) | |
| 219 pthread_attr_t attr; | |
| 220 pthread_attr_init(&attr); | |
| 221 | |
| 222 int error_code = pthread_create(&thread_, &attr, PreRun, init); | |
| 223 if (0 != error_code) { | |
| 224 LOG(LS_ERROR) << "Unable to create pthread, error " << error_code; | |
| 225 return false; | |
| 226 } | |
| 227 running_.Set(); | |
| 228 #endif | |
| 229 return true; | |
| 230 } | |
| 231 | |
| 232 bool Thread::WrapCurrent() { | |
| 233 return WrapCurrentWithThreadManager(ThreadManager::Instance(), true); | |
| 234 } | |
| 235 | |
| 236 void Thread::UnwrapCurrent() { | |
| 237 // Clears the platform-specific thread-specific storage. | |
| 238 ThreadManager::Instance()->SetCurrentThread(nullptr); | |
| 239 #if defined(WEBRTC_WIN) | |
| 240 if (thread_ != nullptr) { | |
| 241 if (!CloseHandle(thread_)) { | |
| 242 LOG_GLE(LS_ERROR) << "When unwrapping thread, failed to close handle."; | |
| 243 } | |
| 244 thread_ = nullptr; | |
| 245 } | |
| 246 #endif | |
| 247 running_.Reset(); | |
| 248 } | |
| 249 | |
| 250 void Thread::SafeWrapCurrent() { | |
| 251 WrapCurrentWithThreadManager(ThreadManager::Instance(), false); | |
| 252 } | |
| 253 | |
| 254 void Thread::Join() { | |
| 255 if (running()) { | |
| 256 RTC_DCHECK(!IsCurrent()); | |
| 257 if (Current() && !Current()->blocking_calls_allowed_) { | |
| 258 LOG(LS_WARNING) << "Waiting for the thread to join, " | |
| 259 << "but blocking calls have been disallowed"; | |
| 260 } | |
| 261 | |
| 262 #if defined(WEBRTC_WIN) | |
| 263 RTC_DCHECK(thread_ != nullptr); | |
| 264 WaitForSingleObject(thread_, INFINITE); | |
| 265 CloseHandle(thread_); | |
| 266 thread_ = nullptr; | |
| 267 thread_id_ = 0; | |
| 268 #elif defined(WEBRTC_POSIX) | |
| 269 void *pv; | |
| 270 pthread_join(thread_, &pv); | |
| 271 #endif | |
| 272 running_.Reset(); | |
| 273 } | |
| 274 } | |
| 275 | |
| 276 bool Thread::SetAllowBlockingCalls(bool allow) { | |
| 277 RTC_DCHECK(IsCurrent()); | |
| 278 bool previous = blocking_calls_allowed_; | |
| 279 blocking_calls_allowed_ = allow; | |
| 280 return previous; | |
| 281 } | |
| 282 | |
| 283 // static | |
| 284 void Thread::AssertBlockingIsAllowedOnCurrentThread() { | |
| 285 #if !defined(NDEBUG) | |
| 286 Thread* current = Thread::Current(); | |
| 287 RTC_DCHECK(!current || current->blocking_calls_allowed_); | |
| 288 #endif | |
| 289 } | |
| 290 | |
| 291 // static | |
| 292 #if !defined(WEBRTC_MAC) | |
| 293 #if defined(WEBRTC_WIN) | |
| 294 DWORD WINAPI Thread::PreRun(LPVOID pv) { | |
| 295 #else | |
| 296 void* Thread::PreRun(void* pv) { | |
| 297 #endif | |
| 298 ThreadInit* init = static_cast<ThreadInit*>(pv); | |
| 299 ThreadManager::Instance()->SetCurrentThread(init->thread); | |
| 300 rtc::SetCurrentThreadName(init->thread->name_.c_str()); | |
| 301 if (init->runnable) { | |
| 302 init->runnable->Run(init->thread); | |
| 303 } else { | |
| 304 init->thread->Run(); | |
| 305 } | |
| 306 delete init; | |
| 307 #ifdef WEBRTC_WIN | |
| 308 return 0; | |
| 309 #else | |
| 310 return nullptr; | |
| 311 #endif | |
| 312 } | |
| 313 #endif | |
| 314 | |
| 315 void Thread::Run() { | |
| 316 ProcessMessages(kForever); | |
| 317 } | |
| 318 | |
| 319 bool Thread::IsOwned() { | |
| 320 return owned_; | |
| 321 } | |
| 322 | |
| 323 void Thread::Stop() { | |
| 324 MessageQueue::Quit(); | |
| 325 Join(); | |
| 326 } | |
| 327 | |
| 328 void Thread::Send(const Location& posted_from, | |
| 329 MessageHandler* phandler, | |
| 330 uint32_t id, | |
| 331 MessageData* pdata) { | |
| 332 if (IsQuitting()) | |
| 333 return; | |
| 334 | |
| 335 // Sent messages are sent to the MessageHandler directly, in the context | |
| 336 // of "thread", like Win32 SendMessage. If in the right context, | |
| 337 // call the handler directly. | |
| 338 Message msg; | |
| 339 msg.posted_from = posted_from; | |
| 340 msg.phandler = phandler; | |
| 341 msg.message_id = id; | |
| 342 msg.pdata = pdata; | |
| 343 if (IsCurrent()) { | |
| 344 phandler->OnMessage(&msg); | |
| 345 return; | |
| 346 } | |
| 347 | |
| 348 AssertBlockingIsAllowedOnCurrentThread(); | |
| 349 | |
| 350 AutoThread thread; | |
| 351 Thread *current_thread = Thread::Current(); | |
| 352 RTC_DCHECK(current_thread != nullptr); // AutoThread ensures this | |
| 353 | |
| 354 bool ready = false; | |
| 355 { | |
| 356 CritScope cs(&crit_); | |
| 357 _SendMessage smsg; | |
| 358 smsg.thread = current_thread; | |
| 359 smsg.msg = msg; | |
| 360 smsg.ready = &ready; | |
| 361 sendlist_.push_back(smsg); | |
| 362 } | |
| 363 | |
| 364 // Wait for a reply | |
| 365 WakeUpSocketServer(); | |
| 366 | |
| 367 bool waited = false; | |
| 368 crit_.Enter(); | |
| 369 while (!ready) { | |
| 370 crit_.Leave(); | |
| 371 // We need to limit "ReceiveSends" to |this| thread to avoid an arbitrary | |
| 372 // thread invoking calls on the current thread. | |
| 373 current_thread->ReceiveSendsFromThread(this); | |
| 374 current_thread->socketserver()->Wait(kForever, false); | |
| 375 waited = true; | |
| 376 crit_.Enter(); | |
| 377 } | |
| 378 crit_.Leave(); | |
| 379 | |
| 380 // Our Wait loop above may have consumed some WakeUp events for this | |
| 381 // MessageQueue, that weren't relevant to this Send. Losing these WakeUps can | |
| 382 // cause problems for some SocketServers. | |
| 383 // | |
| 384 // Concrete example: | |
| 385 // Win32SocketServer on thread A calls Send on thread B. While processing the | |
| 386 // message, thread B Posts a message to A. We consume the wakeup for that | |
| 387 // Post while waiting for the Send to complete, which means that when we exit | |
| 388 // this loop, we need to issue another WakeUp, or else the Posted message | |
| 389 // won't be processed in a timely manner. | |
| 390 | |
| 391 if (waited) { | |
| 392 current_thread->socketserver()->WakeUp(); | |
| 393 } | |
| 394 } | |
| 395 | |
| 396 void Thread::ReceiveSends() { | |
| 397 ReceiveSendsFromThread(nullptr); | |
| 398 } | |
| 399 | |
| 400 void Thread::ReceiveSendsFromThread(const Thread* source) { | |
| 401 // Receive a sent message. Cleanup scenarios: | |
| 402 // - thread sending exits: We don't allow this, since thread can exit | |
| 403 // only via Join, so Send must complete. | |
| 404 // - thread receiving exits: Wakeup/set ready in Thread::Clear() | |
| 405 // - object target cleared: Wakeup/set ready in Thread::Clear() | |
| 406 _SendMessage smsg; | |
| 407 | |
| 408 crit_.Enter(); | |
| 409 while (PopSendMessageFromThread(source, &smsg)) { | |
| 410 crit_.Leave(); | |
| 411 | |
| 412 smsg.msg.phandler->OnMessage(&smsg.msg); | |
| 413 | |
| 414 crit_.Enter(); | |
| 415 *smsg.ready = true; | |
| 416 smsg.thread->socketserver()->WakeUp(); | |
| 417 } | |
| 418 crit_.Leave(); | |
| 419 } | |
| 420 | |
| 421 bool Thread::PopSendMessageFromThread(const Thread* source, _SendMessage* msg) { | |
| 422 for (std::list<_SendMessage>::iterator it = sendlist_.begin(); | |
| 423 it != sendlist_.end(); ++it) { | |
| 424 if (it->thread == source || source == nullptr) { | |
| 425 *msg = *it; | |
| 426 sendlist_.erase(it); | |
| 427 return true; | |
| 428 } | |
| 429 } | |
| 430 return false; | |
| 431 } | |
| 432 | |
| 433 void Thread::InvokeInternal(const Location& posted_from, | |
| 434 MessageHandler* handler) { | |
| 435 TRACE_EVENT2("webrtc", "Thread::Invoke", "src_file_and_line", | |
| 436 posted_from.file_and_line(), "src_func", | |
| 437 posted_from.function_name()); | |
| 438 Send(posted_from, handler); | |
| 439 } | |
| 440 | |
| 441 void Thread::Clear(MessageHandler* phandler, | |
| 442 uint32_t id, | |
| 443 MessageList* removed) { | |
| 444 CritScope cs(&crit_); | |
| 445 | |
| 446 // Remove messages on sendlist_ with phandler | |
| 447 // Object target cleared: remove from send list, wakeup/set ready | |
| 448 // if sender not null. | |
| 449 | |
| 450 std::list<_SendMessage>::iterator iter = sendlist_.begin(); | |
| 451 while (iter != sendlist_.end()) { | |
| 452 _SendMessage smsg = *iter; | |
| 453 if (smsg.msg.Match(phandler, id)) { | |
| 454 if (removed) { | |
| 455 removed->push_back(smsg.msg); | |
| 456 } else { | |
| 457 delete smsg.msg.pdata; | |
| 458 } | |
| 459 iter = sendlist_.erase(iter); | |
| 460 *smsg.ready = true; | |
| 461 smsg.thread->socketserver()->WakeUp(); | |
| 462 continue; | |
| 463 } | |
| 464 ++iter; | |
| 465 } | |
| 466 | |
| 467 MessageQueue::Clear(phandler, id, removed); | |
| 468 } | |
| 469 | |
| 470 #if !defined(WEBRTC_MAC) | |
| 471 // Note that these methods have a separate implementation for mac and ios | |
| 472 // defined in webrtc/base/thread_darwin.mm. | |
| 473 bool Thread::ProcessMessages(int cmsLoop) { | |
| 474 // Using ProcessMessages with a custom clock for testing and a time greater | |
| 475 // than 0 doesn't work, since it's not guaranteed to advance the custom | |
| 476 // clock's time, and may get stuck in an infinite loop. | |
| 477 RTC_DCHECK(GetClockForTesting() == nullptr || cmsLoop == 0 || | |
| 478 cmsLoop == kForever); | |
| 479 int64_t msEnd = (kForever == cmsLoop) ? 0 : TimeAfter(cmsLoop); | |
| 480 int cmsNext = cmsLoop; | |
| 481 | |
| 482 while (true) { | |
| 483 Message msg; | |
| 484 if (!Get(&msg, cmsNext)) | |
| 485 return !IsQuitting(); | |
| 486 Dispatch(&msg); | |
| 487 | |
| 488 if (cmsLoop != kForever) { | |
| 489 cmsNext = static_cast<int>(TimeUntil(msEnd)); | |
| 490 if (cmsNext < 0) | |
| 491 return true; | |
| 492 } | |
| 493 } | |
| 494 } | |
| 495 #endif | |
| 496 | |
| 497 bool Thread::WrapCurrentWithThreadManager(ThreadManager* thread_manager, | |
| 498 bool need_synchronize_access) { | |
| 499 if (running()) | |
| 500 return false; | |
| 501 | |
| 502 #if defined(WEBRTC_WIN) | |
| 503 if (need_synchronize_access) { | |
| 504 // We explicitly ask for no rights other than synchronization. | |
| 505 // This gives us the best chance of succeeding. | |
| 506 thread_ = OpenThread(SYNCHRONIZE, FALSE, GetCurrentThreadId()); | |
| 507 if (!thread_) { | |
| 508 LOG_GLE(LS_ERROR) << "Unable to get handle to thread."; | |
| 509 return false; | |
| 510 } | |
| 511 thread_id_ = GetCurrentThreadId(); | |
| 512 } | |
| 513 #elif defined(WEBRTC_POSIX) | |
| 514 thread_ = pthread_self(); | |
| 515 #endif | |
| 516 | |
| 517 owned_ = false; | |
| 518 running_.Set(); | |
| 519 thread_manager->SetCurrentThread(this); | |
| 520 return true; | |
| 521 } | |
| 522 | |
| 523 AutoThread::AutoThread() { | |
| 524 if (!ThreadManager::Instance()->CurrentThread()) { | |
| 525 ThreadManager::Instance()->SetCurrentThread(this); | |
| 526 } | |
| 527 } | |
| 528 | |
| 529 AutoThread::~AutoThread() { | |
| 530 Stop(); | |
| 531 if (ThreadManager::Instance()->CurrentThread() == this) { | |
| 532 ThreadManager::Instance()->SetCurrentThread(nullptr); | |
| 533 } | |
| 534 } | |
| 535 | |
| 536 AutoSocketServerThread::AutoSocketServerThread(SocketServer* ss) | |
| 537 : Thread(ss) { | |
| 538 old_thread_ = ThreadManager::Instance()->CurrentThread(); | |
| 539 rtc::ThreadManager::Instance()->SetCurrentThread(this); | |
| 540 if (old_thread_) { | |
| 541 MessageQueueManager::Remove(old_thread_); | |
| 542 } | |
| 543 } | |
| 544 | |
| 545 AutoSocketServerThread::~AutoSocketServerThread() { | |
| 546 RTC_DCHECK(ThreadManager::Instance()->CurrentThread() == this); | |
| 547 // Some tests post destroy messages to this thread. To avoid memory | |
| 548 // leaks, we have to process those messages. In particular | |
| 549 // P2PTransportChannelPingTest, relying on the message posted in | |
| 550 // cricket::Connection::Destroy. | |
| 551 ProcessMessages(0); | |
| 552 rtc::ThreadManager::Instance()->SetCurrentThread(old_thread_); | |
| 553 if (old_thread_) { | |
| 554 MessageQueueManager::Add(old_thread_); | |
| 555 } | |
| 556 } | |
| 557 | |
| 558 } // namespace rtc | |
| OLD | NEW |