OLD | NEW |
| (Empty) |
1 /* | |
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. | |
3 * | |
4 * Use of this source code is governed by a BSD-style license | |
5 * that can be found in the LICENSE file in the root of the source | |
6 * tree. An additional intellectual property rights grant can be found | |
7 * in the file PATENTS. All contributing project authors may | |
8 * be found in the AUTHORS file in the root of the source tree. | |
9 */ | |
10 | |
11 #include "webrtc/base/thread.h" | |
12 | |
13 #if defined(WEBRTC_WIN) | |
14 #include <comdef.h> | |
15 #elif defined(WEBRTC_POSIX) | |
16 #include <time.h> | |
17 #endif | |
18 | |
19 #include "webrtc/base/checks.h" | |
20 #include "webrtc/base/logging.h" | |
21 #include "webrtc/base/nullsocketserver.h" | |
22 #include "webrtc/base/platform_thread.h" | |
23 #include "webrtc/base/stringutils.h" | |
24 #include "webrtc/base/timeutils.h" | |
25 #include "webrtc/base/trace_event.h" | |
26 | |
27 namespace rtc { | |
28 | |
29 ThreadManager* ThreadManager::Instance() { | |
30 RTC_DEFINE_STATIC_LOCAL(ThreadManager, thread_manager, ()); | |
31 return &thread_manager; | |
32 } | |
33 | |
34 ThreadManager::~ThreadManager() { | |
35 // By above RTC_DEFINE_STATIC_LOCAL. | |
36 RTC_NOTREACHED() << "ThreadManager should never be destructed."; | |
37 } | |
38 | |
39 // static | |
40 Thread* Thread::Current() { | |
41 ThreadManager* manager = ThreadManager::Instance(); | |
42 Thread* thread = manager->CurrentThread(); | |
43 | |
44 #ifndef NO_MAIN_THREAD_WRAPPING | |
45 // Only autowrap the thread which instantiated the ThreadManager. | |
46 if (!thread && manager->IsMainThread()) { | |
47 thread = new Thread(); | |
48 thread->WrapCurrentWithThreadManager(manager, true); | |
49 } | |
50 #endif | |
51 | |
52 return thread; | |
53 } | |
54 | |
55 #if defined(WEBRTC_POSIX) | |
56 #if !defined(WEBRTC_MAC) | |
57 ThreadManager::ThreadManager() { | |
58 main_thread_ref_ = CurrentThreadRef(); | |
59 pthread_key_create(&key_, nullptr); | |
60 } | |
61 #endif | |
62 | |
63 Thread *ThreadManager::CurrentThread() { | |
64 return static_cast<Thread *>(pthread_getspecific(key_)); | |
65 } | |
66 | |
67 void ThreadManager::SetCurrentThread(Thread *thread) { | |
68 pthread_setspecific(key_, thread); | |
69 } | |
70 #endif | |
71 | |
72 #if defined(WEBRTC_WIN) | |
73 ThreadManager::ThreadManager() { | |
74 main_thread_ref_ = CurrentThreadRef(); | |
75 key_ = TlsAlloc(); | |
76 } | |
77 | |
78 Thread *ThreadManager::CurrentThread() { | |
79 return static_cast<Thread *>(TlsGetValue(key_)); | |
80 } | |
81 | |
82 void ThreadManager::SetCurrentThread(Thread *thread) { | |
83 TlsSetValue(key_, thread); | |
84 } | |
85 #endif | |
86 | |
87 Thread *ThreadManager::WrapCurrentThread() { | |
88 Thread* result = CurrentThread(); | |
89 if (nullptr == result) { | |
90 result = new Thread(); | |
91 result->WrapCurrentWithThreadManager(this, true); | |
92 } | |
93 return result; | |
94 } | |
95 | |
96 void ThreadManager::UnwrapCurrentThread() { | |
97 Thread* t = CurrentThread(); | |
98 if (t && !(t->IsOwned())) { | |
99 t->UnwrapCurrent(); | |
100 delete t; | |
101 } | |
102 } | |
103 | |
104 bool ThreadManager::IsMainThread() { | |
105 return IsThreadRefEqual(CurrentThreadRef(), main_thread_ref_); | |
106 } | |
107 | |
108 Thread::ScopedDisallowBlockingCalls::ScopedDisallowBlockingCalls() | |
109 : thread_(Thread::Current()), | |
110 previous_state_(thread_->SetAllowBlockingCalls(false)) { | |
111 } | |
112 | |
113 Thread::ScopedDisallowBlockingCalls::~ScopedDisallowBlockingCalls() { | |
114 RTC_DCHECK(thread_->IsCurrent()); | |
115 thread_->SetAllowBlockingCalls(previous_state_); | |
116 } | |
117 | |
118 Thread::Thread() : Thread(SocketServer::CreateDefault()) {} | |
119 | |
120 Thread::Thread(SocketServer* ss) | |
121 : MessageQueue(ss, false), | |
122 running_(true, false), | |
123 #if defined(WEBRTC_WIN) | |
124 thread_(nullptr), | |
125 thread_id_(0), | |
126 #endif | |
127 owned_(true), | |
128 blocking_calls_allowed_(true) { | |
129 SetName("Thread", this); // default name | |
130 DoInit(); | |
131 } | |
132 | |
133 Thread::Thread(std::unique_ptr<SocketServer> ss) | |
134 : MessageQueue(std::move(ss), false), | |
135 running_(true, false), | |
136 #if defined(WEBRTC_WIN) | |
137 thread_(nullptr), | |
138 thread_id_(0), | |
139 #endif | |
140 owned_(true), | |
141 blocking_calls_allowed_(true) { | |
142 SetName("Thread", this); // default name | |
143 DoInit(); | |
144 } | |
145 | |
146 Thread::~Thread() { | |
147 Stop(); | |
148 DoDestroy(); | |
149 } | |
150 | |
151 bool Thread::IsCurrent() const { | |
152 return ThreadManager::Instance()->CurrentThread() == this; | |
153 } | |
154 | |
155 std::unique_ptr<Thread> Thread::CreateWithSocketServer() { | |
156 return std::unique_ptr<Thread>(new Thread(SocketServer::CreateDefault())); | |
157 } | |
158 | |
159 std::unique_ptr<Thread> Thread::Create() { | |
160 return std::unique_ptr<Thread>( | |
161 new Thread(std::unique_ptr<SocketServer>(new NullSocketServer()))); | |
162 } | |
163 | |
164 bool Thread::SleepMs(int milliseconds) { | |
165 AssertBlockingIsAllowedOnCurrentThread(); | |
166 | |
167 #if defined(WEBRTC_WIN) | |
168 ::Sleep(milliseconds); | |
169 return true; | |
170 #else | |
171 // POSIX has both a usleep() and a nanosleep(), but the former is deprecated, | |
172 // so we use nanosleep() even though it has greater precision than necessary. | |
173 struct timespec ts; | |
174 ts.tv_sec = milliseconds / 1000; | |
175 ts.tv_nsec = (milliseconds % 1000) * 1000000; | |
176 int ret = nanosleep(&ts, nullptr); | |
177 if (ret != 0) { | |
178 LOG_ERR(LS_WARNING) << "nanosleep() returning early"; | |
179 return false; | |
180 } | |
181 return true; | |
182 #endif | |
183 } | |
184 | |
185 bool Thread::SetName(const std::string& name, const void* obj) { | |
186 if (running()) return false; | |
187 name_ = name; | |
188 if (obj) { | |
189 char buf[16]; | |
190 sprintfn(buf, sizeof(buf), " 0x%p", obj); | |
191 name_ += buf; | |
192 } | |
193 return true; | |
194 } | |
195 | |
196 bool Thread::Start(Runnable* runnable) { | |
197 RTC_DCHECK(owned_); | |
198 if (!owned_) return false; | |
199 RTC_DCHECK(!running()); | |
200 if (running()) return false; | |
201 | |
202 Restart(); // reset IsQuitting() if the thread is being restarted | |
203 | |
204 // Make sure that ThreadManager is created on the main thread before | |
205 // we start a new thread. | |
206 ThreadManager::Instance(); | |
207 | |
208 ThreadInit* init = new ThreadInit; | |
209 init->thread = this; | |
210 init->runnable = runnable; | |
211 #if defined(WEBRTC_WIN) | |
212 thread_ = CreateThread(nullptr, 0, PreRun, init, 0, &thread_id_); | |
213 if (thread_) { | |
214 running_.Set(); | |
215 } else { | |
216 return false; | |
217 } | |
218 #elif defined(WEBRTC_POSIX) | |
219 pthread_attr_t attr; | |
220 pthread_attr_init(&attr); | |
221 | |
222 int error_code = pthread_create(&thread_, &attr, PreRun, init); | |
223 if (0 != error_code) { | |
224 LOG(LS_ERROR) << "Unable to create pthread, error " << error_code; | |
225 return false; | |
226 } | |
227 running_.Set(); | |
228 #endif | |
229 return true; | |
230 } | |
231 | |
232 bool Thread::WrapCurrent() { | |
233 return WrapCurrentWithThreadManager(ThreadManager::Instance(), true); | |
234 } | |
235 | |
236 void Thread::UnwrapCurrent() { | |
237 // Clears the platform-specific thread-specific storage. | |
238 ThreadManager::Instance()->SetCurrentThread(nullptr); | |
239 #if defined(WEBRTC_WIN) | |
240 if (thread_ != nullptr) { | |
241 if (!CloseHandle(thread_)) { | |
242 LOG_GLE(LS_ERROR) << "When unwrapping thread, failed to close handle."; | |
243 } | |
244 thread_ = nullptr; | |
245 } | |
246 #endif | |
247 running_.Reset(); | |
248 } | |
249 | |
250 void Thread::SafeWrapCurrent() { | |
251 WrapCurrentWithThreadManager(ThreadManager::Instance(), false); | |
252 } | |
253 | |
254 void Thread::Join() { | |
255 if (running()) { | |
256 RTC_DCHECK(!IsCurrent()); | |
257 if (Current() && !Current()->blocking_calls_allowed_) { | |
258 LOG(LS_WARNING) << "Waiting for the thread to join, " | |
259 << "but blocking calls have been disallowed"; | |
260 } | |
261 | |
262 #if defined(WEBRTC_WIN) | |
263 RTC_DCHECK(thread_ != nullptr); | |
264 WaitForSingleObject(thread_, INFINITE); | |
265 CloseHandle(thread_); | |
266 thread_ = nullptr; | |
267 thread_id_ = 0; | |
268 #elif defined(WEBRTC_POSIX) | |
269 void *pv; | |
270 pthread_join(thread_, &pv); | |
271 #endif | |
272 running_.Reset(); | |
273 } | |
274 } | |
275 | |
276 bool Thread::SetAllowBlockingCalls(bool allow) { | |
277 RTC_DCHECK(IsCurrent()); | |
278 bool previous = blocking_calls_allowed_; | |
279 blocking_calls_allowed_ = allow; | |
280 return previous; | |
281 } | |
282 | |
283 // static | |
284 void Thread::AssertBlockingIsAllowedOnCurrentThread() { | |
285 #if !defined(NDEBUG) | |
286 Thread* current = Thread::Current(); | |
287 RTC_DCHECK(!current || current->blocking_calls_allowed_); | |
288 #endif | |
289 } | |
290 | |
291 // static | |
292 #if !defined(WEBRTC_MAC) | |
293 #if defined(WEBRTC_WIN) | |
294 DWORD WINAPI Thread::PreRun(LPVOID pv) { | |
295 #else | |
296 void* Thread::PreRun(void* pv) { | |
297 #endif | |
298 ThreadInit* init = static_cast<ThreadInit*>(pv); | |
299 ThreadManager::Instance()->SetCurrentThread(init->thread); | |
300 rtc::SetCurrentThreadName(init->thread->name_.c_str()); | |
301 if (init->runnable) { | |
302 init->runnable->Run(init->thread); | |
303 } else { | |
304 init->thread->Run(); | |
305 } | |
306 delete init; | |
307 #ifdef WEBRTC_WIN | |
308 return 0; | |
309 #else | |
310 return nullptr; | |
311 #endif | |
312 } | |
313 #endif | |
314 | |
315 void Thread::Run() { | |
316 ProcessMessages(kForever); | |
317 } | |
318 | |
319 bool Thread::IsOwned() { | |
320 return owned_; | |
321 } | |
322 | |
323 void Thread::Stop() { | |
324 MessageQueue::Quit(); | |
325 Join(); | |
326 } | |
327 | |
328 void Thread::Send(const Location& posted_from, | |
329 MessageHandler* phandler, | |
330 uint32_t id, | |
331 MessageData* pdata) { | |
332 if (IsQuitting()) | |
333 return; | |
334 | |
335 // Sent messages are sent to the MessageHandler directly, in the context | |
336 // of "thread", like Win32 SendMessage. If in the right context, | |
337 // call the handler directly. | |
338 Message msg; | |
339 msg.posted_from = posted_from; | |
340 msg.phandler = phandler; | |
341 msg.message_id = id; | |
342 msg.pdata = pdata; | |
343 if (IsCurrent()) { | |
344 phandler->OnMessage(&msg); | |
345 return; | |
346 } | |
347 | |
348 AssertBlockingIsAllowedOnCurrentThread(); | |
349 | |
350 AutoThread thread; | |
351 Thread *current_thread = Thread::Current(); | |
352 RTC_DCHECK(current_thread != nullptr); // AutoThread ensures this | |
353 | |
354 bool ready = false; | |
355 { | |
356 CritScope cs(&crit_); | |
357 _SendMessage smsg; | |
358 smsg.thread = current_thread; | |
359 smsg.msg = msg; | |
360 smsg.ready = &ready; | |
361 sendlist_.push_back(smsg); | |
362 } | |
363 | |
364 // Wait for a reply | |
365 WakeUpSocketServer(); | |
366 | |
367 bool waited = false; | |
368 crit_.Enter(); | |
369 while (!ready) { | |
370 crit_.Leave(); | |
371 // We need to limit "ReceiveSends" to |this| thread to avoid an arbitrary | |
372 // thread invoking calls on the current thread. | |
373 current_thread->ReceiveSendsFromThread(this); | |
374 current_thread->socketserver()->Wait(kForever, false); | |
375 waited = true; | |
376 crit_.Enter(); | |
377 } | |
378 crit_.Leave(); | |
379 | |
380 // Our Wait loop above may have consumed some WakeUp events for this | |
381 // MessageQueue, that weren't relevant to this Send. Losing these WakeUps can | |
382 // cause problems for some SocketServers. | |
383 // | |
384 // Concrete example: | |
385 // Win32SocketServer on thread A calls Send on thread B. While processing the | |
386 // message, thread B Posts a message to A. We consume the wakeup for that | |
387 // Post while waiting for the Send to complete, which means that when we exit | |
388 // this loop, we need to issue another WakeUp, or else the Posted message | |
389 // won't be processed in a timely manner. | |
390 | |
391 if (waited) { | |
392 current_thread->socketserver()->WakeUp(); | |
393 } | |
394 } | |
395 | |
396 void Thread::ReceiveSends() { | |
397 ReceiveSendsFromThread(nullptr); | |
398 } | |
399 | |
400 void Thread::ReceiveSendsFromThread(const Thread* source) { | |
401 // Receive a sent message. Cleanup scenarios: | |
402 // - thread sending exits: We don't allow this, since thread can exit | |
403 // only via Join, so Send must complete. | |
404 // - thread receiving exits: Wakeup/set ready in Thread::Clear() | |
405 // - object target cleared: Wakeup/set ready in Thread::Clear() | |
406 _SendMessage smsg; | |
407 | |
408 crit_.Enter(); | |
409 while (PopSendMessageFromThread(source, &smsg)) { | |
410 crit_.Leave(); | |
411 | |
412 smsg.msg.phandler->OnMessage(&smsg.msg); | |
413 | |
414 crit_.Enter(); | |
415 *smsg.ready = true; | |
416 smsg.thread->socketserver()->WakeUp(); | |
417 } | |
418 crit_.Leave(); | |
419 } | |
420 | |
421 bool Thread::PopSendMessageFromThread(const Thread* source, _SendMessage* msg) { | |
422 for (std::list<_SendMessage>::iterator it = sendlist_.begin(); | |
423 it != sendlist_.end(); ++it) { | |
424 if (it->thread == source || source == nullptr) { | |
425 *msg = *it; | |
426 sendlist_.erase(it); | |
427 return true; | |
428 } | |
429 } | |
430 return false; | |
431 } | |
432 | |
433 void Thread::InvokeInternal(const Location& posted_from, | |
434 MessageHandler* handler) { | |
435 TRACE_EVENT2("webrtc", "Thread::Invoke", "src_file_and_line", | |
436 posted_from.file_and_line(), "src_func", | |
437 posted_from.function_name()); | |
438 Send(posted_from, handler); | |
439 } | |
440 | |
441 void Thread::Clear(MessageHandler* phandler, | |
442 uint32_t id, | |
443 MessageList* removed) { | |
444 CritScope cs(&crit_); | |
445 | |
446 // Remove messages on sendlist_ with phandler | |
447 // Object target cleared: remove from send list, wakeup/set ready | |
448 // if sender not null. | |
449 | |
450 std::list<_SendMessage>::iterator iter = sendlist_.begin(); | |
451 while (iter != sendlist_.end()) { | |
452 _SendMessage smsg = *iter; | |
453 if (smsg.msg.Match(phandler, id)) { | |
454 if (removed) { | |
455 removed->push_back(smsg.msg); | |
456 } else { | |
457 delete smsg.msg.pdata; | |
458 } | |
459 iter = sendlist_.erase(iter); | |
460 *smsg.ready = true; | |
461 smsg.thread->socketserver()->WakeUp(); | |
462 continue; | |
463 } | |
464 ++iter; | |
465 } | |
466 | |
467 MessageQueue::Clear(phandler, id, removed); | |
468 } | |
469 | |
470 #if !defined(WEBRTC_MAC) | |
471 // Note that these methods have a separate implementation for mac and ios | |
472 // defined in webrtc/base/thread_darwin.mm. | |
473 bool Thread::ProcessMessages(int cmsLoop) { | |
474 // Using ProcessMessages with a custom clock for testing and a time greater | |
475 // than 0 doesn't work, since it's not guaranteed to advance the custom | |
476 // clock's time, and may get stuck in an infinite loop. | |
477 RTC_DCHECK(GetClockForTesting() == nullptr || cmsLoop == 0 || | |
478 cmsLoop == kForever); | |
479 int64_t msEnd = (kForever == cmsLoop) ? 0 : TimeAfter(cmsLoop); | |
480 int cmsNext = cmsLoop; | |
481 | |
482 while (true) { | |
483 Message msg; | |
484 if (!Get(&msg, cmsNext)) | |
485 return !IsQuitting(); | |
486 Dispatch(&msg); | |
487 | |
488 if (cmsLoop != kForever) { | |
489 cmsNext = static_cast<int>(TimeUntil(msEnd)); | |
490 if (cmsNext < 0) | |
491 return true; | |
492 } | |
493 } | |
494 } | |
495 #endif | |
496 | |
497 bool Thread::WrapCurrentWithThreadManager(ThreadManager* thread_manager, | |
498 bool need_synchronize_access) { | |
499 if (running()) | |
500 return false; | |
501 | |
502 #if defined(WEBRTC_WIN) | |
503 if (need_synchronize_access) { | |
504 // We explicitly ask for no rights other than synchronization. | |
505 // This gives us the best chance of succeeding. | |
506 thread_ = OpenThread(SYNCHRONIZE, FALSE, GetCurrentThreadId()); | |
507 if (!thread_) { | |
508 LOG_GLE(LS_ERROR) << "Unable to get handle to thread."; | |
509 return false; | |
510 } | |
511 thread_id_ = GetCurrentThreadId(); | |
512 } | |
513 #elif defined(WEBRTC_POSIX) | |
514 thread_ = pthread_self(); | |
515 #endif | |
516 | |
517 owned_ = false; | |
518 running_.Set(); | |
519 thread_manager->SetCurrentThread(this); | |
520 return true; | |
521 } | |
522 | |
523 AutoThread::AutoThread() { | |
524 if (!ThreadManager::Instance()->CurrentThread()) { | |
525 ThreadManager::Instance()->SetCurrentThread(this); | |
526 } | |
527 } | |
528 | |
529 AutoThread::~AutoThread() { | |
530 Stop(); | |
531 if (ThreadManager::Instance()->CurrentThread() == this) { | |
532 ThreadManager::Instance()->SetCurrentThread(nullptr); | |
533 } | |
534 } | |
535 | |
536 AutoSocketServerThread::AutoSocketServerThread(SocketServer* ss) | |
537 : Thread(ss) { | |
538 old_thread_ = ThreadManager::Instance()->CurrentThread(); | |
539 rtc::ThreadManager::Instance()->SetCurrentThread(this); | |
540 if (old_thread_) { | |
541 MessageQueueManager::Remove(old_thread_); | |
542 } | |
543 } | |
544 | |
545 AutoSocketServerThread::~AutoSocketServerThread() { | |
546 RTC_DCHECK(ThreadManager::Instance()->CurrentThread() == this); | |
547 // Some tests post destroy messages to this thread. To avoid memory | |
548 // leaks, we have to process those messages. In particular | |
549 // P2PTransportChannelPingTest, relying on the message posted in | |
550 // cricket::Connection::Destroy. | |
551 ProcessMessages(0); | |
552 rtc::ThreadManager::Instance()->SetCurrentThread(old_thread_); | |
553 if (old_thread_) { | |
554 MessageQueueManager::Add(old_thread_); | |
555 } | |
556 } | |
557 | |
558 } // namespace rtc | |
OLD | NEW |