OLD | NEW |
(Empty) | |
| 1 /* |
| 2 * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved. |
| 3 * |
| 4 * Use of this source code is governed by a BSD-style license |
| 5 * that can be found in the LICENSE file in the root of the source |
| 6 * tree. An additional intellectual property rights grant can be found |
| 7 * in the file PATENTS. All contributing project authors may |
| 8 * be found in the AUTHORS file in the root of the source tree. |
| 9 */ |
| 10 |
| 11 #include "webrtc/test/channel_transport/udp_socket2_manager_win.h" |
| 12 |
| 13 #include <assert.h> |
| 14 #include <stdio.h> |
| 15 |
| 16 #include "webrtc/system_wrappers/include/aligned_malloc.h" |
| 17 #include "webrtc/test/channel_transport/udp_socket2_win.h" |
| 18 |
| 19 namespace webrtc { |
| 20 namespace test { |
| 21 |
| 22 uint32_t UdpSocket2ManagerWindows::_numOfActiveManagers = 0; |
| 23 bool UdpSocket2ManagerWindows::_wsaInit = false; |
| 24 |
| 25 UdpSocket2ManagerWindows::UdpSocket2ManagerWindows() |
| 26 : UdpSocketManager(), |
| 27 _id(-1), |
| 28 _stopped(false), |
| 29 _init(false), |
| 30 _pCrit(CriticalSectionWrapper::CreateCriticalSection()), |
| 31 _ioCompletionHandle(NULL), |
| 32 _numActiveSockets(0), |
| 33 _event(EventWrapper::Create()) |
| 34 { |
| 35 _managerNumber = _numOfActiveManagers++; |
| 36 |
| 37 if(_numOfActiveManagers == 1) |
| 38 { |
| 39 WORD wVersionRequested = MAKEWORD(2, 2); |
| 40 WSADATA wsaData; |
| 41 _wsaInit = WSAStartup(wVersionRequested, &wsaData) == 0; |
| 42 // TODO (hellner): seems safer to use RAII for this. E.g. what happens |
| 43 // if a UdpSocket2ManagerWindows() created and destroyed |
| 44 // without being initialized. |
| 45 } |
| 46 } |
| 47 |
| 48 UdpSocket2ManagerWindows::~UdpSocket2ManagerWindows() |
| 49 { |
| 50 WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id, |
| 51 "UdpSocket2ManagerWindows(%d)::~UdpSocket2ManagerWindows()", |
| 52 _managerNumber); |
| 53 |
| 54 if(_init) |
| 55 { |
| 56 _pCrit->Enter(); |
| 57 if(_numActiveSockets) |
| 58 { |
| 59 _pCrit->Leave(); |
| 60 _event->Wait(INFINITE); |
| 61 } |
| 62 else |
| 63 { |
| 64 _pCrit->Leave(); |
| 65 } |
| 66 StopWorkerThreads(); |
| 67 |
| 68 for (WorkerList::iterator iter = _workerThreadsList.begin(); |
| 69 iter != _workerThreadsList.end(); ++iter) { |
| 70 delete *iter; |
| 71 } |
| 72 _workerThreadsList.clear(); |
| 73 _ioContextPool.Free(); |
| 74 |
| 75 _numOfActiveManagers--; |
| 76 if(_ioCompletionHandle) |
| 77 { |
| 78 CloseHandle(_ioCompletionHandle); |
| 79 } |
| 80 if (_numOfActiveManagers == 0) |
| 81 { |
| 82 if(_wsaInit) |
| 83 { |
| 84 WSACleanup(); |
| 85 } |
| 86 } |
| 87 } |
| 88 if(_pCrit) |
| 89 { |
| 90 delete _pCrit; |
| 91 } |
| 92 if(_event) |
| 93 { |
| 94 delete _event; |
| 95 } |
| 96 } |
| 97 |
| 98 bool UdpSocket2ManagerWindows::Init(int32_t id, |
| 99 uint8_t& numOfWorkThreads) { |
| 100 CriticalSectionScoped cs(_pCrit); |
| 101 if ((_id != -1) || (_numOfWorkThreads != 0)) { |
| 102 assert(_id != -1); |
| 103 assert(_numOfWorkThreads != 0); |
| 104 return false; |
| 105 } |
| 106 _id = id; |
| 107 _numOfWorkThreads = numOfWorkThreads; |
| 108 return true; |
| 109 } |
| 110 |
| 111 bool UdpSocket2ManagerWindows::Start() |
| 112 { |
| 113 WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id, |
| 114 "UdpSocket2ManagerWindows(%d)::Start()",_managerNumber); |
| 115 if(!_init) |
| 116 { |
| 117 StartWorkerThreads(); |
| 118 } |
| 119 |
| 120 if(!_init) |
| 121 { |
| 122 return false; |
| 123 } |
| 124 _pCrit->Enter(); |
| 125 // Start worker threads. |
| 126 _stopped = false; |
| 127 int32_t error = 0; |
| 128 for (WorkerList::iterator iter = _workerThreadsList.begin(); |
| 129 iter != _workerThreadsList.end() && !error; ++iter) { |
| 130 if(!(*iter)->Start()) |
| 131 error = 1; |
| 132 } |
| 133 if(error) |
| 134 { |
| 135 WEBRTC_TRACE( |
| 136 kTraceError, |
| 137 kTraceTransport, |
| 138 _id, |
| 139 "UdpSocket2ManagerWindows(%d)::Start() error starting worker\ |
| 140 threads", |
| 141 _managerNumber); |
| 142 _pCrit->Leave(); |
| 143 return false; |
| 144 } |
| 145 _pCrit->Leave(); |
| 146 return true; |
| 147 } |
| 148 |
| 149 bool UdpSocket2ManagerWindows::StartWorkerThreads() |
| 150 { |
| 151 if(!_init) |
| 152 { |
| 153 _pCrit->Enter(); |
| 154 |
| 155 _ioCompletionHandle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, |
| 156 0, 0); |
| 157 if(_ioCompletionHandle == NULL) |
| 158 { |
| 159 int32_t error = GetLastError(); |
| 160 WEBRTC_TRACE( |
| 161 kTraceError, |
| 162 kTraceTransport, |
| 163 _id, |
| 164 "UdpSocket2ManagerWindows(%d)::StartWorkerThreads()" |
| 165 "_ioCompletioHandle == NULL: error:%d", |
| 166 _managerNumber,error); |
| 167 _pCrit->Leave(); |
| 168 return false; |
| 169 } |
| 170 |
| 171 // Create worker threads. |
| 172 uint32_t i = 0; |
| 173 bool error = false; |
| 174 while(i < _numOfWorkThreads && !error) |
| 175 { |
| 176 UdpSocket2WorkerWindows* pWorker = |
| 177 new UdpSocket2WorkerWindows(_ioCompletionHandle); |
| 178 if(pWorker->Init() != 0) |
| 179 { |
| 180 error = true; |
| 181 delete pWorker; |
| 182 break; |
| 183 } |
| 184 _workerThreadsList.push_front(pWorker); |
| 185 i++; |
| 186 } |
| 187 if(error) |
| 188 { |
| 189 WEBRTC_TRACE( |
| 190 kTraceError, |
| 191 kTraceTransport, |
| 192 _id, |
| 193 "UdpSocket2ManagerWindows(%d)::StartWorkerThreads() error " |
| 194 "creating work threads", |
| 195 _managerNumber); |
| 196 // Delete worker threads. |
| 197 for (WorkerList::iterator iter = _workerThreadsList.begin(); |
| 198 iter != _workerThreadsList.end(); ++iter) { |
| 199 delete *iter; |
| 200 } |
| 201 _workerThreadsList.clear(); |
| 202 _pCrit->Leave(); |
| 203 return false; |
| 204 } |
| 205 if(_ioContextPool.Init()) |
| 206 { |
| 207 WEBRTC_TRACE( |
| 208 kTraceError, |
| 209 kTraceTransport, |
| 210 _id, |
| 211 "UdpSocket2ManagerWindows(%d)::StartWorkerThreads() error " |
| 212 "initiating _ioContextPool", |
| 213 _managerNumber); |
| 214 _pCrit->Leave(); |
| 215 return false; |
| 216 } |
| 217 _init = true; |
| 218 WEBRTC_TRACE( |
| 219 kTraceDebug, |
| 220 kTraceTransport, |
| 221 _id, |
| 222 "UdpSocket2ManagerWindows::StartWorkerThreads %d number of work " |
| 223 "threads created and initialized", |
| 224 _numOfWorkThreads); |
| 225 _pCrit->Leave(); |
| 226 } |
| 227 return true; |
| 228 } |
| 229 |
| 230 bool UdpSocket2ManagerWindows::Stop() |
| 231 { |
| 232 WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id, |
| 233 "UdpSocket2ManagerWindows(%d)::Stop()",_managerNumber); |
| 234 |
| 235 if(!_init) |
| 236 { |
| 237 return false; |
| 238 } |
| 239 _pCrit->Enter(); |
| 240 _stopped = true; |
| 241 if(_numActiveSockets) |
| 242 { |
| 243 WEBRTC_TRACE( |
| 244 kTraceError, |
| 245 kTraceTransport, |
| 246 _id, |
| 247 "UdpSocket2ManagerWindows(%d)::Stop() there is still active\ |
| 248 sockets", |
| 249 _managerNumber); |
| 250 _pCrit->Leave(); |
| 251 return false; |
| 252 } |
| 253 // No active sockets. Stop all worker threads. |
| 254 bool result = StopWorkerThreads(); |
| 255 _pCrit->Leave(); |
| 256 return result; |
| 257 } |
| 258 |
| 259 bool UdpSocket2ManagerWindows::StopWorkerThreads() |
| 260 { |
| 261 int32_t error = 0; |
| 262 WEBRTC_TRACE( |
| 263 kTraceDebug, |
| 264 kTraceTransport, |
| 265 _id, |
| 266 "UdpSocket2ManagerWindows(%d)::StopWorkerThreads() Worker\ |
| 267 threadsStoped, numActicve Sockets=%d", |
| 268 _managerNumber, |
| 269 _numActiveSockets); |
| 270 |
| 271 // Release all threads waiting for GetQueuedCompletionStatus(..). |
| 272 if(_ioCompletionHandle) |
| 273 { |
| 274 uint32_t i = 0; |
| 275 for(i = 0; i < _workerThreadsList.size(); i++) |
| 276 { |
| 277 PostQueuedCompletionStatus(_ioCompletionHandle, 0 ,0 , NULL); |
| 278 } |
| 279 } |
| 280 for (WorkerList::iterator iter = _workerThreadsList.begin(); |
| 281 iter != _workerThreadsList.end(); ++iter) { |
| 282 if((*iter)->Stop() == false) |
| 283 { |
| 284 error = -1; |
| 285 WEBRTC_TRACE(kTraceWarning, kTraceTransport, -1, |
| 286 "failed to stop worker thread"); |
| 287 } |
| 288 } |
| 289 |
| 290 if(error) |
| 291 { |
| 292 WEBRTC_TRACE( |
| 293 kTraceError, |
| 294 kTraceTransport, |
| 295 _id, |
| 296 "UdpSocket2ManagerWindows(%d)::StopWorkerThreads() error stopping\ |
| 297 worker threads", |
| 298 _managerNumber); |
| 299 return false; |
| 300 } |
| 301 return true; |
| 302 } |
| 303 |
| 304 bool UdpSocket2ManagerWindows::AddSocketPrv(UdpSocket2Windows* s) |
| 305 { |
| 306 WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id, |
| 307 "UdpSocket2ManagerWindows(%d)::AddSocketPrv()",_managerNumber); |
| 308 if(!_init) |
| 309 { |
| 310 WEBRTC_TRACE( |
| 311 kTraceError, |
| 312 kTraceTransport, |
| 313 _id, |
| 314 "UdpSocket2ManagerWindows(%d)::AddSocketPrv() manager not\ |
| 315 initialized", |
| 316 _managerNumber); |
| 317 return false; |
| 318 } |
| 319 _pCrit->Enter(); |
| 320 if(s == NULL) |
| 321 { |
| 322 WEBRTC_TRACE( |
| 323 kTraceError, |
| 324 kTraceTransport, |
| 325 _id, |
| 326 "UdpSocket2ManagerWindows(%d)::AddSocketPrv() socket == NULL", |
| 327 _managerNumber); |
| 328 _pCrit->Leave(); |
| 329 return false; |
| 330 } |
| 331 if(s->GetFd() == NULL || s->GetFd() == INVALID_SOCKET) |
| 332 { |
| 333 WEBRTC_TRACE( |
| 334 kTraceError, |
| 335 kTraceTransport, |
| 336 _id, |
| 337 "UdpSocket2ManagerWindows(%d)::AddSocketPrv() socket->GetFd() ==\ |
| 338 %d", |
| 339 _managerNumber, |
| 340 (int32_t)s->GetFd()); |
| 341 _pCrit->Leave(); |
| 342 return false; |
| 343 |
| 344 } |
| 345 _ioCompletionHandle = CreateIoCompletionPort((HANDLE)s->GetFd(), |
| 346 _ioCompletionHandle, |
| 347 (ULONG_PTR)(s), 0); |
| 348 if(_ioCompletionHandle == NULL) |
| 349 { |
| 350 int32_t error = GetLastError(); |
| 351 WEBRTC_TRACE( |
| 352 kTraceError, |
| 353 kTraceTransport, |
| 354 _id, |
| 355 "UdpSocket2ManagerWindows(%d)::AddSocketPrv() Error adding to IO\ |
| 356 completion: %d", |
| 357 _managerNumber, |
| 358 error); |
| 359 _pCrit->Leave(); |
| 360 return false; |
| 361 } |
| 362 _numActiveSockets++; |
| 363 _pCrit->Leave(); |
| 364 return true; |
| 365 } |
| 366 bool UdpSocket2ManagerWindows::RemoveSocketPrv(UdpSocket2Windows* s) |
| 367 { |
| 368 if(!_init) |
| 369 { |
| 370 return false; |
| 371 } |
| 372 _pCrit->Enter(); |
| 373 _numActiveSockets--; |
| 374 if(_numActiveSockets == 0) |
| 375 { |
| 376 _event->Set(); |
| 377 } |
| 378 _pCrit->Leave(); |
| 379 return true; |
| 380 } |
| 381 |
| 382 PerIoContext* UdpSocket2ManagerWindows::PopIoContext() |
| 383 { |
| 384 if(!_init) |
| 385 { |
| 386 return NULL; |
| 387 } |
| 388 |
| 389 PerIoContext* pIoC = NULL; |
| 390 if(!_stopped) |
| 391 { |
| 392 pIoC = _ioContextPool.PopIoContext(); |
| 393 }else |
| 394 { |
| 395 WEBRTC_TRACE( |
| 396 kTraceError, |
| 397 kTraceTransport, |
| 398 _id, |
| 399 "UdpSocket2ManagerWindows(%d)::PopIoContext() Manager Not started", |
| 400 _managerNumber); |
| 401 } |
| 402 return pIoC; |
| 403 } |
| 404 |
| 405 int32_t UdpSocket2ManagerWindows::PushIoContext(PerIoContext* pIoContext) |
| 406 { |
| 407 return _ioContextPool.PushIoContext(pIoContext); |
| 408 } |
| 409 |
| 410 IoContextPool::IoContextPool() |
| 411 : _pListHead(NULL), |
| 412 _init(false), |
| 413 _size(0), |
| 414 _inUse(0) |
| 415 { |
| 416 } |
| 417 |
| 418 IoContextPool::~IoContextPool() |
| 419 { |
| 420 Free(); |
| 421 assert(_size.Value() == 0); |
| 422 AlignedFree(_pListHead); |
| 423 } |
| 424 |
| 425 int32_t IoContextPool::Init(uint32_t /*increaseSize*/) |
| 426 { |
| 427 if(_init) |
| 428 { |
| 429 return 0; |
| 430 } |
| 431 |
| 432 _pListHead = (PSLIST_HEADER)AlignedMalloc(sizeof(SLIST_HEADER), |
| 433 MEMORY_ALLOCATION_ALIGNMENT); |
| 434 if(_pListHead == NULL) |
| 435 { |
| 436 return -1; |
| 437 } |
| 438 InitializeSListHead(_pListHead); |
| 439 _init = true; |
| 440 return 0; |
| 441 } |
| 442 |
| 443 PerIoContext* IoContextPool::PopIoContext() |
| 444 { |
| 445 if(!_init) |
| 446 { |
| 447 return NULL; |
| 448 } |
| 449 |
| 450 PSLIST_ENTRY pListEntry = InterlockedPopEntrySList(_pListHead); |
| 451 if(pListEntry == NULL) |
| 452 { |
| 453 IoContextPoolItem* item = (IoContextPoolItem*) |
| 454 AlignedMalloc( |
| 455 sizeof(IoContextPoolItem), |
| 456 MEMORY_ALLOCATION_ALIGNMENT); |
| 457 if(item == NULL) |
| 458 { |
| 459 return NULL; |
| 460 } |
| 461 memset(&item->payload.ioContext,0,sizeof(PerIoContext)); |
| 462 item->payload.base = item; |
| 463 pListEntry = &(item->itemEntry); |
| 464 ++_size; |
| 465 } |
| 466 ++_inUse; |
| 467 return &((IoContextPoolItem*)pListEntry)->payload.ioContext; |
| 468 } |
| 469 |
| 470 int32_t IoContextPool::PushIoContext(PerIoContext* pIoContext) |
| 471 { |
| 472 // TODO (hellner): Overlapped IO should be completed at this point. Perhaps |
| 473 // add an assert? |
| 474 const bool overlappedIOCompleted = HasOverlappedIoCompleted( |
| 475 (LPOVERLAPPED)pIoContext); |
| 476 |
| 477 IoContextPoolItem* item = ((IoContextPoolItemPayload*)pIoContext)->base; |
| 478 |
| 479 const int32_t usedItems = --_inUse; |
| 480 const int32_t totalItems = _size.Value(); |
| 481 const int32_t freeItems = totalItems - usedItems; |
| 482 if(freeItems < 0) |
| 483 { |
| 484 assert(false); |
| 485 AlignedFree(item); |
| 486 return -1; |
| 487 } |
| 488 if((freeItems >= totalItems>>1) && |
| 489 overlappedIOCompleted) |
| 490 { |
| 491 AlignedFree(item); |
| 492 --_size; |
| 493 return 0; |
| 494 } |
| 495 InterlockedPushEntrySList(_pListHead, &(item->itemEntry)); |
| 496 return 0; |
| 497 } |
| 498 |
| 499 int32_t IoContextPool::Free() |
| 500 { |
| 501 if(!_init) |
| 502 { |
| 503 return 0; |
| 504 } |
| 505 |
| 506 int32_t itemsFreed = 0; |
| 507 PSLIST_ENTRY pListEntry = InterlockedPopEntrySList(_pListHead); |
| 508 while(pListEntry != NULL) |
| 509 { |
| 510 IoContextPoolItem* item = ((IoContextPoolItem*)pListEntry); |
| 511 AlignedFree(item); |
| 512 --_size; |
| 513 itemsFreed++; |
| 514 pListEntry = InterlockedPopEntrySList(_pListHead); |
| 515 } |
| 516 return itemsFreed; |
| 517 } |
| 518 |
| 519 int32_t UdpSocket2WorkerWindows::_numOfWorkers = 0; |
| 520 |
| 521 UdpSocket2WorkerWindows::UdpSocket2WorkerWindows(HANDLE ioCompletionHandle) |
| 522 : _ioCompletionHandle(ioCompletionHandle), |
| 523 _pThread(Run, this, "UdpSocket2ManagerWindows_thread"), |
| 524 _init(false) { |
| 525 _workerNumber = _numOfWorkers++; |
| 526 WEBRTC_TRACE(kTraceMemory, kTraceTransport, -1, |
| 527 "UdpSocket2WorkerWindows created"); |
| 528 } |
| 529 |
| 530 UdpSocket2WorkerWindows::~UdpSocket2WorkerWindows() |
| 531 { |
| 532 WEBRTC_TRACE(kTraceMemory, kTraceTransport, -1, |
| 533 "UdpSocket2WorkerWindows deleted"); |
| 534 } |
| 535 |
| 536 bool UdpSocket2WorkerWindows::Start() |
| 537 { |
| 538 WEBRTC_TRACE(kTraceStateInfo, kTraceTransport, -1, |
| 539 "Start UdpSocket2WorkerWindows"); |
| 540 _pThread.Start(); |
| 541 |
| 542 _pThread.SetPriority(rtc::kRealtimePriority); |
| 543 return true; |
| 544 } |
| 545 |
| 546 bool UdpSocket2WorkerWindows::Stop() |
| 547 { |
| 548 WEBRTC_TRACE(kTraceStateInfo, kTraceTransport, -1, |
| 549 "Stop UdpSocket2WorkerWindows"); |
| 550 _pThread.Stop(); |
| 551 return true; |
| 552 } |
| 553 |
| 554 int32_t UdpSocket2WorkerWindows::Init() |
| 555 { |
| 556 _init = true; |
| 557 return 0; |
| 558 } |
| 559 |
| 560 bool UdpSocket2WorkerWindows::Run(void* obj) |
| 561 { |
| 562 UdpSocket2WorkerWindows* pWorker = |
| 563 static_cast<UdpSocket2WorkerWindows*>(obj); |
| 564 return pWorker->Process(); |
| 565 } |
| 566 |
| 567 // Process should always return true. Stopping the worker threads is done in |
| 568 // the UdpSocket2ManagerWindows::StopWorkerThreads() function. |
| 569 bool UdpSocket2WorkerWindows::Process() |
| 570 { |
| 571 int32_t success = 0; |
| 572 DWORD ioSize = 0; |
| 573 UdpSocket2Windows* pSocket = NULL; |
| 574 PerIoContext* pIOContext = 0; |
| 575 OVERLAPPED* pOverlapped = 0; |
| 576 success = GetQueuedCompletionStatus(_ioCompletionHandle, |
| 577 &ioSize, |
| 578 (ULONG_PTR*)&pSocket, &pOverlapped, 200); |
| 579 |
| 580 uint32_t error = 0; |
| 581 if(!success) |
| 582 { |
| 583 error = GetLastError(); |
| 584 if(error == WAIT_TIMEOUT) |
| 585 { |
| 586 return true; |
| 587 } |
| 588 // This may happen if e.g. PostQueuedCompletionStatus() has been called. |
| 589 // The IO context still needs to be reclaimed or re-used which is done |
| 590 // in UdpSocket2Windows::IOCompleted(..). |
| 591 } |
| 592 if(pSocket == NULL) |
| 593 { |
| 594 WEBRTC_TRACE( |
| 595 kTraceDebug, |
| 596 kTraceTransport, |
| 597 -1, |
| 598 "UdpSocket2WorkerWindows(%d)::Process(), pSocket == 0, end thread", |
| 599 _workerNumber); |
| 600 return true; |
| 601 } |
| 602 pIOContext = (PerIoContext*)pOverlapped; |
| 603 pSocket->IOCompleted(pIOContext,ioSize,error); |
| 604 return true; |
| 605 } |
| 606 |
| 607 } // namespace test |
| 608 } // namespace webrtc |
OLD | NEW |