OLD | NEW |
1 /* | 1 /* |
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. | 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. |
3 * | 3 * |
4 * Use of this source code is governed by a BSD-style license | 4 * Use of this source code is governed by a BSD-style license |
5 * that can be found in the LICENSE file in the root of the source | 5 * that can be found in the LICENSE file in the root of the source |
6 * tree. An additional intellectual property rights grant can be found | 6 * tree. An additional intellectual property rights grant can be found |
7 * in the file PATENTS. All contributing project authors may | 7 * in the file PATENTS. All contributing project authors may |
8 * be found in the AUTHORS file in the root of the source tree. | 8 * be found in the AUTHORS file in the root of the source tree. |
9 */ | 9 */ |
10 #include "webrtc/base/physicalsocketserver.h" | 10 #include "webrtc/base/physicalsocketserver.h" |
11 | 11 |
12 #if defined(_MSC_VER) && _MSC_VER < 1300 | 12 #if defined(_MSC_VER) && _MSC_VER < 1300 |
13 #pragma warning(disable:4786) | 13 #pragma warning(disable:4786) |
14 #endif | 14 #endif |
15 | 15 |
16 #ifdef MEMORY_SANITIZER | 16 #ifdef MEMORY_SANITIZER |
17 #include <sanitizer/msan_interface.h> | 17 #include <sanitizer/msan_interface.h> |
18 #endif | 18 #endif |
19 | 19 |
20 #if defined(WEBRTC_POSIX) | 20 #if defined(WEBRTC_POSIX) |
21 #include <string.h> | 21 #include <string.h> |
22 #include <errno.h> | 22 #include <errno.h> |
23 #include <fcntl.h> | 23 #include <fcntl.h> |
| 24 #if defined(WEBRTC_USE_EPOLL) |
| 25 // "poll" will be used to wait for the signal dispatcher. |
| 26 #include <poll.h> |
| 27 #endif |
24 #include <sys/ioctl.h> | 28 #include <sys/ioctl.h> |
25 #include <sys/time.h> | 29 #include <sys/time.h> |
26 #include <sys/select.h> | 30 #include <sys/select.h> |
27 #include <unistd.h> | 31 #include <unistd.h> |
28 #include <signal.h> | 32 #include <signal.h> |
29 #endif | 33 #endif |
30 | 34 |
31 #if defined(WEBRTC_WIN) | 35 #if defined(WEBRTC_WIN) |
32 #define WIN32_LEAN_AND_MEAN | 36 #define WIN32_LEAN_AND_MEAN |
33 #include <windows.h> | 37 #include <windows.h> |
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
73 | 77 |
74 int64_t GetSocketRecvTimestamp(int socket) { | 78 int64_t GetSocketRecvTimestamp(int socket) { |
75 return -1; | 79 return -1; |
76 } | 80 } |
77 #endif | 81 #endif |
78 | 82 |
79 #if defined(WEBRTC_WIN) | 83 #if defined(WEBRTC_WIN) |
80 typedef char* SockOptArg; | 84 typedef char* SockOptArg; |
81 #endif | 85 #endif |
82 | 86 |
| 87 #if defined(WEBRTC_USE_EPOLL) |
| 88 // POLLRDHUP / EPOLLRDHUP are only defined starting with Linux 2.6.17. |
| 89 #if !defined(POLLRDHUP) |
| 90 #define POLLRDHUP 0x2000 |
| 91 #endif |
| 92 #if !defined(EPOLLRDHUP) |
| 93 #define EPOLLRDHUP 0x2000 |
| 94 #endif |
| 95 #endif |
| 96 |
83 namespace rtc { | 97 namespace rtc { |
84 | 98 |
85 std::unique_ptr<SocketServer> SocketServer::CreateDefault() { | 99 std::unique_ptr<SocketServer> SocketServer::CreateDefault() { |
86 #if defined(__native_client__) | 100 #if defined(__native_client__) |
87 return std::unique_ptr<SocketServer>(new rtc::NullSocketServer); | 101 return std::unique_ptr<SocketServer>(new rtc::NullSocketServer); |
88 #else | 102 #else |
89 return std::unique_ptr<SocketServer>(new rtc::PhysicalSocketServer); | 103 return std::unique_ptr<SocketServer>(new rtc::PhysicalSocketServer); |
90 #endif | 104 #endif |
91 } | 105 } |
92 | 106 |
(...skipping 665 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
758 } | 772 } |
759 if (((ff & DE_CLOSE) != 0) && (id_ == cache_id)) { | 773 if (((ff & DE_CLOSE) != 0) && (id_ == cache_id)) { |
760 signal_close_ = true; | 774 signal_close_ = true; |
761 signal_err_ = err; | 775 signal_err_ = err; |
762 } | 776 } |
763 } | 777 } |
764 | 778 |
765 #elif defined(WEBRTC_POSIX) | 779 #elif defined(WEBRTC_POSIX) |
766 | 780 |
767 void SocketDispatcher::OnEvent(uint32_t ff, int err) { | 781 void SocketDispatcher::OnEvent(uint32_t ff, int err) { |
| 782 #if defined(WEBRTC_USE_EPOLL) |
| 783 // Remember currently enabled events so we can combine multiple changes |
| 784 // into one update call later. |
| 785 // The signal handlers might re-enable events disabled here, so we can't |
| 786 // keep a list of events to disable at the end of the method. This list |
| 787 // would not be updated with the events enabled by the signal handlers. |
| 788 StartBatchedEventUpdates(); |
| 789 #endif |
768 // Make sure we deliver connect/accept first. Otherwise, consumers may see | 790 // Make sure we deliver connect/accept first. Otherwise, consumers may see |
769 // something like a READ followed by a CONNECT, which would be odd. | 791 // something like a READ followed by a CONNECT, which would be odd. |
770 if ((ff & DE_CONNECT) != 0) { | 792 if ((ff & DE_CONNECT) != 0) { |
771 DisableEvents(DE_CONNECT); | 793 DisableEvents(DE_CONNECT); |
772 SignalConnectEvent(this); | 794 SignalConnectEvent(this); |
773 } | 795 } |
774 if ((ff & DE_ACCEPT) != 0) { | 796 if ((ff & DE_ACCEPT) != 0) { |
775 DisableEvents(DE_ACCEPT); | 797 DisableEvents(DE_ACCEPT); |
776 SignalReadEvent(this); | 798 SignalReadEvent(this); |
777 } | 799 } |
778 if ((ff & DE_READ) != 0) { | 800 if ((ff & DE_READ) != 0) { |
779 DisableEvents(DE_READ); | 801 DisableEvents(DE_READ); |
780 SignalReadEvent(this); | 802 SignalReadEvent(this); |
781 } | 803 } |
782 if ((ff & DE_WRITE) != 0) { | 804 if ((ff & DE_WRITE) != 0) { |
783 DisableEvents(DE_WRITE); | 805 DisableEvents(DE_WRITE); |
784 SignalWriteEvent(this); | 806 SignalWriteEvent(this); |
785 } | 807 } |
786 if ((ff & DE_CLOSE) != 0) { | 808 if ((ff & DE_CLOSE) != 0) { |
787 // The socket is now dead to us, so stop checking it. | 809 // The socket is now dead to us, so stop checking it. |
788 SetEnabledEvents(0); | 810 SetEnabledEvents(0); |
789 SignalCloseEvent(this, err); | 811 SignalCloseEvent(this, err); |
790 } | 812 } |
| 813 #if defined(WEBRTC_USE_EPOLL) |
| 814 FinishBatchedEventUpdates(); |
| 815 #endif |
791 } | 816 } |
792 | 817 |
793 #endif // WEBRTC_POSIX | 818 #endif // WEBRTC_POSIX |
794 | 819 |
| 820 #if defined(WEBRTC_USE_EPOLL) |
| 821 |
| 822 static int GetEpollEvents(uint32_t ff) { |
| 823 int events = 0; |
| 824 if (ff & (DE_READ | DE_ACCEPT)) { |
| 825 events |= EPOLLIN; |
| 826 } |
| 827 if (ff & (DE_WRITE | DE_CONNECT)) { |
| 828 events |= EPOLLOUT; |
| 829 } |
| 830 return events; |
| 831 } |
| 832 |
| 833 void SocketDispatcher::StartBatchedEventUpdates() { |
| 834 RTC_DCHECK_EQ(saved_enabled_events_, -1); |
| 835 saved_enabled_events_ = enabled_events(); |
| 836 } |
| 837 |
| 838 void SocketDispatcher::FinishBatchedEventUpdates() { |
| 839 RTC_DCHECK_NE(saved_enabled_events_, -1); |
| 840 uint8_t old_events = static_cast<uint8_t>(saved_enabled_events_); |
| 841 saved_enabled_events_ = -1; |
| 842 MaybeUpdateDispatcher(old_events); |
| 843 } |
| 844 |
| 845 void SocketDispatcher::MaybeUpdateDispatcher(uint8_t old_events) { |
| 846 if (GetEpollEvents(enabled_events()) != GetEpollEvents(old_events) && |
| 847 saved_enabled_events_ == -1) { |
| 848 ss_->Update(this); |
| 849 } |
| 850 } |
| 851 |
| 852 void SocketDispatcher::SetEnabledEvents(uint8_t events) { |
| 853 uint8_t old_events = enabled_events(); |
| 854 PhysicalSocket::SetEnabledEvents(events); |
| 855 MaybeUpdateDispatcher(old_events); |
| 856 } |
| 857 |
| 858 void SocketDispatcher::EnableEvents(uint8_t events) { |
| 859 uint8_t old_events = enabled_events(); |
| 860 PhysicalSocket::EnableEvents(events); |
| 861 MaybeUpdateDispatcher(old_events); |
| 862 } |
| 863 |
| 864 void SocketDispatcher::DisableEvents(uint8_t events) { |
| 865 uint8_t old_events = enabled_events(); |
| 866 PhysicalSocket::DisableEvents(events); |
| 867 MaybeUpdateDispatcher(old_events); |
| 868 } |
| 869 |
| 870 #endif // WEBRTC_USE_EPOLL |
| 871 |
795 int SocketDispatcher::Close() { | 872 int SocketDispatcher::Close() { |
796 if (s_ == INVALID_SOCKET) | 873 if (s_ == INVALID_SOCKET) |
797 return 0; | 874 return 0; |
798 | 875 |
799 #if defined(WEBRTC_WIN) | 876 #if defined(WEBRTC_WIN) |
800 id_ = 0; | 877 id_ = 0; |
801 signal_close_ = false; | 878 signal_close_ = false; |
802 #endif | 879 #endif |
803 ss_->Remove(this); | 880 ss_->Remove(this); |
804 return PhysicalSocket::Close(); | 881 return PhysicalSocket::Close(); |
(...skipping 308 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1113 if (pf_) | 1190 if (pf_) |
1114 *pf_ = false; | 1191 *pf_ = false; |
1115 } | 1192 } |
1116 | 1193 |
1117 private: | 1194 private: |
1118 bool *pf_; | 1195 bool *pf_; |
1119 }; | 1196 }; |
1120 | 1197 |
1121 PhysicalSocketServer::PhysicalSocketServer() | 1198 PhysicalSocketServer::PhysicalSocketServer() |
1122 : fWait_(false) { | 1199 : fWait_(false) { |
| 1200 #if defined(WEBRTC_USE_EPOLL) |
| 1201 // Since Linux 2.6.8, the size argument is ignored, but must be greater than |
| 1202 // zero. Before that the size served as hint to the kernel for the amount of |
| 1203 // space to initially allocate in internal data structures. |
| 1204 epoll_fd_ = epoll_create(FD_SETSIZE); |
| 1205 if (epoll_fd_ == -1) { |
| 1206 // Not an error, will fall back to "select" below. |
| 1207 LOG_E(LS_WARNING, EN, errno) << "epoll_create"; |
| 1208 epoll_fd_ = INVALID_SOCKET; |
| 1209 } |
| 1210 #endif |
1123 signal_wakeup_ = new Signaler(this, &fWait_); | 1211 signal_wakeup_ = new Signaler(this, &fWait_); |
1124 #if defined(WEBRTC_WIN) | 1212 #if defined(WEBRTC_WIN) |
1125 socket_ev_ = WSACreateEvent(); | 1213 socket_ev_ = WSACreateEvent(); |
1126 #endif | 1214 #endif |
1127 } | 1215 } |
1128 | 1216 |
1129 PhysicalSocketServer::~PhysicalSocketServer() { | 1217 PhysicalSocketServer::~PhysicalSocketServer() { |
1130 #if defined(WEBRTC_WIN) | 1218 #if defined(WEBRTC_WIN) |
1131 WSACloseEvent(socket_ev_); | 1219 WSACloseEvent(socket_ev_); |
1132 #endif | 1220 #endif |
1133 #if defined(WEBRTC_POSIX) | 1221 #if defined(WEBRTC_POSIX) |
1134 signal_dispatcher_.reset(); | 1222 signal_dispatcher_.reset(); |
1135 #endif | 1223 #endif |
1136 delete signal_wakeup_; | 1224 delete signal_wakeup_; |
| 1225 #if defined(WEBRTC_USE_EPOLL) |
| 1226 if (epoll_fd_ != INVALID_SOCKET) { |
| 1227 close(epoll_fd_); |
| 1228 } |
| 1229 #endif |
1137 RTC_DCHECK(dispatchers_.empty()); | 1230 RTC_DCHECK(dispatchers_.empty()); |
1138 } | 1231 } |
1139 | 1232 |
1140 void PhysicalSocketServer::WakeUp() { | 1233 void PhysicalSocketServer::WakeUp() { |
1141 signal_wakeup_->Signal(); | 1234 signal_wakeup_->Signal(); |
1142 } | 1235 } |
1143 | 1236 |
1144 Socket* PhysicalSocketServer::CreateSocket(int type) { | 1237 Socket* PhysicalSocketServer::CreateSocket(int type) { |
1145 return CreateSocket(AF_INET, type); | 1238 return CreateSocket(AF_INET, type); |
1146 } | 1239 } |
(...skipping 27 matching lines...) Expand all Loading... |
1174 if (dispatcher->Initialize()) { | 1267 if (dispatcher->Initialize()) { |
1175 return dispatcher; | 1268 return dispatcher; |
1176 } else { | 1269 } else { |
1177 delete dispatcher; | 1270 delete dispatcher; |
1178 return nullptr; | 1271 return nullptr; |
1179 } | 1272 } |
1180 } | 1273 } |
1181 | 1274 |
1182 void PhysicalSocketServer::Add(Dispatcher *pdispatcher) { | 1275 void PhysicalSocketServer::Add(Dispatcher *pdispatcher) { |
1183 CritScope cs(&crit_); | 1276 CritScope cs(&crit_); |
1184 // Prevent duplicates. This can cause dead dispatchers to stick around. | 1277 if (processing_dispatchers_) { |
1185 DispatcherList::iterator pos = std::find(dispatchers_.begin(), | 1278 // A dispatcher is being added while a "Wait" call is processing the |
1186 dispatchers_.end(), | 1279 // list of socket events. |
1187 pdispatcher); | 1280 // Defer adding to "dispatchers_" set until processing is done to avoid |
1188 if (pos != dispatchers_.end()) | 1281 // invalidating the iterator in "Wait". |
1189 return; | 1282 pending_remove_dispatchers_.erase(pdispatcher); |
1190 dispatchers_.push_back(pdispatcher); | 1283 pending_add_dispatchers_.insert(pdispatcher); |
| 1284 } else { |
| 1285 dispatchers_.insert(pdispatcher); |
| 1286 } |
| 1287 #if defined(WEBRTC_USE_EPOLL) |
| 1288 if (epoll_fd_ != INVALID_SOCKET) { |
| 1289 AddEpoll(pdispatcher); |
| 1290 } |
| 1291 #endif // WEBRTC_USE_EPOLL |
1191 } | 1292 } |
1192 | 1293 |
1193 void PhysicalSocketServer::Remove(Dispatcher *pdispatcher) { | 1294 void PhysicalSocketServer::Remove(Dispatcher *pdispatcher) { |
1194 CritScope cs(&crit_); | 1295 CritScope cs(&crit_); |
1195 DispatcherList::iterator pos = std::find(dispatchers_.begin(), | 1296 if (processing_dispatchers_) { |
1196 dispatchers_.end(), | 1297 // A dispatcher is being removed while a "Wait" call is processing the |
1197 pdispatcher); | 1298 // list of socket events. |
1198 // We silently ignore duplicate calls to Add, so we should silently ignore | 1299 // Defer removal from "dispatchers_" set until processing is done to avoid |
1199 // the (expected) symmetric calls to Remove. Note that this may still hide | 1300 // invalidating the iterator in "Wait". |
1200 // a real issue, so we at least log a warning about it. | 1301 if (!pending_add_dispatchers_.erase(pdispatcher) && |
1201 if (pos == dispatchers_.end()) { | 1302 dispatchers_.find(pdispatcher) == dispatchers_.end()) { |
| 1303 LOG(LS_WARNING) << "PhysicalSocketServer asked to remove a unknown " |
| 1304 << "dispatcher, potentially from a duplicate call to " |
| 1305 << "Add."; |
| 1306 return; |
| 1307 } |
| 1308 |
| 1309 pending_remove_dispatchers_.insert(pdispatcher); |
| 1310 } else if (!dispatchers_.erase(pdispatcher)) { |
1202 LOG(LS_WARNING) << "PhysicalSocketServer asked to remove a unknown " | 1311 LOG(LS_WARNING) << "PhysicalSocketServer asked to remove a unknown " |
1203 << "dispatcher, potentially from a duplicate call to Add."; | 1312 << "dispatcher, potentially from a duplicate call to Add."; |
1204 return; | 1313 return; |
1205 } | 1314 } |
1206 size_t index = pos - dispatchers_.begin(); | 1315 #if defined(WEBRTC_USE_EPOLL) |
1207 dispatchers_.erase(pos); | 1316 if (epoll_fd_ != INVALID_SOCKET) { |
1208 for (IteratorList::iterator it = iterators_.begin(); it != iterators_.end(); | 1317 RemoveEpoll(pdispatcher); |
1209 ++it) { | 1318 } |
1210 if (index < **it) { | 1319 #endif // WEBRTC_USE_EPOLL |
1211 --**it; | 1320 } |
| 1321 |
| 1322 void PhysicalSocketServer::Update(Dispatcher* pdispatcher) { |
| 1323 #if defined(WEBRTC_USE_EPOLL) |
| 1324 if (epoll_fd_ == INVALID_SOCKET) { |
| 1325 return; |
| 1326 } |
| 1327 |
| 1328 CritScope cs(&crit_); |
| 1329 if (dispatchers_.find(pdispatcher) == dispatchers_.end()) { |
| 1330 return; |
| 1331 } |
| 1332 |
| 1333 UpdateEpoll(pdispatcher); |
| 1334 #endif |
| 1335 } |
| 1336 |
| 1337 void PhysicalSocketServer::AddRemovePendingDispatchers() { |
| 1338 if (!pending_add_dispatchers_.empty()) { |
| 1339 for (Dispatcher* pdispatcher : pending_add_dispatchers_) { |
| 1340 dispatchers_.insert(pdispatcher); |
1212 } | 1341 } |
| 1342 pending_add_dispatchers_.clear(); |
| 1343 } |
| 1344 |
| 1345 if (!pending_remove_dispatchers_.empty()) { |
| 1346 for (Dispatcher* pdispatcher : pending_remove_dispatchers_) { |
| 1347 dispatchers_.erase(pdispatcher); |
| 1348 } |
| 1349 pending_remove_dispatchers_.clear(); |
1213 } | 1350 } |
1214 } | 1351 } |
1215 | 1352 |
1216 #if defined(WEBRTC_POSIX) | 1353 #if defined(WEBRTC_POSIX) |
| 1354 |
1217 bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) { | 1355 bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) { |
| 1356 #if defined(WEBRTC_USE_EPOLL) |
| 1357 // We don't keep a dedicated "epoll" descriptor containing only the non-IO |
| 1358 // (i.e. signaling) dispatcher, so "poll" will be used instead of the default |
| 1359 // "select" to support sockets larger than FD_SETSIZE. |
| 1360 if (!process_io) { |
| 1361 return WaitPoll(cmsWait, signal_wakeup_); |
| 1362 } else if (epoll_fd_ != INVALID_SOCKET) { |
| 1363 return WaitEpoll(cmsWait); |
| 1364 } |
| 1365 #endif |
| 1366 return WaitSelect(cmsWait, process_io); |
| 1367 } |
| 1368 |
| 1369 static void ProcessEvents(Dispatcher* dispatcher, |
| 1370 bool readable, |
| 1371 bool writable, |
| 1372 bool check_error) { |
| 1373 int errcode = 0; |
| 1374 // TODO(pthatcher): Should we set errcode if getsockopt fails? |
| 1375 if (check_error) { |
| 1376 socklen_t len = sizeof(errcode); |
| 1377 ::getsockopt(dispatcher->GetDescriptor(), SOL_SOCKET, SO_ERROR, &errcode, |
| 1378 &len); |
| 1379 } |
| 1380 |
| 1381 uint32_t ff = 0; |
| 1382 |
| 1383 // Check readable descriptors. If we're waiting on an accept, signal |
| 1384 // that. Otherwise we're waiting for data, check to see if we're |
| 1385 // readable or really closed. |
| 1386 // TODO(pthatcher): Only peek at TCP descriptors. |
| 1387 if (readable) { |
| 1388 if (dispatcher->GetRequestedEvents() & DE_ACCEPT) { |
| 1389 ff |= DE_ACCEPT; |
| 1390 } else if (errcode || dispatcher->IsDescriptorClosed()) { |
| 1391 ff |= DE_CLOSE; |
| 1392 } else { |
| 1393 ff |= DE_READ; |
| 1394 } |
| 1395 } |
| 1396 |
| 1397 // Check writable descriptors. If we're waiting on a connect, detect |
| 1398 // success versus failure by the reaped error code. |
| 1399 if (writable) { |
| 1400 if (dispatcher->GetRequestedEvents() & DE_CONNECT) { |
| 1401 if (!errcode) { |
| 1402 ff |= DE_CONNECT; |
| 1403 } else { |
| 1404 ff |= DE_CLOSE; |
| 1405 } |
| 1406 } else { |
| 1407 ff |= DE_WRITE; |
| 1408 } |
| 1409 } |
| 1410 |
| 1411 // Tell the descriptor about the event. |
| 1412 if (ff != 0) { |
| 1413 dispatcher->OnPreEvent(ff); |
| 1414 dispatcher->OnEvent(ff, errcode); |
| 1415 } |
| 1416 } |
| 1417 |
| 1418 bool PhysicalSocketServer::WaitSelect(int cmsWait, bool process_io) { |
1218 // Calculate timing information | 1419 // Calculate timing information |
1219 | 1420 |
1220 struct timeval* ptvWait = nullptr; | 1421 struct timeval* ptvWait = nullptr; |
1221 struct timeval tvWait; | 1422 struct timeval tvWait; |
1222 struct timeval tvStop; | 1423 struct timeval tvStop; |
1223 if (cmsWait != kForever) { | 1424 if (cmsWait != kForever) { |
1224 // Calculate wait timeval | 1425 // Calculate wait timeval |
1225 tvWait.tv_sec = cmsWait / 1000; | 1426 tvWait.tv_sec = cmsWait / 1000; |
1226 tvWait.tv_usec = (cmsWait % 1000) * 1000; | 1427 tvWait.tv_usec = (cmsWait % 1000) * 1000; |
1227 ptvWait = &tvWait; | 1428 ptvWait = &tvWait; |
(...skipping 22 matching lines...) Expand all Loading... |
1250 __msan_unpoison(&fdsRead, sizeof(fdsRead)); | 1451 __msan_unpoison(&fdsRead, sizeof(fdsRead)); |
1251 __msan_unpoison(&fdsWrite, sizeof(fdsWrite)); | 1452 __msan_unpoison(&fdsWrite, sizeof(fdsWrite)); |
1252 #endif | 1453 #endif |
1253 | 1454 |
1254 fWait_ = true; | 1455 fWait_ = true; |
1255 | 1456 |
1256 while (fWait_) { | 1457 while (fWait_) { |
1257 int fdmax = -1; | 1458 int fdmax = -1; |
1258 { | 1459 { |
1259 CritScope cr(&crit_); | 1460 CritScope cr(&crit_); |
1260 for (size_t i = 0; i < dispatchers_.size(); ++i) { | 1461 // TODO(jbauch): Support re-entrant waiting. |
| 1462 RTC_DCHECK(!processing_dispatchers_); |
| 1463 for (Dispatcher* pdispatcher : dispatchers_) { |
1261 // Query dispatchers for read and write wait state | 1464 // Query dispatchers for read and write wait state |
1262 Dispatcher *pdispatcher = dispatchers_[i]; | |
1263 RTC_DCHECK(pdispatcher); | 1465 RTC_DCHECK(pdispatcher); |
1264 if (!process_io && (pdispatcher != signal_wakeup_)) | 1466 if (!process_io && (pdispatcher != signal_wakeup_)) |
1265 continue; | 1467 continue; |
1266 int fd = pdispatcher->GetDescriptor(); | 1468 int fd = pdispatcher->GetDescriptor(); |
| 1469 // "select"ing a file descriptor that is equal to or larger than |
| 1470 // FD_SETSIZE will result in undefined behavior. |
| 1471 RTC_DCHECK_LT(fd, FD_SETSIZE); |
1267 if (fd > fdmax) | 1472 if (fd > fdmax) |
1268 fdmax = fd; | 1473 fdmax = fd; |
1269 | 1474 |
1270 uint32_t ff = pdispatcher->GetRequestedEvents(); | 1475 uint32_t ff = pdispatcher->GetRequestedEvents(); |
1271 if (ff & (DE_READ | DE_ACCEPT)) | 1476 if (ff & (DE_READ | DE_ACCEPT)) |
1272 FD_SET(fd, &fdsRead); | 1477 FD_SET(fd, &fdsRead); |
1273 if (ff & (DE_WRITE | DE_CONNECT)) | 1478 if (ff & (DE_WRITE | DE_CONNECT)) |
1274 FD_SET(fd, &fdsWrite); | 1479 FD_SET(fd, &fdsWrite); |
1275 } | 1480 } |
1276 } | 1481 } |
(...skipping 13 matching lines...) Expand all Loading... |
1290 // Else ignore the error and keep going. If this EINTR was for one of the | 1495 // Else ignore the error and keep going. If this EINTR was for one of the |
1291 // signals managed by this PhysicalSocketServer, the | 1496 // signals managed by this PhysicalSocketServer, the |
1292 // PosixSignalDeliveryDispatcher will be in the signaled state in the next | 1497 // PosixSignalDeliveryDispatcher will be in the signaled state in the next |
1293 // iteration. | 1498 // iteration. |
1294 } else if (n == 0) { | 1499 } else if (n == 0) { |
1295 // If timeout, return success | 1500 // If timeout, return success |
1296 return true; | 1501 return true; |
1297 } else { | 1502 } else { |
1298 // We have signaled descriptors | 1503 // We have signaled descriptors |
1299 CritScope cr(&crit_); | 1504 CritScope cr(&crit_); |
1300 for (size_t i = 0; i < dispatchers_.size(); ++i) { | 1505 processing_dispatchers_ = true; |
1301 Dispatcher *pdispatcher = dispatchers_[i]; | 1506 for (Dispatcher* pdispatcher : dispatchers_) { |
1302 int fd = pdispatcher->GetDescriptor(); | 1507 int fd = pdispatcher->GetDescriptor(); |
1303 uint32_t ff = 0; | |
1304 int errcode = 0; | |
1305 | 1508 |
1306 // Reap any error code, which can be signaled through reads or writes. | 1509 bool readable = FD_ISSET(fd, &fdsRead); |
1307 // TODO(pthatcher): Should we set errcode if getsockopt fails? | 1510 if (readable) { |
1308 if (FD_ISSET(fd, &fdsRead) || FD_ISSET(fd, &fdsWrite)) { | 1511 FD_CLR(fd, &fdsRead); |
1309 socklen_t len = sizeof(errcode); | |
1310 ::getsockopt(fd, SOL_SOCKET, SO_ERROR, &errcode, &len); | |
1311 } | 1512 } |
1312 | 1513 |
1313 // Check readable descriptors. If we're waiting on an accept, signal | 1514 bool writable = FD_ISSET(fd, &fdsWrite); |
1314 // that. Otherwise we're waiting for data, check to see if we're | 1515 if (writable) { |
1315 // readable or really closed. | 1516 FD_CLR(fd, &fdsWrite); |
1316 // TODO(pthatcher): Only peek at TCP descriptors. | |
1317 if (FD_ISSET(fd, &fdsRead)) { | |
1318 FD_CLR(fd, &fdsRead); | |
1319 if (pdispatcher->GetRequestedEvents() & DE_ACCEPT) { | |
1320 ff |= DE_ACCEPT; | |
1321 } else if (errcode || pdispatcher->IsDescriptorClosed()) { | |
1322 ff |= DE_CLOSE; | |
1323 } else { | |
1324 ff |= DE_READ; | |
1325 } | |
1326 } | 1517 } |
1327 | 1518 |
1328 // Check writable descriptors. If we're waiting on a connect, detect | 1519 // The error code can be signaled through reads or writes. |
1329 // success versus failure by the reaped error code. | 1520 ProcessEvents(pdispatcher, readable, writable, readable || writable); |
1330 if (FD_ISSET(fd, &fdsWrite)) { | 1521 } |
1331 FD_CLR(fd, &fdsWrite); | |
1332 if (pdispatcher->GetRequestedEvents() & DE_CONNECT) { | |
1333 if (!errcode) { | |
1334 ff |= DE_CONNECT; | |
1335 } else { | |
1336 ff |= DE_CLOSE; | |
1337 } | |
1338 } else { | |
1339 ff |= DE_WRITE; | |
1340 } | |
1341 } | |
1342 | 1522 |
1343 // Tell the descriptor about the event. | 1523 processing_dispatchers_ = false; |
1344 if (ff != 0) { | 1524 // Process deferred dispatchers that have been added/removed while the |
1345 pdispatcher->OnPreEvent(ff); | 1525 // events were handled above. |
1346 pdispatcher->OnEvent(ff, errcode); | 1526 AddRemovePendingDispatchers(); |
1347 } | |
1348 } | |
1349 } | 1527 } |
1350 | 1528 |
1351 // Recalc the time remaining to wait. Doing it here means it doesn't get | 1529 // Recalc the time remaining to wait. Doing it here means it doesn't get |
1352 // calced twice the first time through the loop | 1530 // calced twice the first time through the loop |
1353 if (ptvWait) { | 1531 if (ptvWait) { |
1354 ptvWait->tv_sec = 0; | 1532 ptvWait->tv_sec = 0; |
1355 ptvWait->tv_usec = 0; | 1533 ptvWait->tv_usec = 0; |
1356 struct timeval tvT; | 1534 struct timeval tvT; |
1357 gettimeofday(&tvT, nullptr); | 1535 gettimeofday(&tvT, nullptr); |
1358 if ((tvStop.tv_sec > tvT.tv_sec) | 1536 if ((tvStop.tv_sec > tvT.tv_sec) |
1359 || ((tvStop.tv_sec == tvT.tv_sec) | 1537 || ((tvStop.tv_sec == tvT.tv_sec) |
1360 && (tvStop.tv_usec > tvT.tv_usec))) { | 1538 && (tvStop.tv_usec > tvT.tv_usec))) { |
1361 ptvWait->tv_sec = tvStop.tv_sec - tvT.tv_sec; | 1539 ptvWait->tv_sec = tvStop.tv_sec - tvT.tv_sec; |
1362 ptvWait->tv_usec = tvStop.tv_usec - tvT.tv_usec; | 1540 ptvWait->tv_usec = tvStop.tv_usec - tvT.tv_usec; |
1363 if (ptvWait->tv_usec < 0) { | 1541 if (ptvWait->tv_usec < 0) { |
1364 RTC_DCHECK(ptvWait->tv_sec > 0); | 1542 RTC_DCHECK(ptvWait->tv_sec > 0); |
1365 ptvWait->tv_usec += 1000000; | 1543 ptvWait->tv_usec += 1000000; |
1366 ptvWait->tv_sec -= 1; | 1544 ptvWait->tv_sec -= 1; |
1367 } | 1545 } |
1368 } | 1546 } |
1369 } | 1547 } |
1370 } | 1548 } |
1371 | 1549 |
1372 return true; | 1550 return true; |
1373 } | 1551 } |
1374 | 1552 |
| 1553 #if defined(WEBRTC_USE_EPOLL) |
| 1554 |
| 1555 // Initial number of events to process with one call to "epoll_wait". |
| 1556 static const size_t kInitialEpollEvents = 128; |
| 1557 |
| 1558 // Maximum number of events to process with one call to "epoll_wait". |
| 1559 static const size_t kMaxEpollEvents = 8192; |
| 1560 |
| 1561 void PhysicalSocketServer::AddEpoll(Dispatcher* pdispatcher) { |
| 1562 RTC_DCHECK(epoll_fd_ != INVALID_SOCKET); |
| 1563 int fd = pdispatcher->GetDescriptor(); |
| 1564 RTC_DCHECK(fd != INVALID_SOCKET); |
| 1565 if (fd == INVALID_SOCKET) { |
| 1566 return; |
| 1567 } |
| 1568 |
| 1569 struct epoll_event event = {0}; |
| 1570 event.events = GetEpollEvents(pdispatcher->GetRequestedEvents()); |
| 1571 event.data.ptr = pdispatcher; |
| 1572 int err = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event); |
| 1573 RTC_DCHECK_EQ(err, 0); |
| 1574 if (err == -1) { |
| 1575 LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_ADD"; |
| 1576 } |
| 1577 } |
| 1578 |
| 1579 void PhysicalSocketServer::RemoveEpoll(Dispatcher* pdispatcher) { |
| 1580 RTC_DCHECK(epoll_fd_ != INVALID_SOCKET); |
| 1581 int fd = pdispatcher->GetDescriptor(); |
| 1582 RTC_DCHECK(fd != INVALID_SOCKET); |
| 1583 if (fd == INVALID_SOCKET) { |
| 1584 return; |
| 1585 } |
| 1586 |
| 1587 struct epoll_event event = {0}; |
| 1588 int err = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, &event); |
| 1589 RTC_DCHECK(err == 0 || errno == ENOENT); |
| 1590 if (err == -1) { |
| 1591 if (errno == ENOENT) { |
| 1592 // Socket has already been closed. |
| 1593 LOG_E(LS_VERBOSE, EN, errno) << "epoll_ctl EPOLL_CTL_DEL"; |
| 1594 } else { |
| 1595 LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_DEL"; |
| 1596 } |
| 1597 } |
| 1598 } |
| 1599 |
| 1600 void PhysicalSocketServer::UpdateEpoll(Dispatcher* pdispatcher) { |
| 1601 RTC_DCHECK(epoll_fd_ != INVALID_SOCKET); |
| 1602 int fd = pdispatcher->GetDescriptor(); |
| 1603 RTC_DCHECK(fd != INVALID_SOCKET); |
| 1604 if (fd == INVALID_SOCKET) { |
| 1605 return; |
| 1606 } |
| 1607 |
| 1608 struct epoll_event event = {0}; |
| 1609 event.events = GetEpollEvents(pdispatcher->GetRequestedEvents()); |
| 1610 event.data.ptr = pdispatcher; |
| 1611 int err = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd, &event); |
| 1612 RTC_DCHECK_EQ(err, 0); |
| 1613 if (err == -1) { |
| 1614 LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_MOD"; |
| 1615 } |
| 1616 } |
| 1617 |
| 1618 bool PhysicalSocketServer::WaitEpoll(int cmsWait) { |
| 1619 RTC_DCHECK(epoll_fd_ != INVALID_SOCKET); |
| 1620 int64_t tvWait = -1; |
| 1621 int64_t tvStop = -1; |
| 1622 if (cmsWait != kForever) { |
| 1623 tvWait = cmsWait; |
| 1624 tvStop = TimeAfter(cmsWait); |
| 1625 } |
| 1626 |
| 1627 if (epoll_events_.empty()) { |
| 1628 // The initial space to receive events is created only if epoll is used. |
| 1629 epoll_events_.resize(kInitialEpollEvents); |
| 1630 } |
| 1631 |
| 1632 fWait_ = true; |
| 1633 |
| 1634 while (fWait_) { |
| 1635 // Wait then call handlers as appropriate |
| 1636 // < 0 means error |
| 1637 // 0 means timeout |
| 1638 // > 0 means count of descriptors ready |
| 1639 int n = epoll_wait(epoll_fd_, &epoll_events_[0], |
| 1640 static_cast<int>(epoll_events_.size()), |
| 1641 static_cast<int>(tvWait)); |
| 1642 if (n < 0) { |
| 1643 if (errno != EINTR) { |
| 1644 LOG_E(LS_ERROR, EN, errno) << "epoll"; |
| 1645 return false; |
| 1646 } |
| 1647 // Else ignore the error and keep going. If this EINTR was for one of the |
| 1648 // signals managed by this PhysicalSocketServer, the |
| 1649 // PosixSignalDeliveryDispatcher will be in the signaled state in the next |
| 1650 // iteration. |
| 1651 } else if (n == 0) { |
| 1652 // If timeout, return success |
| 1653 return true; |
| 1654 } else { |
| 1655 // We have signaled descriptors |
| 1656 CritScope cr(&crit_); |
| 1657 for (int i = 0; i < n; ++i) { |
| 1658 const epoll_event& event = epoll_events_[i]; |
| 1659 Dispatcher* pdispatcher = static_cast<Dispatcher*>(event.data.ptr); |
| 1660 if (dispatchers_.find(pdispatcher) == dispatchers_.end()) { |
| 1661 // The dispatcher for this socket no longer exists. |
| 1662 continue; |
| 1663 } |
| 1664 |
| 1665 bool readable = (event.events & (EPOLLIN | EPOLLPRI)); |
| 1666 bool writable = (event.events & EPOLLOUT); |
| 1667 bool check_error = (event.events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP)); |
| 1668 |
| 1669 ProcessEvents(pdispatcher, readable, writable, check_error); |
| 1670 } |
| 1671 } |
| 1672 |
| 1673 if (static_cast<size_t>(n) == epoll_events_.size() && |
| 1674 epoll_events_.size() < kMaxEpollEvents) { |
| 1675 // We used the complete space to receive events, increase size for future |
| 1676 // iterations. |
| 1677 epoll_events_.resize(std::max(epoll_events_.size() * 2, kMaxEpollEvents)); |
| 1678 } |
| 1679 |
| 1680 if (cmsWait != kForever) { |
| 1681 tvWait = TimeDiff(tvStop, TimeMillis()); |
| 1682 if (tvWait < 0) { |
| 1683 // Return success on timeout. |
| 1684 return true; |
| 1685 } |
| 1686 } |
| 1687 } |
| 1688 |
| 1689 return true; |
| 1690 } |
| 1691 |
| 1692 bool PhysicalSocketServer::WaitPoll(int cmsWait, Dispatcher* dispatcher) { |
| 1693 RTC_DCHECK(dispatcher); |
| 1694 int64_t tvWait = -1; |
| 1695 int64_t tvStop = -1; |
| 1696 if (cmsWait != kForever) { |
| 1697 tvWait = cmsWait; |
| 1698 tvStop = TimeAfter(cmsWait); |
| 1699 } |
| 1700 |
| 1701 fWait_ = true; |
| 1702 |
| 1703 struct pollfd fds = {0}; |
| 1704 int fd = dispatcher->GetDescriptor(); |
| 1705 fds.fd = fd; |
| 1706 |
| 1707 while (fWait_) { |
| 1708 uint32_t ff = dispatcher->GetRequestedEvents(); |
| 1709 fds.events = 0; |
| 1710 if (ff & (DE_READ | DE_ACCEPT)) { |
| 1711 fds.events |= POLLIN; |
| 1712 } |
| 1713 if (ff & (DE_WRITE | DE_CONNECT)) { |
| 1714 fds.events |= POLLOUT; |
| 1715 } |
| 1716 fds.revents = 0; |
| 1717 |
| 1718 // Wait then call handlers as appropriate |
| 1719 // < 0 means error |
| 1720 // 0 means timeout |
| 1721 // > 0 means count of descriptors ready |
| 1722 int n = poll(&fds, 1, static_cast<int>(tvWait)); |
| 1723 if (n < 0) { |
| 1724 if (errno != EINTR) { |
| 1725 LOG_E(LS_ERROR, EN, errno) << "poll"; |
| 1726 return false; |
| 1727 } |
| 1728 // Else ignore the error and keep going. If this EINTR was for one of the |
| 1729 // signals managed by this PhysicalSocketServer, the |
| 1730 // PosixSignalDeliveryDispatcher will be in the signaled state in the next |
| 1731 // iteration. |
| 1732 } else if (n == 0) { |
| 1733 // If timeout, return success |
| 1734 return true; |
| 1735 } else { |
| 1736 // We have signaled descriptors (should only be the passed dispatcher). |
| 1737 RTC_DCHECK_EQ(n, 1); |
| 1738 RTC_DCHECK_EQ(fds.fd, fd); |
| 1739 |
| 1740 bool readable = (fds.revents & (POLLIN | POLLPRI)); |
| 1741 bool writable = (fds.revents & POLLOUT); |
| 1742 bool check_error = (fds.revents & (POLLRDHUP | POLLERR | POLLHUP)); |
| 1743 |
| 1744 ProcessEvents(dispatcher, readable, writable, check_error); |
| 1745 } |
| 1746 |
| 1747 if (cmsWait != kForever) { |
| 1748 tvWait = TimeDiff(tvStop, TimeMillis()); |
| 1749 if (tvWait < 0) { |
| 1750 // Return success on timeout. |
| 1751 return true; |
| 1752 } |
| 1753 } |
| 1754 } |
| 1755 |
| 1756 return true; |
| 1757 } |
| 1758 |
| 1759 #endif // WEBRTC_USE_EPOLL |
| 1760 |
1375 static void GlobalSignalHandler(int signum) { | 1761 static void GlobalSignalHandler(int signum) { |
1376 PosixSignalHandler::Instance()->OnPosixSignalReceived(signum); | 1762 PosixSignalHandler::Instance()->OnPosixSignalReceived(signum); |
1377 } | 1763 } |
1378 | 1764 |
1379 bool PhysicalSocketServer::SetPosixSignalHandler(int signum, | 1765 bool PhysicalSocketServer::SetPosixSignalHandler(int signum, |
1380 void (*handler)(int)) { | 1766 void (*handler)(int)) { |
1381 // If handler is SIG_IGN or SIG_DFL then clear our user-level handler, | 1767 // If handler is SIG_IGN or SIG_DFL then clear our user-level handler, |
1382 // otherwise set one. | 1768 // otherwise set one. |
1383 if (handler == SIG_IGN || handler == SIG_DFL) { | 1769 if (handler == SIG_IGN || handler == SIG_DFL) { |
1384 if (!InstallSignal(signum, handler)) { | 1770 if (!InstallSignal(signum, handler)) { |
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1438 | 1824 |
1439 fWait_ = true; | 1825 fWait_ = true; |
1440 while (fWait_) { | 1826 while (fWait_) { |
1441 std::vector<WSAEVENT> events; | 1827 std::vector<WSAEVENT> events; |
1442 std::vector<Dispatcher *> event_owners; | 1828 std::vector<Dispatcher *> event_owners; |
1443 | 1829 |
1444 events.push_back(socket_ev_); | 1830 events.push_back(socket_ev_); |
1445 | 1831 |
1446 { | 1832 { |
1447 CritScope cr(&crit_); | 1833 CritScope cr(&crit_); |
1448 size_t i = 0; | 1834 // TODO(jbauch): Support re-entrant waiting. |
1449 iterators_.push_back(&i); | 1835 RTC_DCHECK(!processing_dispatchers_); |
1450 // Don't track dispatchers_.size(), because we want to pick up any new | 1836 |
1451 // dispatchers that were added while processing the loop. | 1837 // Calling "CheckSignalClose" might remove a closed dispatcher from the |
1452 while (i < dispatchers_.size()) { | 1838 // set. This must be deferred to prevent invalidating the iterator. |
1453 Dispatcher* disp = dispatchers_[i++]; | 1839 processing_dispatchers_ = true; |
| 1840 for (Dispatcher* disp : dispatchers_) { |
1454 if (!process_io && (disp != signal_wakeup_)) | 1841 if (!process_io && (disp != signal_wakeup_)) |
1455 continue; | 1842 continue; |
1456 SOCKET s = disp->GetSocket(); | 1843 SOCKET s = disp->GetSocket(); |
1457 if (disp->CheckSignalClose()) { | 1844 if (disp->CheckSignalClose()) { |
1458 // We just signalled close, don't poll this socket | 1845 // We just signalled close, don't poll this socket |
1459 } else if (s != INVALID_SOCKET) { | 1846 } else if (s != INVALID_SOCKET) { |
1460 WSAEventSelect(s, | 1847 WSAEventSelect(s, |
1461 events[0], | 1848 events[0], |
1462 FlagsToEvents(disp->GetRequestedEvents())); | 1849 FlagsToEvents(disp->GetRequestedEvents())); |
1463 } else { | 1850 } else { |
1464 events.push_back(disp->GetWSAEvent()); | 1851 events.push_back(disp->GetWSAEvent()); |
1465 event_owners.push_back(disp); | 1852 event_owners.push_back(disp); |
1466 } | 1853 } |
1467 } | 1854 } |
1468 RTC_DCHECK(iterators_.back() == &i); | 1855 |
1469 iterators_.pop_back(); | 1856 processing_dispatchers_ = false; |
| 1857 // Process deferred dispatchers that have been added/removed while the |
| 1858 // events were handled above. |
| 1859 AddRemovePendingDispatchers(); |
1470 } | 1860 } |
1471 | 1861 |
1472 // Which is shorter, the delay wait or the asked wait? | 1862 // Which is shorter, the delay wait or the asked wait? |
1473 | 1863 |
1474 int64_t cmsNext; | 1864 int64_t cmsNext; |
1475 if (cmsWait == kForever) { | 1865 if (cmsWait == kForever) { |
1476 cmsNext = cmsWait; | 1866 cmsNext = cmsWait; |
1477 } else { | 1867 } else { |
1478 cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed); | 1868 cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed); |
1479 } | 1869 } |
(...skipping 13 matching lines...) Expand all Loading... |
1493 return false; | 1883 return false; |
1494 } else if (dw == WSA_WAIT_TIMEOUT) { | 1884 } else if (dw == WSA_WAIT_TIMEOUT) { |
1495 // Timeout? | 1885 // Timeout? |
1496 return true; | 1886 return true; |
1497 } else { | 1887 } else { |
1498 // Figure out which one it is and call it | 1888 // Figure out which one it is and call it |
1499 CritScope cr(&crit_); | 1889 CritScope cr(&crit_); |
1500 int index = dw - WSA_WAIT_EVENT_0; | 1890 int index = dw - WSA_WAIT_EVENT_0; |
1501 if (index > 0) { | 1891 if (index > 0) { |
1502 --index; // The first event is the socket event | 1892 --index; // The first event is the socket event |
1503 event_owners[index]->OnPreEvent(0); | 1893 Dispatcher* disp = event_owners[index]; |
1504 event_owners[index]->OnEvent(0, 0); | 1894 // The dispatcher could have been removed while waiting for events. |
| 1895 if (dispatchers_.find(disp) != dispatchers_.end()) { |
| 1896 disp->OnPreEvent(0); |
| 1897 disp->OnEvent(0, 0); |
| 1898 } |
1505 } else if (process_io) { | 1899 } else if (process_io) { |
1506 size_t i = 0, end = dispatchers_.size(); | 1900 processing_dispatchers_ = true; |
1507 iterators_.push_back(&i); | 1901 for (Dispatcher* disp : dispatchers_) { |
1508 iterators_.push_back(&end); // Don't iterate over new dispatchers. | |
1509 while (i < end) { | |
1510 Dispatcher* disp = dispatchers_[i++]; | |
1511 SOCKET s = disp->GetSocket(); | 1902 SOCKET s = disp->GetSocket(); |
1512 if (s == INVALID_SOCKET) | 1903 if (s == INVALID_SOCKET) |
1513 continue; | 1904 continue; |
1514 | 1905 |
1515 WSANETWORKEVENTS wsaEvents; | 1906 WSANETWORKEVENTS wsaEvents; |
1516 int err = WSAEnumNetworkEvents(s, events[0], &wsaEvents); | 1907 int err = WSAEnumNetworkEvents(s, events[0], &wsaEvents); |
1517 if (err == 0) { | 1908 if (err == 0) { |
1518 { | 1909 { |
1519 if ((wsaEvents.lNetworkEvents & FD_READ) && | 1910 if ((wsaEvents.lNetworkEvents & FD_READ) && |
1520 wsaEvents.iErrorCode[FD_READ_BIT] != 0) { | 1911 wsaEvents.iErrorCode[FD_READ_BIT] != 0) { |
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1561 if (wsaEvents.lNetworkEvents & FD_CLOSE) { | 1952 if (wsaEvents.lNetworkEvents & FD_CLOSE) { |
1562 ff |= DE_CLOSE; | 1953 ff |= DE_CLOSE; |
1563 errcode = wsaEvents.iErrorCode[FD_CLOSE_BIT]; | 1954 errcode = wsaEvents.iErrorCode[FD_CLOSE_BIT]; |
1564 } | 1955 } |
1565 if (ff != 0) { | 1956 if (ff != 0) { |
1566 disp->OnPreEvent(ff); | 1957 disp->OnPreEvent(ff); |
1567 disp->OnEvent(ff, errcode); | 1958 disp->OnEvent(ff, errcode); |
1568 } | 1959 } |
1569 } | 1960 } |
1570 } | 1961 } |
1571 RTC_DCHECK(iterators_.back() == &end); | 1962 |
1572 iterators_.pop_back(); | 1963 processing_dispatchers_ = false; |
1573 RTC_DCHECK(iterators_.back() == &i); | 1964 // Process deferred dispatchers that have been added/removed while the |
1574 iterators_.pop_back(); | 1965 // events were handled above. |
| 1966 AddRemovePendingDispatchers(); |
1575 } | 1967 } |
1576 | 1968 |
1577 // Reset the network event until new activity occurs | 1969 // Reset the network event until new activity occurs |
1578 WSAResetEvent(socket_ev_); | 1970 WSAResetEvent(socket_ev_); |
1579 } | 1971 } |
1580 | 1972 |
1581 // Break? | 1973 // Break? |
1582 if (!fWait_) | 1974 if (!fWait_) |
1583 break; | 1975 break; |
1584 cmsElapsed = TimeSince(msStart); | 1976 cmsElapsed = TimeSince(msStart); |
1585 if ((cmsWait != kForever) && (cmsElapsed >= cmsWait)) { | 1977 if ((cmsWait != kForever) && (cmsElapsed >= cmsWait)) { |
1586 break; | 1978 break; |
1587 } | 1979 } |
1588 } | 1980 } |
1589 | 1981 |
1590 // Done | 1982 // Done |
1591 return true; | 1983 return true; |
1592 } | 1984 } |
1593 #endif // WEBRTC_WIN | 1985 #endif // WEBRTC_WIN |
1594 | 1986 |
1595 } // namespace rtc | 1987 } // namespace rtc |
OLD | NEW |