OLD | NEW |
| (Empty) |
1 /* | |
2 * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved. | |
3 * | |
4 * Use of this source code is governed by a BSD-style license | |
5 * that can be found in the LICENSE file in the root of the source | |
6 * tree. An additional intellectual property rights grant can be found | |
7 * in the file PATENTS. All contributing project authors may | |
8 * be found in the AUTHORS file in the root of the source tree. | |
9 */ | |
10 | |
11 #include "webrtc/test/channel_transport/udp_socket2_manager_win.h" | |
12 | |
13 #include <assert.h> | |
14 #include <stdio.h> | |
15 | |
16 #include "webrtc/system_wrappers/include/aligned_malloc.h" | |
17 #include "webrtc/test/channel_transport/udp_socket2_win.h" | |
18 | |
19 namespace webrtc { | |
20 namespace test { | |
21 | |
22 uint32_t UdpSocket2ManagerWindows::_numOfActiveManagers = 0; | |
23 bool UdpSocket2ManagerWindows::_wsaInit = false; | |
24 | |
25 UdpSocket2ManagerWindows::UdpSocket2ManagerWindows() | |
26 : UdpSocketManager(), | |
27 _id(-1), | |
28 _stopped(false), | |
29 _init(false), | |
30 _pCrit(CriticalSectionWrapper::CreateCriticalSection()), | |
31 _ioCompletionHandle(NULL), | |
32 _numActiveSockets(0), | |
33 _event(EventWrapper::Create()) | |
34 { | |
35 _managerNumber = _numOfActiveManagers++; | |
36 | |
37 if(_numOfActiveManagers == 1) | |
38 { | |
39 WORD wVersionRequested = MAKEWORD(2, 2); | |
40 WSADATA wsaData; | |
41 _wsaInit = WSAStartup(wVersionRequested, &wsaData) == 0; | |
42 // TODO (hellner): seems safer to use RAII for this. E.g. what happens | |
43 // if a UdpSocket2ManagerWindows() created and destroyed | |
44 // without being initialized. | |
45 } | |
46 } | |
47 | |
48 UdpSocket2ManagerWindows::~UdpSocket2ManagerWindows() | |
49 { | |
50 WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id, | |
51 "UdpSocket2ManagerWindows(%d)::~UdpSocket2ManagerWindows()", | |
52 _managerNumber); | |
53 | |
54 if(_init) | |
55 { | |
56 _pCrit->Enter(); | |
57 if(_numActiveSockets) | |
58 { | |
59 _pCrit->Leave(); | |
60 _event->Wait(INFINITE); | |
61 } | |
62 else | |
63 { | |
64 _pCrit->Leave(); | |
65 } | |
66 StopWorkerThreads(); | |
67 | |
68 for (WorkerList::iterator iter = _workerThreadsList.begin(); | |
69 iter != _workerThreadsList.end(); ++iter) { | |
70 delete *iter; | |
71 } | |
72 _workerThreadsList.clear(); | |
73 _ioContextPool.Free(); | |
74 | |
75 _numOfActiveManagers--; | |
76 if(_ioCompletionHandle) | |
77 { | |
78 CloseHandle(_ioCompletionHandle); | |
79 } | |
80 if (_numOfActiveManagers == 0) | |
81 { | |
82 if(_wsaInit) | |
83 { | |
84 WSACleanup(); | |
85 } | |
86 } | |
87 } | |
88 if(_pCrit) | |
89 { | |
90 delete _pCrit; | |
91 } | |
92 if(_event) | |
93 { | |
94 delete _event; | |
95 } | |
96 } | |
97 | |
98 bool UdpSocket2ManagerWindows::Init(int32_t id, | |
99 uint8_t& numOfWorkThreads) { | |
100 CriticalSectionScoped cs(_pCrit); | |
101 if ((_id != -1) || (_numOfWorkThreads != 0)) { | |
102 assert(_id != -1); | |
103 assert(_numOfWorkThreads != 0); | |
104 return false; | |
105 } | |
106 _id = id; | |
107 _numOfWorkThreads = numOfWorkThreads; | |
108 return true; | |
109 } | |
110 | |
111 bool UdpSocket2ManagerWindows::Start() | |
112 { | |
113 WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id, | |
114 "UdpSocket2ManagerWindows(%d)::Start()",_managerNumber); | |
115 if(!_init) | |
116 { | |
117 StartWorkerThreads(); | |
118 } | |
119 | |
120 if(!_init) | |
121 { | |
122 return false; | |
123 } | |
124 _pCrit->Enter(); | |
125 // Start worker threads. | |
126 _stopped = false; | |
127 int32_t error = 0; | |
128 for (WorkerList::iterator iter = _workerThreadsList.begin(); | |
129 iter != _workerThreadsList.end() && !error; ++iter) { | |
130 if(!(*iter)->Start()) | |
131 error = 1; | |
132 } | |
133 if(error) | |
134 { | |
135 WEBRTC_TRACE( | |
136 kTraceError, | |
137 kTraceTransport, | |
138 _id, | |
139 "UdpSocket2ManagerWindows(%d)::Start() error starting worker\ | |
140 threads", | |
141 _managerNumber); | |
142 _pCrit->Leave(); | |
143 return false; | |
144 } | |
145 _pCrit->Leave(); | |
146 return true; | |
147 } | |
148 | |
149 bool UdpSocket2ManagerWindows::StartWorkerThreads() | |
150 { | |
151 if(!_init) | |
152 { | |
153 _pCrit->Enter(); | |
154 | |
155 _ioCompletionHandle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, | |
156 0, 0); | |
157 if(_ioCompletionHandle == NULL) | |
158 { | |
159 int32_t error = GetLastError(); | |
160 WEBRTC_TRACE( | |
161 kTraceError, | |
162 kTraceTransport, | |
163 _id, | |
164 "UdpSocket2ManagerWindows(%d)::StartWorkerThreads()" | |
165 "_ioCompletioHandle == NULL: error:%d", | |
166 _managerNumber,error); | |
167 _pCrit->Leave(); | |
168 return false; | |
169 } | |
170 | |
171 // Create worker threads. | |
172 uint32_t i = 0; | |
173 bool error = false; | |
174 while(i < _numOfWorkThreads && !error) | |
175 { | |
176 UdpSocket2WorkerWindows* pWorker = | |
177 new UdpSocket2WorkerWindows(_ioCompletionHandle); | |
178 if(pWorker->Init() != 0) | |
179 { | |
180 error = true; | |
181 delete pWorker; | |
182 break; | |
183 } | |
184 _workerThreadsList.push_front(pWorker); | |
185 i++; | |
186 } | |
187 if(error) | |
188 { | |
189 WEBRTC_TRACE( | |
190 kTraceError, | |
191 kTraceTransport, | |
192 _id, | |
193 "UdpSocket2ManagerWindows(%d)::StartWorkerThreads() error " | |
194 "creating work threads", | |
195 _managerNumber); | |
196 // Delete worker threads. | |
197 for (WorkerList::iterator iter = _workerThreadsList.begin(); | |
198 iter != _workerThreadsList.end(); ++iter) { | |
199 delete *iter; | |
200 } | |
201 _workerThreadsList.clear(); | |
202 _pCrit->Leave(); | |
203 return false; | |
204 } | |
205 if(_ioContextPool.Init()) | |
206 { | |
207 WEBRTC_TRACE( | |
208 kTraceError, | |
209 kTraceTransport, | |
210 _id, | |
211 "UdpSocket2ManagerWindows(%d)::StartWorkerThreads() error " | |
212 "initiating _ioContextPool", | |
213 _managerNumber); | |
214 _pCrit->Leave(); | |
215 return false; | |
216 } | |
217 _init = true; | |
218 WEBRTC_TRACE( | |
219 kTraceDebug, | |
220 kTraceTransport, | |
221 _id, | |
222 "UdpSocket2ManagerWindows::StartWorkerThreads %d number of work " | |
223 "threads created and initialized", | |
224 _numOfWorkThreads); | |
225 _pCrit->Leave(); | |
226 } | |
227 return true; | |
228 } | |
229 | |
230 bool UdpSocket2ManagerWindows::Stop() | |
231 { | |
232 WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id, | |
233 "UdpSocket2ManagerWindows(%d)::Stop()",_managerNumber); | |
234 | |
235 if(!_init) | |
236 { | |
237 return false; | |
238 } | |
239 _pCrit->Enter(); | |
240 _stopped = true; | |
241 if(_numActiveSockets) | |
242 { | |
243 WEBRTC_TRACE( | |
244 kTraceError, | |
245 kTraceTransport, | |
246 _id, | |
247 "UdpSocket2ManagerWindows(%d)::Stop() there is still active\ | |
248 sockets", | |
249 _managerNumber); | |
250 _pCrit->Leave(); | |
251 return false; | |
252 } | |
253 // No active sockets. Stop all worker threads. | |
254 bool result = StopWorkerThreads(); | |
255 _pCrit->Leave(); | |
256 return result; | |
257 } | |
258 | |
259 bool UdpSocket2ManagerWindows::StopWorkerThreads() | |
260 { | |
261 int32_t error = 0; | |
262 WEBRTC_TRACE( | |
263 kTraceDebug, | |
264 kTraceTransport, | |
265 _id, | |
266 "UdpSocket2ManagerWindows(%d)::StopWorkerThreads() Worker\ | |
267 threadsStoped, numActicve Sockets=%d", | |
268 _managerNumber, | |
269 _numActiveSockets); | |
270 | |
271 // Release all threads waiting for GetQueuedCompletionStatus(..). | |
272 if(_ioCompletionHandle) | |
273 { | |
274 uint32_t i = 0; | |
275 for(i = 0; i < _workerThreadsList.size(); i++) | |
276 { | |
277 PostQueuedCompletionStatus(_ioCompletionHandle, 0 ,0 , NULL); | |
278 } | |
279 } | |
280 for (WorkerList::iterator iter = _workerThreadsList.begin(); | |
281 iter != _workerThreadsList.end(); ++iter) { | |
282 if((*iter)->Stop() == false) | |
283 { | |
284 error = -1; | |
285 WEBRTC_TRACE(kTraceWarning, kTraceTransport, -1, | |
286 "failed to stop worker thread"); | |
287 } | |
288 } | |
289 | |
290 if(error) | |
291 { | |
292 WEBRTC_TRACE( | |
293 kTraceError, | |
294 kTraceTransport, | |
295 _id, | |
296 "UdpSocket2ManagerWindows(%d)::StopWorkerThreads() error stopping\ | |
297 worker threads", | |
298 _managerNumber); | |
299 return false; | |
300 } | |
301 return true; | |
302 } | |
303 | |
304 bool UdpSocket2ManagerWindows::AddSocketPrv(UdpSocket2Windows* s) | |
305 { | |
306 WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id, | |
307 "UdpSocket2ManagerWindows(%d)::AddSocketPrv()",_managerNumber); | |
308 if(!_init) | |
309 { | |
310 WEBRTC_TRACE( | |
311 kTraceError, | |
312 kTraceTransport, | |
313 _id, | |
314 "UdpSocket2ManagerWindows(%d)::AddSocketPrv() manager not\ | |
315 initialized", | |
316 _managerNumber); | |
317 return false; | |
318 } | |
319 _pCrit->Enter(); | |
320 if(s == NULL) | |
321 { | |
322 WEBRTC_TRACE( | |
323 kTraceError, | |
324 kTraceTransport, | |
325 _id, | |
326 "UdpSocket2ManagerWindows(%d)::AddSocketPrv() socket == NULL", | |
327 _managerNumber); | |
328 _pCrit->Leave(); | |
329 return false; | |
330 } | |
331 if(s->GetFd() == NULL || s->GetFd() == INVALID_SOCKET) | |
332 { | |
333 WEBRTC_TRACE( | |
334 kTraceError, | |
335 kTraceTransport, | |
336 _id, | |
337 "UdpSocket2ManagerWindows(%d)::AddSocketPrv() socket->GetFd() ==\ | |
338 %d", | |
339 _managerNumber, | |
340 (int32_t)s->GetFd()); | |
341 _pCrit->Leave(); | |
342 return false; | |
343 | |
344 } | |
345 _ioCompletionHandle = CreateIoCompletionPort((HANDLE)s->GetFd(), | |
346 _ioCompletionHandle, | |
347 (ULONG_PTR)(s), 0); | |
348 if(_ioCompletionHandle == NULL) | |
349 { | |
350 int32_t error = GetLastError(); | |
351 WEBRTC_TRACE( | |
352 kTraceError, | |
353 kTraceTransport, | |
354 _id, | |
355 "UdpSocket2ManagerWindows(%d)::AddSocketPrv() Error adding to IO\ | |
356 completion: %d", | |
357 _managerNumber, | |
358 error); | |
359 _pCrit->Leave(); | |
360 return false; | |
361 } | |
362 _numActiveSockets++; | |
363 _pCrit->Leave(); | |
364 return true; | |
365 } | |
366 bool UdpSocket2ManagerWindows::RemoveSocketPrv(UdpSocket2Windows* s) | |
367 { | |
368 if(!_init) | |
369 { | |
370 return false; | |
371 } | |
372 _pCrit->Enter(); | |
373 _numActiveSockets--; | |
374 if(_numActiveSockets == 0) | |
375 { | |
376 _event->Set(); | |
377 } | |
378 _pCrit->Leave(); | |
379 return true; | |
380 } | |
381 | |
382 PerIoContext* UdpSocket2ManagerWindows::PopIoContext() | |
383 { | |
384 if(!_init) | |
385 { | |
386 return NULL; | |
387 } | |
388 | |
389 PerIoContext* pIoC = NULL; | |
390 if(!_stopped) | |
391 { | |
392 pIoC = _ioContextPool.PopIoContext(); | |
393 }else | |
394 { | |
395 WEBRTC_TRACE( | |
396 kTraceError, | |
397 kTraceTransport, | |
398 _id, | |
399 "UdpSocket2ManagerWindows(%d)::PopIoContext() Manager Not started", | |
400 _managerNumber); | |
401 } | |
402 return pIoC; | |
403 } | |
404 | |
405 int32_t UdpSocket2ManagerWindows::PushIoContext(PerIoContext* pIoContext) | |
406 { | |
407 return _ioContextPool.PushIoContext(pIoContext); | |
408 } | |
409 | |
410 IoContextPool::IoContextPool() | |
411 : _pListHead(NULL), | |
412 _init(false), | |
413 _size(0), | |
414 _inUse(0) | |
415 { | |
416 } | |
417 | |
418 IoContextPool::~IoContextPool() | |
419 { | |
420 Free(); | |
421 assert(_size.Value() == 0); | |
422 AlignedFree(_pListHead); | |
423 } | |
424 | |
425 int32_t IoContextPool::Init(uint32_t /*increaseSize*/) | |
426 { | |
427 if(_init) | |
428 { | |
429 return 0; | |
430 } | |
431 | |
432 _pListHead = (PSLIST_HEADER)AlignedMalloc(sizeof(SLIST_HEADER), | |
433 MEMORY_ALLOCATION_ALIGNMENT); | |
434 if(_pListHead == NULL) | |
435 { | |
436 return -1; | |
437 } | |
438 InitializeSListHead(_pListHead); | |
439 _init = true; | |
440 return 0; | |
441 } | |
442 | |
443 PerIoContext* IoContextPool::PopIoContext() | |
444 { | |
445 if(!_init) | |
446 { | |
447 return NULL; | |
448 } | |
449 | |
450 PSLIST_ENTRY pListEntry = InterlockedPopEntrySList(_pListHead); | |
451 if(pListEntry == NULL) | |
452 { | |
453 IoContextPoolItem* item = (IoContextPoolItem*) | |
454 AlignedMalloc( | |
455 sizeof(IoContextPoolItem), | |
456 MEMORY_ALLOCATION_ALIGNMENT); | |
457 if(item == NULL) | |
458 { | |
459 return NULL; | |
460 } | |
461 memset(&item->payload.ioContext,0,sizeof(PerIoContext)); | |
462 item->payload.base = item; | |
463 pListEntry = &(item->itemEntry); | |
464 ++_size; | |
465 } | |
466 ++_inUse; | |
467 return &((IoContextPoolItem*)pListEntry)->payload.ioContext; | |
468 } | |
469 | |
470 int32_t IoContextPool::PushIoContext(PerIoContext* pIoContext) | |
471 { | |
472 // TODO (hellner): Overlapped IO should be completed at this point. Perhaps | |
473 // add an assert? | |
474 const bool overlappedIOCompleted = HasOverlappedIoCompleted( | |
475 (LPOVERLAPPED)pIoContext); | |
476 | |
477 IoContextPoolItem* item = ((IoContextPoolItemPayload*)pIoContext)->base; | |
478 | |
479 const int32_t usedItems = --_inUse; | |
480 const int32_t totalItems = _size.Value(); | |
481 const int32_t freeItems = totalItems - usedItems; | |
482 if(freeItems < 0) | |
483 { | |
484 assert(false); | |
485 AlignedFree(item); | |
486 return -1; | |
487 } | |
488 if((freeItems >= totalItems>>1) && | |
489 overlappedIOCompleted) | |
490 { | |
491 AlignedFree(item); | |
492 --_size; | |
493 return 0; | |
494 } | |
495 InterlockedPushEntrySList(_pListHead, &(item->itemEntry)); | |
496 return 0; | |
497 } | |
498 | |
499 int32_t IoContextPool::Free() | |
500 { | |
501 if(!_init) | |
502 { | |
503 return 0; | |
504 } | |
505 | |
506 int32_t itemsFreed = 0; | |
507 PSLIST_ENTRY pListEntry = InterlockedPopEntrySList(_pListHead); | |
508 while(pListEntry != NULL) | |
509 { | |
510 IoContextPoolItem* item = ((IoContextPoolItem*)pListEntry); | |
511 AlignedFree(item); | |
512 --_size; | |
513 itemsFreed++; | |
514 pListEntry = InterlockedPopEntrySList(_pListHead); | |
515 } | |
516 return itemsFreed; | |
517 } | |
518 | |
519 int32_t UdpSocket2WorkerWindows::_numOfWorkers = 0; | |
520 | |
521 UdpSocket2WorkerWindows::UdpSocket2WorkerWindows(HANDLE ioCompletionHandle) | |
522 : _ioCompletionHandle(ioCompletionHandle), | |
523 _pThread(Run, this, "UdpSocket2ManagerWindows_thread"), | |
524 _init(false) { | |
525 _workerNumber = _numOfWorkers++; | |
526 WEBRTC_TRACE(kTraceMemory, kTraceTransport, -1, | |
527 "UdpSocket2WorkerWindows created"); | |
528 } | |
529 | |
530 UdpSocket2WorkerWindows::~UdpSocket2WorkerWindows() | |
531 { | |
532 WEBRTC_TRACE(kTraceMemory, kTraceTransport, -1, | |
533 "UdpSocket2WorkerWindows deleted"); | |
534 } | |
535 | |
536 bool UdpSocket2WorkerWindows::Start() | |
537 { | |
538 WEBRTC_TRACE(kTraceStateInfo, kTraceTransport, -1, | |
539 "Start UdpSocket2WorkerWindows"); | |
540 _pThread.Start(); | |
541 | |
542 _pThread.SetPriority(rtc::kRealtimePriority); | |
543 return true; | |
544 } | |
545 | |
546 bool UdpSocket2WorkerWindows::Stop() | |
547 { | |
548 WEBRTC_TRACE(kTraceStateInfo, kTraceTransport, -1, | |
549 "Stop UdpSocket2WorkerWindows"); | |
550 _pThread.Stop(); | |
551 return true; | |
552 } | |
553 | |
554 int32_t UdpSocket2WorkerWindows::Init() | |
555 { | |
556 _init = true; | |
557 return 0; | |
558 } | |
559 | |
560 bool UdpSocket2WorkerWindows::Run(void* obj) | |
561 { | |
562 UdpSocket2WorkerWindows* pWorker = | |
563 static_cast<UdpSocket2WorkerWindows*>(obj); | |
564 return pWorker->Process(); | |
565 } | |
566 | |
567 // Process should always return true. Stopping the worker threads is done in | |
568 // the UdpSocket2ManagerWindows::StopWorkerThreads() function. | |
569 bool UdpSocket2WorkerWindows::Process() | |
570 { | |
571 int32_t success = 0; | |
572 DWORD ioSize = 0; | |
573 UdpSocket2Windows* pSocket = NULL; | |
574 PerIoContext* pIOContext = 0; | |
575 OVERLAPPED* pOverlapped = 0; | |
576 success = GetQueuedCompletionStatus(_ioCompletionHandle, | |
577 &ioSize, | |
578 (ULONG_PTR*)&pSocket, &pOverlapped, 200); | |
579 | |
580 uint32_t error = 0; | |
581 if(!success) | |
582 { | |
583 error = GetLastError(); | |
584 if(error == WAIT_TIMEOUT) | |
585 { | |
586 return true; | |
587 } | |
588 // This may happen if e.g. PostQueuedCompletionStatus() has been called. | |
589 // The IO context still needs to be reclaimed or re-used which is done | |
590 // in UdpSocket2Windows::IOCompleted(..). | |
591 } | |
592 if(pSocket == NULL) | |
593 { | |
594 WEBRTC_TRACE( | |
595 kTraceDebug, | |
596 kTraceTransport, | |
597 -1, | |
598 "UdpSocket2WorkerWindows(%d)::Process(), pSocket == 0, end thread", | |
599 _workerNumber); | |
600 return true; | |
601 } | |
602 pIOContext = (PerIoContext*)pOverlapped; | |
603 pSocket->IOCompleted(pIOContext,ioSize,error); | |
604 return true; | |
605 } | |
606 | |
607 } // namespace test | |
608 } // namespace webrtc | |
OLD | NEW |