Index: webrtc/test/channel_transport/udp_socket2_manager_win.cc |
diff --git a/webrtc/test/channel_transport/udp_socket2_manager_win.cc b/webrtc/test/channel_transport/udp_socket2_manager_win.cc |
new file mode 100644 |
index 0000000000000000000000000000000000000000..9f40350287b61b8a810800a8e73901371a51291c |
--- /dev/null |
+++ b/webrtc/test/channel_transport/udp_socket2_manager_win.cc |
@@ -0,0 +1,608 @@ |
+/* |
+ * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved. |
+ * |
+ * Use of this source code is governed by a BSD-style license |
+ * that can be found in the LICENSE file in the root of the source |
+ * tree. An additional intellectual property rights grant can be found |
+ * in the file PATENTS. All contributing project authors may |
+ * be found in the AUTHORS file in the root of the source tree. |
+ */ |
+ |
+#include "webrtc/test/channel_transport/udp_socket2_manager_win.h" |
+ |
+#include <assert.h> |
+#include <stdio.h> |
+ |
+#include "webrtc/system_wrappers/include/aligned_malloc.h" |
+#include "webrtc/test/channel_transport/udp_socket2_win.h" |
+ |
+namespace webrtc { |
+namespace test { |
+ |
+uint32_t UdpSocket2ManagerWindows::_numOfActiveManagers = 0; |
+bool UdpSocket2ManagerWindows::_wsaInit = false; |
+ |
+UdpSocket2ManagerWindows::UdpSocket2ManagerWindows() |
+ : UdpSocketManager(), |
+ _id(-1), |
+ _stopped(false), |
+ _init(false), |
+ _pCrit(CriticalSectionWrapper::CreateCriticalSection()), |
+ _ioCompletionHandle(NULL), |
+ _numActiveSockets(0), |
+ _event(EventWrapper::Create()) |
+{ |
+ _managerNumber = _numOfActiveManagers++; |
+ |
+ if(_numOfActiveManagers == 1) |
+ { |
+ WORD wVersionRequested = MAKEWORD(2, 2); |
+ WSADATA wsaData; |
+ _wsaInit = WSAStartup(wVersionRequested, &wsaData) == 0; |
+ // TODO (hellner): seems safer to use RAII for this. E.g. what happens |
+ // if a UdpSocket2ManagerWindows() created and destroyed |
+ // without being initialized. |
+ } |
+} |
+ |
+UdpSocket2ManagerWindows::~UdpSocket2ManagerWindows() |
+{ |
+ WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id, |
+ "UdpSocket2ManagerWindows(%d)::~UdpSocket2ManagerWindows()", |
+ _managerNumber); |
+ |
+ if(_init) |
+ { |
+ _pCrit->Enter(); |
+ if(_numActiveSockets) |
+ { |
+ _pCrit->Leave(); |
+ _event->Wait(INFINITE); |
+ } |
+ else |
+ { |
+ _pCrit->Leave(); |
+ } |
+ StopWorkerThreads(); |
+ |
+ for (WorkerList::iterator iter = _workerThreadsList.begin(); |
+ iter != _workerThreadsList.end(); ++iter) { |
+ delete *iter; |
+ } |
+ _workerThreadsList.clear(); |
+ _ioContextPool.Free(); |
+ |
+ _numOfActiveManagers--; |
+ if(_ioCompletionHandle) |
+ { |
+ CloseHandle(_ioCompletionHandle); |
+ } |
+ if (_numOfActiveManagers == 0) |
+ { |
+ if(_wsaInit) |
+ { |
+ WSACleanup(); |
+ } |
+ } |
+ } |
+ if(_pCrit) |
+ { |
+ delete _pCrit; |
+ } |
+ if(_event) |
+ { |
+ delete _event; |
+ } |
+} |
+ |
+bool UdpSocket2ManagerWindows::Init(int32_t id, |
+ uint8_t& numOfWorkThreads) { |
+ CriticalSectionScoped cs(_pCrit); |
+ if ((_id != -1) || (_numOfWorkThreads != 0)) { |
+ assert(_id != -1); |
+ assert(_numOfWorkThreads != 0); |
+ return false; |
+ } |
+ _id = id; |
+ _numOfWorkThreads = numOfWorkThreads; |
+ return true; |
+} |
+ |
+bool UdpSocket2ManagerWindows::Start() |
+{ |
+ WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id, |
+ "UdpSocket2ManagerWindows(%d)::Start()",_managerNumber); |
+ if(!_init) |
+ { |
+ StartWorkerThreads(); |
+ } |
+ |
+ if(!_init) |
+ { |
+ return false; |
+ } |
+ _pCrit->Enter(); |
+ // Start worker threads. |
+ _stopped = false; |
+ int32_t error = 0; |
+ for (WorkerList::iterator iter = _workerThreadsList.begin(); |
+ iter != _workerThreadsList.end() && !error; ++iter) { |
+ if(!(*iter)->Start()) |
+ error = 1; |
+ } |
+ if(error) |
+ { |
+ WEBRTC_TRACE( |
+ kTraceError, |
+ kTraceTransport, |
+ _id, |
+ "UdpSocket2ManagerWindows(%d)::Start() error starting worker\ |
+ threads", |
+ _managerNumber); |
+ _pCrit->Leave(); |
+ return false; |
+ } |
+ _pCrit->Leave(); |
+ return true; |
+} |
+ |
+bool UdpSocket2ManagerWindows::StartWorkerThreads() |
+{ |
+ if(!_init) |
+ { |
+ _pCrit->Enter(); |
+ |
+ _ioCompletionHandle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, |
+ 0, 0); |
+ if(_ioCompletionHandle == NULL) |
+ { |
+ int32_t error = GetLastError(); |
+ WEBRTC_TRACE( |
+ kTraceError, |
+ kTraceTransport, |
+ _id, |
+ "UdpSocket2ManagerWindows(%d)::StartWorkerThreads()" |
+ "_ioCompletioHandle == NULL: error:%d", |
+ _managerNumber,error); |
+ _pCrit->Leave(); |
+ return false; |
+ } |
+ |
+ // Create worker threads. |
+ uint32_t i = 0; |
+ bool error = false; |
+ while(i < _numOfWorkThreads && !error) |
+ { |
+ UdpSocket2WorkerWindows* pWorker = |
+ new UdpSocket2WorkerWindows(_ioCompletionHandle); |
+ if(pWorker->Init() != 0) |
+ { |
+ error = true; |
+ delete pWorker; |
+ break; |
+ } |
+ _workerThreadsList.push_front(pWorker); |
+ i++; |
+ } |
+ if(error) |
+ { |
+ WEBRTC_TRACE( |
+ kTraceError, |
+ kTraceTransport, |
+ _id, |
+ "UdpSocket2ManagerWindows(%d)::StartWorkerThreads() error " |
+ "creating work threads", |
+ _managerNumber); |
+ // Delete worker threads. |
+ for (WorkerList::iterator iter = _workerThreadsList.begin(); |
+ iter != _workerThreadsList.end(); ++iter) { |
+ delete *iter; |
+ } |
+ _workerThreadsList.clear(); |
+ _pCrit->Leave(); |
+ return false; |
+ } |
+ if(_ioContextPool.Init()) |
+ { |
+ WEBRTC_TRACE( |
+ kTraceError, |
+ kTraceTransport, |
+ _id, |
+ "UdpSocket2ManagerWindows(%d)::StartWorkerThreads() error " |
+ "initiating _ioContextPool", |
+ _managerNumber); |
+ _pCrit->Leave(); |
+ return false; |
+ } |
+ _init = true; |
+ WEBRTC_TRACE( |
+ kTraceDebug, |
+ kTraceTransport, |
+ _id, |
+ "UdpSocket2ManagerWindows::StartWorkerThreads %d number of work " |
+ "threads created and initialized", |
+ _numOfWorkThreads); |
+ _pCrit->Leave(); |
+ } |
+ return true; |
+} |
+ |
+bool UdpSocket2ManagerWindows::Stop() |
+{ |
+ WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id, |
+ "UdpSocket2ManagerWindows(%d)::Stop()",_managerNumber); |
+ |
+ if(!_init) |
+ { |
+ return false; |
+ } |
+ _pCrit->Enter(); |
+ _stopped = true; |
+ if(_numActiveSockets) |
+ { |
+ WEBRTC_TRACE( |
+ kTraceError, |
+ kTraceTransport, |
+ _id, |
+ "UdpSocket2ManagerWindows(%d)::Stop() there is still active\ |
+ sockets", |
+ _managerNumber); |
+ _pCrit->Leave(); |
+ return false; |
+ } |
+ // No active sockets. Stop all worker threads. |
+ bool result = StopWorkerThreads(); |
+ _pCrit->Leave(); |
+ return result; |
+} |
+ |
+bool UdpSocket2ManagerWindows::StopWorkerThreads() |
+{ |
+ int32_t error = 0; |
+ WEBRTC_TRACE( |
+ kTraceDebug, |
+ kTraceTransport, |
+ _id, |
+ "UdpSocket2ManagerWindows(%d)::StopWorkerThreads() Worker\ |
+ threadsStoped, numActicve Sockets=%d", |
+ _managerNumber, |
+ _numActiveSockets); |
+ |
+ // Release all threads waiting for GetQueuedCompletionStatus(..). |
+ if(_ioCompletionHandle) |
+ { |
+ uint32_t i = 0; |
+ for(i = 0; i < _workerThreadsList.size(); i++) |
+ { |
+ PostQueuedCompletionStatus(_ioCompletionHandle, 0 ,0 , NULL); |
+ } |
+ } |
+ for (WorkerList::iterator iter = _workerThreadsList.begin(); |
+ iter != _workerThreadsList.end(); ++iter) { |
+ if((*iter)->Stop() == false) |
+ { |
+ error = -1; |
+ WEBRTC_TRACE(kTraceWarning, kTraceTransport, -1, |
+ "failed to stop worker thread"); |
+ } |
+ } |
+ |
+ if(error) |
+ { |
+ WEBRTC_TRACE( |
+ kTraceError, |
+ kTraceTransport, |
+ _id, |
+ "UdpSocket2ManagerWindows(%d)::StopWorkerThreads() error stopping\ |
+ worker threads", |
+ _managerNumber); |
+ return false; |
+ } |
+ return true; |
+} |
+ |
+bool UdpSocket2ManagerWindows::AddSocketPrv(UdpSocket2Windows* s) |
+{ |
+ WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id, |
+ "UdpSocket2ManagerWindows(%d)::AddSocketPrv()",_managerNumber); |
+ if(!_init) |
+ { |
+ WEBRTC_TRACE( |
+ kTraceError, |
+ kTraceTransport, |
+ _id, |
+ "UdpSocket2ManagerWindows(%d)::AddSocketPrv() manager not\ |
+ initialized", |
+ _managerNumber); |
+ return false; |
+ } |
+ _pCrit->Enter(); |
+ if(s == NULL) |
+ { |
+ WEBRTC_TRACE( |
+ kTraceError, |
+ kTraceTransport, |
+ _id, |
+ "UdpSocket2ManagerWindows(%d)::AddSocketPrv() socket == NULL", |
+ _managerNumber); |
+ _pCrit->Leave(); |
+ return false; |
+ } |
+ if(s->GetFd() == NULL || s->GetFd() == INVALID_SOCKET) |
+ { |
+ WEBRTC_TRACE( |
+ kTraceError, |
+ kTraceTransport, |
+ _id, |
+ "UdpSocket2ManagerWindows(%d)::AddSocketPrv() socket->GetFd() ==\ |
+ %d", |
+ _managerNumber, |
+ (int32_t)s->GetFd()); |
+ _pCrit->Leave(); |
+ return false; |
+ |
+ } |
+ _ioCompletionHandle = CreateIoCompletionPort((HANDLE)s->GetFd(), |
+ _ioCompletionHandle, |
+ (ULONG_PTR)(s), 0); |
+ if(_ioCompletionHandle == NULL) |
+ { |
+ int32_t error = GetLastError(); |
+ WEBRTC_TRACE( |
+ kTraceError, |
+ kTraceTransport, |
+ _id, |
+ "UdpSocket2ManagerWindows(%d)::AddSocketPrv() Error adding to IO\ |
+ completion: %d", |
+ _managerNumber, |
+ error); |
+ _pCrit->Leave(); |
+ return false; |
+ } |
+ _numActiveSockets++; |
+ _pCrit->Leave(); |
+ return true; |
+} |
+bool UdpSocket2ManagerWindows::RemoveSocketPrv(UdpSocket2Windows* s) |
+{ |
+ if(!_init) |
+ { |
+ return false; |
+ } |
+ _pCrit->Enter(); |
+ _numActiveSockets--; |
+ if(_numActiveSockets == 0) |
+ { |
+ _event->Set(); |
+ } |
+ _pCrit->Leave(); |
+ return true; |
+} |
+ |
+PerIoContext* UdpSocket2ManagerWindows::PopIoContext() |
+{ |
+ if(!_init) |
+ { |
+ return NULL; |
+ } |
+ |
+ PerIoContext* pIoC = NULL; |
+ if(!_stopped) |
+ { |
+ pIoC = _ioContextPool.PopIoContext(); |
+ }else |
+ { |
+ WEBRTC_TRACE( |
+ kTraceError, |
+ kTraceTransport, |
+ _id, |
+ "UdpSocket2ManagerWindows(%d)::PopIoContext() Manager Not started", |
+ _managerNumber); |
+ } |
+ return pIoC; |
+} |
+ |
+int32_t UdpSocket2ManagerWindows::PushIoContext(PerIoContext* pIoContext) |
+{ |
+ return _ioContextPool.PushIoContext(pIoContext); |
+} |
+ |
+IoContextPool::IoContextPool() |
+ : _pListHead(NULL), |
+ _init(false), |
+ _size(0), |
+ _inUse(0) |
+{ |
+} |
+ |
+IoContextPool::~IoContextPool() |
+{ |
+ Free(); |
+ assert(_size.Value() == 0); |
+ AlignedFree(_pListHead); |
+} |
+ |
+int32_t IoContextPool::Init(uint32_t /*increaseSize*/) |
+{ |
+ if(_init) |
+ { |
+ return 0; |
+ } |
+ |
+ _pListHead = (PSLIST_HEADER)AlignedMalloc(sizeof(SLIST_HEADER), |
+ MEMORY_ALLOCATION_ALIGNMENT); |
+ if(_pListHead == NULL) |
+ { |
+ return -1; |
+ } |
+ InitializeSListHead(_pListHead); |
+ _init = true; |
+ return 0; |
+} |
+ |
+PerIoContext* IoContextPool::PopIoContext() |
+{ |
+ if(!_init) |
+ { |
+ return NULL; |
+ } |
+ |
+ PSLIST_ENTRY pListEntry = InterlockedPopEntrySList(_pListHead); |
+ if(pListEntry == NULL) |
+ { |
+ IoContextPoolItem* item = (IoContextPoolItem*) |
+ AlignedMalloc( |
+ sizeof(IoContextPoolItem), |
+ MEMORY_ALLOCATION_ALIGNMENT); |
+ if(item == NULL) |
+ { |
+ return NULL; |
+ } |
+ memset(&item->payload.ioContext,0,sizeof(PerIoContext)); |
+ item->payload.base = item; |
+ pListEntry = &(item->itemEntry); |
+ ++_size; |
+ } |
+ ++_inUse; |
+ return &((IoContextPoolItem*)pListEntry)->payload.ioContext; |
+} |
+ |
+int32_t IoContextPool::PushIoContext(PerIoContext* pIoContext) |
+{ |
+ // TODO (hellner): Overlapped IO should be completed at this point. Perhaps |
+ // add an assert? |
+ const bool overlappedIOCompleted = HasOverlappedIoCompleted( |
+ (LPOVERLAPPED)pIoContext); |
+ |
+ IoContextPoolItem* item = ((IoContextPoolItemPayload*)pIoContext)->base; |
+ |
+ const int32_t usedItems = --_inUse; |
+ const int32_t totalItems = _size.Value(); |
+ const int32_t freeItems = totalItems - usedItems; |
+ if(freeItems < 0) |
+ { |
+ assert(false); |
+ AlignedFree(item); |
+ return -1; |
+ } |
+ if((freeItems >= totalItems>>1) && |
+ overlappedIOCompleted) |
+ { |
+ AlignedFree(item); |
+ --_size; |
+ return 0; |
+ } |
+ InterlockedPushEntrySList(_pListHead, &(item->itemEntry)); |
+ return 0; |
+} |
+ |
+int32_t IoContextPool::Free() |
+{ |
+ if(!_init) |
+ { |
+ return 0; |
+ } |
+ |
+ int32_t itemsFreed = 0; |
+ PSLIST_ENTRY pListEntry = InterlockedPopEntrySList(_pListHead); |
+ while(pListEntry != NULL) |
+ { |
+ IoContextPoolItem* item = ((IoContextPoolItem*)pListEntry); |
+ AlignedFree(item); |
+ --_size; |
+ itemsFreed++; |
+ pListEntry = InterlockedPopEntrySList(_pListHead); |
+ } |
+ return itemsFreed; |
+} |
+ |
+int32_t UdpSocket2WorkerWindows::_numOfWorkers = 0; |
+ |
+UdpSocket2WorkerWindows::UdpSocket2WorkerWindows(HANDLE ioCompletionHandle) |
+ : _ioCompletionHandle(ioCompletionHandle), |
+ _pThread(Run, this, "UdpSocket2ManagerWindows_thread"), |
+ _init(false) { |
+ _workerNumber = _numOfWorkers++; |
+ WEBRTC_TRACE(kTraceMemory, kTraceTransport, -1, |
+ "UdpSocket2WorkerWindows created"); |
+} |
+ |
+UdpSocket2WorkerWindows::~UdpSocket2WorkerWindows() |
+{ |
+ WEBRTC_TRACE(kTraceMemory, kTraceTransport, -1, |
+ "UdpSocket2WorkerWindows deleted"); |
+} |
+ |
+bool UdpSocket2WorkerWindows::Start() |
+{ |
+ WEBRTC_TRACE(kTraceStateInfo, kTraceTransport, -1, |
+ "Start UdpSocket2WorkerWindows"); |
+ _pThread.Start(); |
+ |
+ _pThread.SetPriority(rtc::kRealtimePriority); |
+ return true; |
+} |
+ |
+bool UdpSocket2WorkerWindows::Stop() |
+{ |
+ WEBRTC_TRACE(kTraceStateInfo, kTraceTransport, -1, |
+ "Stop UdpSocket2WorkerWindows"); |
+ _pThread.Stop(); |
+ return true; |
+} |
+ |
+int32_t UdpSocket2WorkerWindows::Init() |
+{ |
+ _init = true; |
+ return 0; |
+} |
+ |
+bool UdpSocket2WorkerWindows::Run(void* obj) |
+{ |
+ UdpSocket2WorkerWindows* pWorker = |
+ static_cast<UdpSocket2WorkerWindows*>(obj); |
+ return pWorker->Process(); |
+} |
+ |
+// Process should always return true. Stopping the worker threads is done in |
+// the UdpSocket2ManagerWindows::StopWorkerThreads() function. |
+bool UdpSocket2WorkerWindows::Process() |
+{ |
+ int32_t success = 0; |
+ DWORD ioSize = 0; |
+ UdpSocket2Windows* pSocket = NULL; |
+ PerIoContext* pIOContext = 0; |
+ OVERLAPPED* pOverlapped = 0; |
+ success = GetQueuedCompletionStatus(_ioCompletionHandle, |
+ &ioSize, |
+ (ULONG_PTR*)&pSocket, &pOverlapped, 200); |
+ |
+ uint32_t error = 0; |
+ if(!success) |
+ { |
+ error = GetLastError(); |
+ if(error == WAIT_TIMEOUT) |
+ { |
+ return true; |
+ } |
+ // This may happen if e.g. PostQueuedCompletionStatus() has been called. |
+ // The IO context still needs to be reclaimed or re-used which is done |
+ // in UdpSocket2Windows::IOCompleted(..). |
+ } |
+ if(pSocket == NULL) |
+ { |
+ WEBRTC_TRACE( |
+ kTraceDebug, |
+ kTraceTransport, |
+ -1, |
+ "UdpSocket2WorkerWindows(%d)::Process(), pSocket == 0, end thread", |
+ _workerNumber); |
+ return true; |
+ } |
+ pIOContext = (PerIoContext*)pOverlapped; |
+ pSocket->IOCompleted(pIOContext,ioSize,error); |
+ return true; |
+} |
+ |
+} // namespace test |
+} // namespace webrtc |