Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(241)

Unified Diff: webrtc/test/channel_transport/udp_socket2_manager_win.cc

Issue 2336123002: Revert of Moved webrtc/test/channel_transport/ into webrtc/voice_engine/test/ (Closed)
Patch Set: Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: webrtc/test/channel_transport/udp_socket2_manager_win.cc
diff --git a/webrtc/test/channel_transport/udp_socket2_manager_win.cc b/webrtc/test/channel_transport/udp_socket2_manager_win.cc
new file mode 100644
index 0000000000000000000000000000000000000000..9f40350287b61b8a810800a8e73901371a51291c
--- /dev/null
+++ b/webrtc/test/channel_transport/udp_socket2_manager_win.cc
@@ -0,0 +1,608 @@
+/*
+ * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "webrtc/test/channel_transport/udp_socket2_manager_win.h"
+
+#include <assert.h>
+#include <stdio.h>
+
+#include "webrtc/system_wrappers/include/aligned_malloc.h"
+#include "webrtc/test/channel_transport/udp_socket2_win.h"
+
+namespace webrtc {
+namespace test {
+
+uint32_t UdpSocket2ManagerWindows::_numOfActiveManagers = 0;
+bool UdpSocket2ManagerWindows::_wsaInit = false;
+
+UdpSocket2ManagerWindows::UdpSocket2ManagerWindows()
+ : UdpSocketManager(),
+ _id(-1),
+ _stopped(false),
+ _init(false),
+ _pCrit(CriticalSectionWrapper::CreateCriticalSection()),
+ _ioCompletionHandle(NULL),
+ _numActiveSockets(0),
+ _event(EventWrapper::Create())
+{
+ _managerNumber = _numOfActiveManagers++;
+
+ if(_numOfActiveManagers == 1)
+ {
+ WORD wVersionRequested = MAKEWORD(2, 2);
+ WSADATA wsaData;
+ _wsaInit = WSAStartup(wVersionRequested, &wsaData) == 0;
+ // TODO (hellner): seems safer to use RAII for this. E.g. what happens
+ // if a UdpSocket2ManagerWindows() created and destroyed
+ // without being initialized.
+ }
+}
+
+UdpSocket2ManagerWindows::~UdpSocket2ManagerWindows()
+{
+ WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id,
+ "UdpSocket2ManagerWindows(%d)::~UdpSocket2ManagerWindows()",
+ _managerNumber);
+
+ if(_init)
+ {
+ _pCrit->Enter();
+ if(_numActiveSockets)
+ {
+ _pCrit->Leave();
+ _event->Wait(INFINITE);
+ }
+ else
+ {
+ _pCrit->Leave();
+ }
+ StopWorkerThreads();
+
+ for (WorkerList::iterator iter = _workerThreadsList.begin();
+ iter != _workerThreadsList.end(); ++iter) {
+ delete *iter;
+ }
+ _workerThreadsList.clear();
+ _ioContextPool.Free();
+
+ _numOfActiveManagers--;
+ if(_ioCompletionHandle)
+ {
+ CloseHandle(_ioCompletionHandle);
+ }
+ if (_numOfActiveManagers == 0)
+ {
+ if(_wsaInit)
+ {
+ WSACleanup();
+ }
+ }
+ }
+ if(_pCrit)
+ {
+ delete _pCrit;
+ }
+ if(_event)
+ {
+ delete _event;
+ }
+}
+
+bool UdpSocket2ManagerWindows::Init(int32_t id,
+ uint8_t& numOfWorkThreads) {
+ CriticalSectionScoped cs(_pCrit);
+ if ((_id != -1) || (_numOfWorkThreads != 0)) {
+ assert(_id != -1);
+ assert(_numOfWorkThreads != 0);
+ return false;
+ }
+ _id = id;
+ _numOfWorkThreads = numOfWorkThreads;
+ return true;
+}
+
+bool UdpSocket2ManagerWindows::Start()
+{
+ WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id,
+ "UdpSocket2ManagerWindows(%d)::Start()",_managerNumber);
+ if(!_init)
+ {
+ StartWorkerThreads();
+ }
+
+ if(!_init)
+ {
+ return false;
+ }
+ _pCrit->Enter();
+ // Start worker threads.
+ _stopped = false;
+ int32_t error = 0;
+ for (WorkerList::iterator iter = _workerThreadsList.begin();
+ iter != _workerThreadsList.end() && !error; ++iter) {
+ if(!(*iter)->Start())
+ error = 1;
+ }
+ if(error)
+ {
+ WEBRTC_TRACE(
+ kTraceError,
+ kTraceTransport,
+ _id,
+ "UdpSocket2ManagerWindows(%d)::Start() error starting worker\
+ threads",
+ _managerNumber);
+ _pCrit->Leave();
+ return false;
+ }
+ _pCrit->Leave();
+ return true;
+}
+
+bool UdpSocket2ManagerWindows::StartWorkerThreads()
+{
+ if(!_init)
+ {
+ _pCrit->Enter();
+
+ _ioCompletionHandle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL,
+ 0, 0);
+ if(_ioCompletionHandle == NULL)
+ {
+ int32_t error = GetLastError();
+ WEBRTC_TRACE(
+ kTraceError,
+ kTraceTransport,
+ _id,
+ "UdpSocket2ManagerWindows(%d)::StartWorkerThreads()"
+ "_ioCompletioHandle == NULL: error:%d",
+ _managerNumber,error);
+ _pCrit->Leave();
+ return false;
+ }
+
+ // Create worker threads.
+ uint32_t i = 0;
+ bool error = false;
+ while(i < _numOfWorkThreads && !error)
+ {
+ UdpSocket2WorkerWindows* pWorker =
+ new UdpSocket2WorkerWindows(_ioCompletionHandle);
+ if(pWorker->Init() != 0)
+ {
+ error = true;
+ delete pWorker;
+ break;
+ }
+ _workerThreadsList.push_front(pWorker);
+ i++;
+ }
+ if(error)
+ {
+ WEBRTC_TRACE(
+ kTraceError,
+ kTraceTransport,
+ _id,
+ "UdpSocket2ManagerWindows(%d)::StartWorkerThreads() error "
+ "creating work threads",
+ _managerNumber);
+ // Delete worker threads.
+ for (WorkerList::iterator iter = _workerThreadsList.begin();
+ iter != _workerThreadsList.end(); ++iter) {
+ delete *iter;
+ }
+ _workerThreadsList.clear();
+ _pCrit->Leave();
+ return false;
+ }
+ if(_ioContextPool.Init())
+ {
+ WEBRTC_TRACE(
+ kTraceError,
+ kTraceTransport,
+ _id,
+ "UdpSocket2ManagerWindows(%d)::StartWorkerThreads() error "
+ "initiating _ioContextPool",
+ _managerNumber);
+ _pCrit->Leave();
+ return false;
+ }
+ _init = true;
+ WEBRTC_TRACE(
+ kTraceDebug,
+ kTraceTransport,
+ _id,
+ "UdpSocket2ManagerWindows::StartWorkerThreads %d number of work "
+ "threads created and initialized",
+ _numOfWorkThreads);
+ _pCrit->Leave();
+ }
+ return true;
+}
+
+bool UdpSocket2ManagerWindows::Stop()
+{
+ WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id,
+ "UdpSocket2ManagerWindows(%d)::Stop()",_managerNumber);
+
+ if(!_init)
+ {
+ return false;
+ }
+ _pCrit->Enter();
+ _stopped = true;
+ if(_numActiveSockets)
+ {
+ WEBRTC_TRACE(
+ kTraceError,
+ kTraceTransport,
+ _id,
+ "UdpSocket2ManagerWindows(%d)::Stop() there is still active\
+ sockets",
+ _managerNumber);
+ _pCrit->Leave();
+ return false;
+ }
+ // No active sockets. Stop all worker threads.
+ bool result = StopWorkerThreads();
+ _pCrit->Leave();
+ return result;
+}
+
+bool UdpSocket2ManagerWindows::StopWorkerThreads()
+{
+ int32_t error = 0;
+ WEBRTC_TRACE(
+ kTraceDebug,
+ kTraceTransport,
+ _id,
+ "UdpSocket2ManagerWindows(%d)::StopWorkerThreads() Worker\
+ threadsStoped, numActicve Sockets=%d",
+ _managerNumber,
+ _numActiveSockets);
+
+ // Release all threads waiting for GetQueuedCompletionStatus(..).
+ if(_ioCompletionHandle)
+ {
+ uint32_t i = 0;
+ for(i = 0; i < _workerThreadsList.size(); i++)
+ {
+ PostQueuedCompletionStatus(_ioCompletionHandle, 0 ,0 , NULL);
+ }
+ }
+ for (WorkerList::iterator iter = _workerThreadsList.begin();
+ iter != _workerThreadsList.end(); ++iter) {
+ if((*iter)->Stop() == false)
+ {
+ error = -1;
+ WEBRTC_TRACE(kTraceWarning, kTraceTransport, -1,
+ "failed to stop worker thread");
+ }
+ }
+
+ if(error)
+ {
+ WEBRTC_TRACE(
+ kTraceError,
+ kTraceTransport,
+ _id,
+ "UdpSocket2ManagerWindows(%d)::StopWorkerThreads() error stopping\
+ worker threads",
+ _managerNumber);
+ return false;
+ }
+ return true;
+}
+
+bool UdpSocket2ManagerWindows::AddSocketPrv(UdpSocket2Windows* s)
+{
+ WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id,
+ "UdpSocket2ManagerWindows(%d)::AddSocketPrv()",_managerNumber);
+ if(!_init)
+ {
+ WEBRTC_TRACE(
+ kTraceError,
+ kTraceTransport,
+ _id,
+ "UdpSocket2ManagerWindows(%d)::AddSocketPrv() manager not\
+ initialized",
+ _managerNumber);
+ return false;
+ }
+ _pCrit->Enter();
+ if(s == NULL)
+ {
+ WEBRTC_TRACE(
+ kTraceError,
+ kTraceTransport,
+ _id,
+ "UdpSocket2ManagerWindows(%d)::AddSocketPrv() socket == NULL",
+ _managerNumber);
+ _pCrit->Leave();
+ return false;
+ }
+ if(s->GetFd() == NULL || s->GetFd() == INVALID_SOCKET)
+ {
+ WEBRTC_TRACE(
+ kTraceError,
+ kTraceTransport,
+ _id,
+ "UdpSocket2ManagerWindows(%d)::AddSocketPrv() socket->GetFd() ==\
+ %d",
+ _managerNumber,
+ (int32_t)s->GetFd());
+ _pCrit->Leave();
+ return false;
+
+ }
+ _ioCompletionHandle = CreateIoCompletionPort((HANDLE)s->GetFd(),
+ _ioCompletionHandle,
+ (ULONG_PTR)(s), 0);
+ if(_ioCompletionHandle == NULL)
+ {
+ int32_t error = GetLastError();
+ WEBRTC_TRACE(
+ kTraceError,
+ kTraceTransport,
+ _id,
+ "UdpSocket2ManagerWindows(%d)::AddSocketPrv() Error adding to IO\
+ completion: %d",
+ _managerNumber,
+ error);
+ _pCrit->Leave();
+ return false;
+ }
+ _numActiveSockets++;
+ _pCrit->Leave();
+ return true;
+}
+bool UdpSocket2ManagerWindows::RemoveSocketPrv(UdpSocket2Windows* s)
+{
+ if(!_init)
+ {
+ return false;
+ }
+ _pCrit->Enter();
+ _numActiveSockets--;
+ if(_numActiveSockets == 0)
+ {
+ _event->Set();
+ }
+ _pCrit->Leave();
+ return true;
+}
+
+PerIoContext* UdpSocket2ManagerWindows::PopIoContext()
+{
+ if(!_init)
+ {
+ return NULL;
+ }
+
+ PerIoContext* pIoC = NULL;
+ if(!_stopped)
+ {
+ pIoC = _ioContextPool.PopIoContext();
+ }else
+ {
+ WEBRTC_TRACE(
+ kTraceError,
+ kTraceTransport,
+ _id,
+ "UdpSocket2ManagerWindows(%d)::PopIoContext() Manager Not started",
+ _managerNumber);
+ }
+ return pIoC;
+}
+
+int32_t UdpSocket2ManagerWindows::PushIoContext(PerIoContext* pIoContext)
+{
+ return _ioContextPool.PushIoContext(pIoContext);
+}
+
+IoContextPool::IoContextPool()
+ : _pListHead(NULL),
+ _init(false),
+ _size(0),
+ _inUse(0)
+{
+}
+
+IoContextPool::~IoContextPool()
+{
+ Free();
+ assert(_size.Value() == 0);
+ AlignedFree(_pListHead);
+}
+
+int32_t IoContextPool::Init(uint32_t /*increaseSize*/)
+{
+ if(_init)
+ {
+ return 0;
+ }
+
+ _pListHead = (PSLIST_HEADER)AlignedMalloc(sizeof(SLIST_HEADER),
+ MEMORY_ALLOCATION_ALIGNMENT);
+ if(_pListHead == NULL)
+ {
+ return -1;
+ }
+ InitializeSListHead(_pListHead);
+ _init = true;
+ return 0;
+}
+
+PerIoContext* IoContextPool::PopIoContext()
+{
+ if(!_init)
+ {
+ return NULL;
+ }
+
+ PSLIST_ENTRY pListEntry = InterlockedPopEntrySList(_pListHead);
+ if(pListEntry == NULL)
+ {
+ IoContextPoolItem* item = (IoContextPoolItem*)
+ AlignedMalloc(
+ sizeof(IoContextPoolItem),
+ MEMORY_ALLOCATION_ALIGNMENT);
+ if(item == NULL)
+ {
+ return NULL;
+ }
+ memset(&item->payload.ioContext,0,sizeof(PerIoContext));
+ item->payload.base = item;
+ pListEntry = &(item->itemEntry);
+ ++_size;
+ }
+ ++_inUse;
+ return &((IoContextPoolItem*)pListEntry)->payload.ioContext;
+}
+
+int32_t IoContextPool::PushIoContext(PerIoContext* pIoContext)
+{
+ // TODO (hellner): Overlapped IO should be completed at this point. Perhaps
+ // add an assert?
+ const bool overlappedIOCompleted = HasOverlappedIoCompleted(
+ (LPOVERLAPPED)pIoContext);
+
+ IoContextPoolItem* item = ((IoContextPoolItemPayload*)pIoContext)->base;
+
+ const int32_t usedItems = --_inUse;
+ const int32_t totalItems = _size.Value();
+ const int32_t freeItems = totalItems - usedItems;
+ if(freeItems < 0)
+ {
+ assert(false);
+ AlignedFree(item);
+ return -1;
+ }
+ if((freeItems >= totalItems>>1) &&
+ overlappedIOCompleted)
+ {
+ AlignedFree(item);
+ --_size;
+ return 0;
+ }
+ InterlockedPushEntrySList(_pListHead, &(item->itemEntry));
+ return 0;
+}
+
+int32_t IoContextPool::Free()
+{
+ if(!_init)
+ {
+ return 0;
+ }
+
+ int32_t itemsFreed = 0;
+ PSLIST_ENTRY pListEntry = InterlockedPopEntrySList(_pListHead);
+ while(pListEntry != NULL)
+ {
+ IoContextPoolItem* item = ((IoContextPoolItem*)pListEntry);
+ AlignedFree(item);
+ --_size;
+ itemsFreed++;
+ pListEntry = InterlockedPopEntrySList(_pListHead);
+ }
+ return itemsFreed;
+}
+
+int32_t UdpSocket2WorkerWindows::_numOfWorkers = 0;
+
+UdpSocket2WorkerWindows::UdpSocket2WorkerWindows(HANDLE ioCompletionHandle)
+ : _ioCompletionHandle(ioCompletionHandle),
+ _pThread(Run, this, "UdpSocket2ManagerWindows_thread"),
+ _init(false) {
+ _workerNumber = _numOfWorkers++;
+ WEBRTC_TRACE(kTraceMemory, kTraceTransport, -1,
+ "UdpSocket2WorkerWindows created");
+}
+
+UdpSocket2WorkerWindows::~UdpSocket2WorkerWindows()
+{
+ WEBRTC_TRACE(kTraceMemory, kTraceTransport, -1,
+ "UdpSocket2WorkerWindows deleted");
+}
+
+bool UdpSocket2WorkerWindows::Start()
+{
+ WEBRTC_TRACE(kTraceStateInfo, kTraceTransport, -1,
+ "Start UdpSocket2WorkerWindows");
+ _pThread.Start();
+
+ _pThread.SetPriority(rtc::kRealtimePriority);
+ return true;
+}
+
+bool UdpSocket2WorkerWindows::Stop()
+{
+ WEBRTC_TRACE(kTraceStateInfo, kTraceTransport, -1,
+ "Stop UdpSocket2WorkerWindows");
+ _pThread.Stop();
+ return true;
+}
+
+int32_t UdpSocket2WorkerWindows::Init()
+{
+ _init = true;
+ return 0;
+}
+
+bool UdpSocket2WorkerWindows::Run(void* obj)
+{
+ UdpSocket2WorkerWindows* pWorker =
+ static_cast<UdpSocket2WorkerWindows*>(obj);
+ return pWorker->Process();
+}
+
+// Process should always return true. Stopping the worker threads is done in
+// the UdpSocket2ManagerWindows::StopWorkerThreads() function.
+bool UdpSocket2WorkerWindows::Process()
+{
+ int32_t success = 0;
+ DWORD ioSize = 0;
+ UdpSocket2Windows* pSocket = NULL;
+ PerIoContext* pIOContext = 0;
+ OVERLAPPED* pOverlapped = 0;
+ success = GetQueuedCompletionStatus(_ioCompletionHandle,
+ &ioSize,
+ (ULONG_PTR*)&pSocket, &pOverlapped, 200);
+
+ uint32_t error = 0;
+ if(!success)
+ {
+ error = GetLastError();
+ if(error == WAIT_TIMEOUT)
+ {
+ return true;
+ }
+ // This may happen if e.g. PostQueuedCompletionStatus() has been called.
+ // The IO context still needs to be reclaimed or re-used which is done
+ // in UdpSocket2Windows::IOCompleted(..).
+ }
+ if(pSocket == NULL)
+ {
+ WEBRTC_TRACE(
+ kTraceDebug,
+ kTraceTransport,
+ -1,
+ "UdpSocket2WorkerWindows(%d)::Process(), pSocket == 0, end thread",
+ _workerNumber);
+ return true;
+ }
+ pIOContext = (PerIoContext*)pOverlapped;
+ pSocket->IOCompleted(pIOContext,ioSize,error);
+ return true;
+}
+
+} // namespace test
+} // namespace webrtc
« no previous file with comments | « webrtc/test/channel_transport/udp_socket2_manager_win.h ('k') | webrtc/test/channel_transport/udp_socket2_win.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698