Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(78)

Side by Side Diff: webrtc/base/physicalsocketserver.cc

Issue 2880923002: Support epoll in PhysicalSocketServer. (Closed)
Patch Set: Win: Prevent updates to dispatcher in loop before waiting. Created 3 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « webrtc/base/physicalsocketserver.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 /* 1 /*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 * 3 *
4 * Use of this source code is governed by a BSD-style license 4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source 5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found 6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may 7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree. 8 * be found in the AUTHORS file in the root of the source tree.
9 */ 9 */
10 #include "webrtc/base/physicalsocketserver.h" 10 #include "webrtc/base/physicalsocketserver.h"
11 11
12 #if defined(_MSC_VER) && _MSC_VER < 1300 12 #if defined(_MSC_VER) && _MSC_VER < 1300
13 #pragma warning(disable:4786) 13 #pragma warning(disable:4786)
14 #endif 14 #endif
15 15
16 #ifdef MEMORY_SANITIZER 16 #ifdef MEMORY_SANITIZER
17 #include <sanitizer/msan_interface.h> 17 #include <sanitizer/msan_interface.h>
18 #endif 18 #endif
19 19
20 #if defined(WEBRTC_POSIX) 20 #if defined(WEBRTC_POSIX)
21 #include <string.h> 21 #include <string.h>
22 #include <errno.h> 22 #include <errno.h>
23 #include <fcntl.h> 23 #include <fcntl.h>
24 #if defined(WEBRTC_USE_EPOLL)
25 // "poll" will be used to wait for the signal dispatcher.
26 #include <poll.h>
27 #endif
24 #include <sys/ioctl.h> 28 #include <sys/ioctl.h>
25 #include <sys/time.h> 29 #include <sys/time.h>
26 #include <sys/select.h> 30 #include <sys/select.h>
27 #include <unistd.h> 31 #include <unistd.h>
28 #include <signal.h> 32 #include <signal.h>
29 #endif 33 #endif
30 34
31 #if defined(WEBRTC_WIN) 35 #if defined(WEBRTC_WIN)
32 #define WIN32_LEAN_AND_MEAN 36 #define WIN32_LEAN_AND_MEAN
33 #include <windows.h> 37 #include <windows.h>
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
73 77
74 int64_t GetSocketRecvTimestamp(int socket) { 78 int64_t GetSocketRecvTimestamp(int socket) {
75 return -1; 79 return -1;
76 } 80 }
77 #endif 81 #endif
78 82
79 #if defined(WEBRTC_WIN) 83 #if defined(WEBRTC_WIN)
80 typedef char* SockOptArg; 84 typedef char* SockOptArg;
81 #endif 85 #endif
82 86
87 #if defined(WEBRTC_USE_EPOLL)
88 // POLLRDHUP / EPOLLRDHUP are only defined starting with Linux 2.6.17.
89 #if !defined(POLLRDHUP)
90 #define POLLRDHUP 0x2000
91 #endif
92 #if !defined(EPOLLRDHUP)
93 #define EPOLLRDHUP 0x2000
94 #endif
95 #endif
96
83 namespace rtc { 97 namespace rtc {
84 98
85 std::unique_ptr<SocketServer> SocketServer::CreateDefault() { 99 std::unique_ptr<SocketServer> SocketServer::CreateDefault() {
86 #if defined(__native_client__) 100 #if defined(__native_client__)
87 return std::unique_ptr<SocketServer>(new rtc::NullSocketServer); 101 return std::unique_ptr<SocketServer>(new rtc::NullSocketServer);
88 #else 102 #else
89 return std::unique_ptr<SocketServer>(new rtc::PhysicalSocketServer); 103 return std::unique_ptr<SocketServer>(new rtc::PhysicalSocketServer);
90 #endif 104 #endif
91 } 105 }
92 106
(...skipping 665 matching lines...) Expand 10 before | Expand all | Expand 10 after
758 } 772 }
759 if (((ff & DE_CLOSE) != 0) && (id_ == cache_id)) { 773 if (((ff & DE_CLOSE) != 0) && (id_ == cache_id)) {
760 signal_close_ = true; 774 signal_close_ = true;
761 signal_err_ = err; 775 signal_err_ = err;
762 } 776 }
763 } 777 }
764 778
765 #elif defined(WEBRTC_POSIX) 779 #elif defined(WEBRTC_POSIX)
766 780
767 void SocketDispatcher::OnEvent(uint32_t ff, int err) { 781 void SocketDispatcher::OnEvent(uint32_t ff, int err) {
782 #if defined(WEBRTC_USE_EPOLL)
783 // Remember currently enabled events so we can combine multiple changes
784 // into one update call later.
785 // The signal handlers might re-enable events disabled here, so we can't
786 // keep a list of events to disable at the end of the method. This list
787 // would not be updated with the events enabled by the signal handlers.
788 StartBatchedEventUpdates();
789 #endif
768 // Make sure we deliver connect/accept first. Otherwise, consumers may see 790 // Make sure we deliver connect/accept first. Otherwise, consumers may see
769 // something like a READ followed by a CONNECT, which would be odd. 791 // something like a READ followed by a CONNECT, which would be odd.
770 if ((ff & DE_CONNECT) != 0) { 792 if ((ff & DE_CONNECT) != 0) {
771 DisableEvents(DE_CONNECT); 793 DisableEvents(DE_CONNECT);
772 SignalConnectEvent(this); 794 SignalConnectEvent(this);
773 } 795 }
774 if ((ff & DE_ACCEPT) != 0) { 796 if ((ff & DE_ACCEPT) != 0) {
775 DisableEvents(DE_ACCEPT); 797 DisableEvents(DE_ACCEPT);
776 SignalReadEvent(this); 798 SignalReadEvent(this);
777 } 799 }
778 if ((ff & DE_READ) != 0) { 800 if ((ff & DE_READ) != 0) {
779 DisableEvents(DE_READ); 801 DisableEvents(DE_READ);
780 SignalReadEvent(this); 802 SignalReadEvent(this);
781 } 803 }
782 if ((ff & DE_WRITE) != 0) { 804 if ((ff & DE_WRITE) != 0) {
783 DisableEvents(DE_WRITE); 805 DisableEvents(DE_WRITE);
784 SignalWriteEvent(this); 806 SignalWriteEvent(this);
785 } 807 }
786 if ((ff & DE_CLOSE) != 0) { 808 if ((ff & DE_CLOSE) != 0) {
787 // The socket is now dead to us, so stop checking it. 809 // The socket is now dead to us, so stop checking it.
788 SetEnabledEvents(0); 810 SetEnabledEvents(0);
789 SignalCloseEvent(this, err); 811 SignalCloseEvent(this, err);
790 } 812 }
813 #if defined(WEBRTC_USE_EPOLL)
814 FinishBatchedEventUpdates();
815 #endif
791 } 816 }
792 817
793 #endif // WEBRTC_POSIX 818 #endif // WEBRTC_POSIX
794 819
820 #if defined(WEBRTC_USE_EPOLL)
821
822 static int GetEpollEvents(uint32_t ff) {
823 int events = 0;
824 if (ff & (DE_READ | DE_ACCEPT)) {
825 events |= EPOLLIN;
826 }
827 if (ff & (DE_WRITE | DE_CONNECT)) {
828 events |= EPOLLOUT;
829 }
830 return events;
831 }
832
833 void SocketDispatcher::StartBatchedEventUpdates() {
834 RTC_DCHECK_EQ(saved_enabled_events_, -1);
835 saved_enabled_events_ = enabled_events();
836 }
837
838 void SocketDispatcher::FinishBatchedEventUpdates() {
839 RTC_DCHECK_NE(saved_enabled_events_, -1);
840 uint8_t old_events = static_cast<uint8_t>(saved_enabled_events_);
841 saved_enabled_events_ = -1;
842 MaybeUpdateDispatcher(old_events);
843 }
844
845 void SocketDispatcher::MaybeUpdateDispatcher(uint8_t old_events) {
846 if (GetEpollEvents(enabled_events()) != GetEpollEvents(old_events) &&
847 saved_enabled_events_ == -1) {
848 ss_->Update(this);
849 }
850 }
851
852 void SocketDispatcher::SetEnabledEvents(uint8_t events) {
853 uint8_t old_events = enabled_events();
854 PhysicalSocket::SetEnabledEvents(events);
855 MaybeUpdateDispatcher(old_events);
856 }
857
858 void SocketDispatcher::EnableEvents(uint8_t events) {
859 uint8_t old_events = enabled_events();
860 PhysicalSocket::EnableEvents(events);
861 MaybeUpdateDispatcher(old_events);
862 }
863
864 void SocketDispatcher::DisableEvents(uint8_t events) {
865 uint8_t old_events = enabled_events();
866 PhysicalSocket::DisableEvents(events);
867 MaybeUpdateDispatcher(old_events);
868 }
869
870 #endif // WEBRTC_USE_EPOLL
871
795 int SocketDispatcher::Close() { 872 int SocketDispatcher::Close() {
796 if (s_ == INVALID_SOCKET) 873 if (s_ == INVALID_SOCKET)
797 return 0; 874 return 0;
798 875
799 #if defined(WEBRTC_WIN) 876 #if defined(WEBRTC_WIN)
800 id_ = 0; 877 id_ = 0;
801 signal_close_ = false; 878 signal_close_ = false;
802 #endif 879 #endif
803 ss_->Remove(this); 880 ss_->Remove(this);
804 return PhysicalSocket::Close(); 881 return PhysicalSocket::Close();
(...skipping 308 matching lines...) Expand 10 before | Expand all | Expand 10 after
1113 if (pf_) 1190 if (pf_)
1114 *pf_ = false; 1191 *pf_ = false;
1115 } 1192 }
1116 1193
1117 private: 1194 private:
1118 bool *pf_; 1195 bool *pf_;
1119 }; 1196 };
1120 1197
1121 PhysicalSocketServer::PhysicalSocketServer() 1198 PhysicalSocketServer::PhysicalSocketServer()
1122 : fWait_(false) { 1199 : fWait_(false) {
1200 #if defined(WEBRTC_USE_EPOLL)
1201 // Since Linux 2.6.8, the size argument is ignored, but must be greater than
1202 // zero. Before that the size served as hint to the kernel for the amount of
1203 // space to initially allocate in internal data structures.
1204 epoll_fd_ = epoll_create(FD_SETSIZE);
1205 if (epoll_fd_ == -1) {
1206 // Not an error, will fall back to "select" below.
1207 LOG_E(LS_WARNING, EN, errno) << "epoll_create";
1208 epoll_fd_ = INVALID_SOCKET;
1209 }
1210 #endif
1123 signal_wakeup_ = new Signaler(this, &fWait_); 1211 signal_wakeup_ = new Signaler(this, &fWait_);
1124 #if defined(WEBRTC_WIN) 1212 #if defined(WEBRTC_WIN)
1125 socket_ev_ = WSACreateEvent(); 1213 socket_ev_ = WSACreateEvent();
1126 #endif 1214 #endif
1127 } 1215 }
1128 1216
1129 PhysicalSocketServer::~PhysicalSocketServer() { 1217 PhysicalSocketServer::~PhysicalSocketServer() {
1130 #if defined(WEBRTC_WIN) 1218 #if defined(WEBRTC_WIN)
1131 WSACloseEvent(socket_ev_); 1219 WSACloseEvent(socket_ev_);
1132 #endif 1220 #endif
1133 #if defined(WEBRTC_POSIX) 1221 #if defined(WEBRTC_POSIX)
1134 signal_dispatcher_.reset(); 1222 signal_dispatcher_.reset();
1135 #endif 1223 #endif
1136 delete signal_wakeup_; 1224 delete signal_wakeup_;
1225 #if defined(WEBRTC_USE_EPOLL)
1226 if (epoll_fd_ != INVALID_SOCKET) {
1227 close(epoll_fd_);
1228 }
1229 #endif
1137 RTC_DCHECK(dispatchers_.empty()); 1230 RTC_DCHECK(dispatchers_.empty());
1138 } 1231 }
1139 1232
1140 void PhysicalSocketServer::WakeUp() { 1233 void PhysicalSocketServer::WakeUp() {
1141 signal_wakeup_->Signal(); 1234 signal_wakeup_->Signal();
1142 } 1235 }
1143 1236
1144 Socket* PhysicalSocketServer::CreateSocket(int type) { 1237 Socket* PhysicalSocketServer::CreateSocket(int type) {
1145 return CreateSocket(AF_INET, type); 1238 return CreateSocket(AF_INET, type);
1146 } 1239 }
(...skipping 27 matching lines...) Expand all
1174 if (dispatcher->Initialize()) { 1267 if (dispatcher->Initialize()) {
1175 return dispatcher; 1268 return dispatcher;
1176 } else { 1269 } else {
1177 delete dispatcher; 1270 delete dispatcher;
1178 return nullptr; 1271 return nullptr;
1179 } 1272 }
1180 } 1273 }
1181 1274
1182 void PhysicalSocketServer::Add(Dispatcher *pdispatcher) { 1275 void PhysicalSocketServer::Add(Dispatcher *pdispatcher) {
1183 CritScope cs(&crit_); 1276 CritScope cs(&crit_);
1184 // Prevent duplicates. This can cause dead dispatchers to stick around. 1277 if (processing_dispatchers_) {
1185 DispatcherList::iterator pos = std::find(dispatchers_.begin(), 1278 // A dispatcher is being added while a "Wait" call is processing the
1186 dispatchers_.end(), 1279 // list of socket events.
1187 pdispatcher); 1280 // Defer adding to "dispatchers_" set until processing is done to avoid
1188 if (pos != dispatchers_.end()) 1281 // invalidating the iterator in "Wait".
1189 return; 1282 pending_remove_dispatchers_.erase(pdispatcher);
1190 dispatchers_.push_back(pdispatcher); 1283 pending_add_dispatchers_.insert(pdispatcher);
1284 } else {
1285 dispatchers_.insert(pdispatcher);
1286 }
1287 #if defined(WEBRTC_USE_EPOLL)
1288 if (epoll_fd_ != INVALID_SOCKET) {
1289 AddEpoll(pdispatcher);
1290 }
1291 #endif // WEBRTC_USE_EPOLL
1191 } 1292 }
1192 1293
1193 void PhysicalSocketServer::Remove(Dispatcher *pdispatcher) { 1294 void PhysicalSocketServer::Remove(Dispatcher *pdispatcher) {
1194 CritScope cs(&crit_); 1295 CritScope cs(&crit_);
1195 DispatcherList::iterator pos = std::find(dispatchers_.begin(), 1296 if (processing_dispatchers_) {
1196 dispatchers_.end(), 1297 // A dispatcher is being removed while a "Wait" call is processing the
1197 pdispatcher); 1298 // list of socket events.
1198 // We silently ignore duplicate calls to Add, so we should silently ignore 1299 // Defer removal from "dispatchers_" set until processing is done to avoid
1199 // the (expected) symmetric calls to Remove. Note that this may still hide 1300 // invalidating the iterator in "Wait".
1200 // a real issue, so we at least log a warning about it. 1301 if (!pending_add_dispatchers_.erase(pdispatcher) &&
1201 if (pos == dispatchers_.end()) { 1302 dispatchers_.find(pdispatcher) == dispatchers_.end()) {
1303 LOG(LS_WARNING) << "PhysicalSocketServer asked to remove a unknown "
1304 << "dispatcher, potentially from a duplicate call to "
1305 << "Add.";
1306 return;
1307 }
1308
1309 pending_remove_dispatchers_.insert(pdispatcher);
1310 } else if (!dispatchers_.erase(pdispatcher)) {
1202 LOG(LS_WARNING) << "PhysicalSocketServer asked to remove a unknown " 1311 LOG(LS_WARNING) << "PhysicalSocketServer asked to remove a unknown "
1203 << "dispatcher, potentially from a duplicate call to Add."; 1312 << "dispatcher, potentially from a duplicate call to Add.";
1204 return; 1313 return;
1205 } 1314 }
1206 size_t index = pos - dispatchers_.begin(); 1315 #if defined(WEBRTC_USE_EPOLL)
1207 dispatchers_.erase(pos); 1316 if (epoll_fd_ != INVALID_SOCKET) {
1208 for (IteratorList::iterator it = iterators_.begin(); it != iterators_.end(); 1317 RemoveEpoll(pdispatcher);
1209 ++it) { 1318 }
1210 if (index < **it) { 1319 #endif // WEBRTC_USE_EPOLL
1211 --**it; 1320 }
1321
1322 void PhysicalSocketServer::Update(Dispatcher* pdispatcher) {
1323 #if defined(WEBRTC_USE_EPOLL)
1324 if (epoll_fd_ == INVALID_SOCKET) {
1325 return;
1326 }
1327
1328 CritScope cs(&crit_);
1329 if (dispatchers_.find(pdispatcher) == dispatchers_.end()) {
1330 return;
1331 }
1332
1333 UpdateEpoll(pdispatcher);
1334 #endif
1335 }
1336
1337 void PhysicalSocketServer::AddRemovePendingDispatchers() {
1338 if (!pending_add_dispatchers_.empty()) {
1339 for (Dispatcher* pdispatcher : pending_add_dispatchers_) {
1340 dispatchers_.insert(pdispatcher);
1212 } 1341 }
1342 pending_add_dispatchers_.clear();
1343 }
1344
1345 if (!pending_remove_dispatchers_.empty()) {
1346 for (Dispatcher* pdispatcher : pending_remove_dispatchers_) {
1347 dispatchers_.erase(pdispatcher);
1348 }
1349 pending_remove_dispatchers_.clear();
1213 } 1350 }
1214 } 1351 }
1215 1352
1216 #if defined(WEBRTC_POSIX) 1353 #if defined(WEBRTC_POSIX)
1354
1217 bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) { 1355 bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
1356 #if defined(WEBRTC_USE_EPOLL)
1357 // We don't keep a dedicated "epoll" descriptor containing only the non-IO
1358 // (i.e. signaling) dispatcher, so "poll" will be used instead of the default
1359 // "select" to support sockets larger than FD_SETSIZE.
1360 if (!process_io) {
1361 return WaitPoll(cmsWait, signal_wakeup_);
1362 } else if (epoll_fd_ != INVALID_SOCKET) {
1363 return WaitEpoll(cmsWait);
1364 }
1365 #endif
1366 return WaitSelect(cmsWait, process_io);
1367 }
1368
1369 static void ProcessEvents(Dispatcher* dispatcher,
1370 bool readable,
1371 bool writable,
1372 bool check_error) {
1373 int errcode = 0;
1374 // TODO(pthatcher): Should we set errcode if getsockopt fails?
1375 if (check_error) {
1376 socklen_t len = sizeof(errcode);
1377 ::getsockopt(dispatcher->GetDescriptor(), SOL_SOCKET, SO_ERROR, &errcode,
1378 &len);
1379 }
1380
1381 uint32_t ff = 0;
1382
1383 // Check readable descriptors. If we're waiting on an accept, signal
1384 // that. Otherwise we're waiting for data, check to see if we're
1385 // readable or really closed.
1386 // TODO(pthatcher): Only peek at TCP descriptors.
1387 if (readable) {
1388 if (dispatcher->GetRequestedEvents() & DE_ACCEPT) {
1389 ff |= DE_ACCEPT;
1390 } else if (errcode || dispatcher->IsDescriptorClosed()) {
1391 ff |= DE_CLOSE;
1392 } else {
1393 ff |= DE_READ;
1394 }
1395 }
1396
1397 // Check writable descriptors. If we're waiting on a connect, detect
1398 // success versus failure by the reaped error code.
1399 if (writable) {
1400 if (dispatcher->GetRequestedEvents() & DE_CONNECT) {
1401 if (!errcode) {
1402 ff |= DE_CONNECT;
1403 } else {
1404 ff |= DE_CLOSE;
1405 }
1406 } else {
1407 ff |= DE_WRITE;
1408 }
1409 }
1410
1411 // Tell the descriptor about the event.
1412 if (ff != 0) {
1413 dispatcher->OnPreEvent(ff);
1414 dispatcher->OnEvent(ff, errcode);
1415 }
1416 }
1417
1418 bool PhysicalSocketServer::WaitSelect(int cmsWait, bool process_io) {
1218 // Calculate timing information 1419 // Calculate timing information
1219 1420
1220 struct timeval* ptvWait = nullptr; 1421 struct timeval* ptvWait = nullptr;
1221 struct timeval tvWait; 1422 struct timeval tvWait;
1222 struct timeval tvStop; 1423 struct timeval tvStop;
1223 if (cmsWait != kForever) { 1424 if (cmsWait != kForever) {
1224 // Calculate wait timeval 1425 // Calculate wait timeval
1225 tvWait.tv_sec = cmsWait / 1000; 1426 tvWait.tv_sec = cmsWait / 1000;
1226 tvWait.tv_usec = (cmsWait % 1000) * 1000; 1427 tvWait.tv_usec = (cmsWait % 1000) * 1000;
1227 ptvWait = &tvWait; 1428 ptvWait = &tvWait;
(...skipping 22 matching lines...) Expand all
1250 __msan_unpoison(&fdsRead, sizeof(fdsRead)); 1451 __msan_unpoison(&fdsRead, sizeof(fdsRead));
1251 __msan_unpoison(&fdsWrite, sizeof(fdsWrite)); 1452 __msan_unpoison(&fdsWrite, sizeof(fdsWrite));
1252 #endif 1453 #endif
1253 1454
1254 fWait_ = true; 1455 fWait_ = true;
1255 1456
1256 while (fWait_) { 1457 while (fWait_) {
1257 int fdmax = -1; 1458 int fdmax = -1;
1258 { 1459 {
1259 CritScope cr(&crit_); 1460 CritScope cr(&crit_);
1260 for (size_t i = 0; i < dispatchers_.size(); ++i) { 1461 // TODO(jbauch): Support re-entrant waiting.
1462 RTC_DCHECK(!processing_dispatchers_);
1463 for (Dispatcher* pdispatcher : dispatchers_) {
1261 // Query dispatchers for read and write wait state 1464 // Query dispatchers for read and write wait state
1262 Dispatcher *pdispatcher = dispatchers_[i];
1263 RTC_DCHECK(pdispatcher); 1465 RTC_DCHECK(pdispatcher);
1264 if (!process_io && (pdispatcher != signal_wakeup_)) 1466 if (!process_io && (pdispatcher != signal_wakeup_))
1265 continue; 1467 continue;
1266 int fd = pdispatcher->GetDescriptor(); 1468 int fd = pdispatcher->GetDescriptor();
1469 // "select"ing a file descriptor that is equal to or larger than
1470 // FD_SETSIZE will result in undefined behavior.
1471 RTC_DCHECK_LT(fd, FD_SETSIZE);
1267 if (fd > fdmax) 1472 if (fd > fdmax)
1268 fdmax = fd; 1473 fdmax = fd;
1269 1474
1270 uint32_t ff = pdispatcher->GetRequestedEvents(); 1475 uint32_t ff = pdispatcher->GetRequestedEvents();
1271 if (ff & (DE_READ | DE_ACCEPT)) 1476 if (ff & (DE_READ | DE_ACCEPT))
1272 FD_SET(fd, &fdsRead); 1477 FD_SET(fd, &fdsRead);
1273 if (ff & (DE_WRITE | DE_CONNECT)) 1478 if (ff & (DE_WRITE | DE_CONNECT))
1274 FD_SET(fd, &fdsWrite); 1479 FD_SET(fd, &fdsWrite);
1275 } 1480 }
1276 } 1481 }
(...skipping 13 matching lines...) Expand all
1290 // Else ignore the error and keep going. If this EINTR was for one of the 1495 // Else ignore the error and keep going. If this EINTR was for one of the
1291 // signals managed by this PhysicalSocketServer, the 1496 // signals managed by this PhysicalSocketServer, the
1292 // PosixSignalDeliveryDispatcher will be in the signaled state in the next 1497 // PosixSignalDeliveryDispatcher will be in the signaled state in the next
1293 // iteration. 1498 // iteration.
1294 } else if (n == 0) { 1499 } else if (n == 0) {
1295 // If timeout, return success 1500 // If timeout, return success
1296 return true; 1501 return true;
1297 } else { 1502 } else {
1298 // We have signaled descriptors 1503 // We have signaled descriptors
1299 CritScope cr(&crit_); 1504 CritScope cr(&crit_);
1300 for (size_t i = 0; i < dispatchers_.size(); ++i) { 1505 processing_dispatchers_ = true;
1301 Dispatcher *pdispatcher = dispatchers_[i]; 1506 for (Dispatcher* pdispatcher : dispatchers_) {
1302 int fd = pdispatcher->GetDescriptor(); 1507 int fd = pdispatcher->GetDescriptor();
1303 uint32_t ff = 0;
1304 int errcode = 0;
1305 1508
1306 // Reap any error code, which can be signaled through reads or writes. 1509 bool readable = FD_ISSET(fd, &fdsRead);
1307 // TODO(pthatcher): Should we set errcode if getsockopt fails? 1510 if (readable) {
1308 if (FD_ISSET(fd, &fdsRead) || FD_ISSET(fd, &fdsWrite)) { 1511 FD_CLR(fd, &fdsRead);
1309 socklen_t len = sizeof(errcode);
1310 ::getsockopt(fd, SOL_SOCKET, SO_ERROR, &errcode, &len);
1311 } 1512 }
1312 1513
1313 // Check readable descriptors. If we're waiting on an accept, signal 1514 bool writable = FD_ISSET(fd, &fdsWrite);
1314 // that. Otherwise we're waiting for data, check to see if we're 1515 if (writable) {
1315 // readable or really closed. 1516 FD_CLR(fd, &fdsWrite);
1316 // TODO(pthatcher): Only peek at TCP descriptors.
1317 if (FD_ISSET(fd, &fdsRead)) {
1318 FD_CLR(fd, &fdsRead);
1319 if (pdispatcher->GetRequestedEvents() & DE_ACCEPT) {
1320 ff |= DE_ACCEPT;
1321 } else if (errcode || pdispatcher->IsDescriptorClosed()) {
1322 ff |= DE_CLOSE;
1323 } else {
1324 ff |= DE_READ;
1325 }
1326 } 1517 }
1327 1518
1328 // Check writable descriptors. If we're waiting on a connect, detect 1519 // The error code can be signaled through reads or writes.
1329 // success versus failure by the reaped error code. 1520 ProcessEvents(pdispatcher, readable, writable, readable || writable);
1330 if (FD_ISSET(fd, &fdsWrite)) { 1521 }
1331 FD_CLR(fd, &fdsWrite);
1332 if (pdispatcher->GetRequestedEvents() & DE_CONNECT) {
1333 if (!errcode) {
1334 ff |= DE_CONNECT;
1335 } else {
1336 ff |= DE_CLOSE;
1337 }
1338 } else {
1339 ff |= DE_WRITE;
1340 }
1341 }
1342 1522
1343 // Tell the descriptor about the event. 1523 processing_dispatchers_ = false;
1344 if (ff != 0) { 1524 // Process deferred dispatchers that have been added/removed while the
1345 pdispatcher->OnPreEvent(ff); 1525 // events were handled above.
1346 pdispatcher->OnEvent(ff, errcode); 1526 AddRemovePendingDispatchers();
1347 }
1348 }
1349 } 1527 }
1350 1528
1351 // Recalc the time remaining to wait. Doing it here means it doesn't get 1529 // Recalc the time remaining to wait. Doing it here means it doesn't get
1352 // calced twice the first time through the loop 1530 // calced twice the first time through the loop
1353 if (ptvWait) { 1531 if (ptvWait) {
1354 ptvWait->tv_sec = 0; 1532 ptvWait->tv_sec = 0;
1355 ptvWait->tv_usec = 0; 1533 ptvWait->tv_usec = 0;
1356 struct timeval tvT; 1534 struct timeval tvT;
1357 gettimeofday(&tvT, nullptr); 1535 gettimeofday(&tvT, nullptr);
1358 if ((tvStop.tv_sec > tvT.tv_sec) 1536 if ((tvStop.tv_sec > tvT.tv_sec)
1359 || ((tvStop.tv_sec == tvT.tv_sec) 1537 || ((tvStop.tv_sec == tvT.tv_sec)
1360 && (tvStop.tv_usec > tvT.tv_usec))) { 1538 && (tvStop.tv_usec > tvT.tv_usec))) {
1361 ptvWait->tv_sec = tvStop.tv_sec - tvT.tv_sec; 1539 ptvWait->tv_sec = tvStop.tv_sec - tvT.tv_sec;
1362 ptvWait->tv_usec = tvStop.tv_usec - tvT.tv_usec; 1540 ptvWait->tv_usec = tvStop.tv_usec - tvT.tv_usec;
1363 if (ptvWait->tv_usec < 0) { 1541 if (ptvWait->tv_usec < 0) {
1364 RTC_DCHECK(ptvWait->tv_sec > 0); 1542 RTC_DCHECK(ptvWait->tv_sec > 0);
1365 ptvWait->tv_usec += 1000000; 1543 ptvWait->tv_usec += 1000000;
1366 ptvWait->tv_sec -= 1; 1544 ptvWait->tv_sec -= 1;
1367 } 1545 }
1368 } 1546 }
1369 } 1547 }
1370 } 1548 }
1371 1549
1372 return true; 1550 return true;
1373 } 1551 }
1374 1552
1553 #if defined(WEBRTC_USE_EPOLL)
1554
1555 // Initial number of events to process with one call to "epoll_wait".
1556 static const size_t kInitialEpollEvents = 128;
1557
1558 // Maximum number of events to process with one call to "epoll_wait".
1559 static const size_t kMaxEpollEvents = 8192;
1560
1561 void PhysicalSocketServer::AddEpoll(Dispatcher* pdispatcher) {
1562 RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
1563 int fd = pdispatcher->GetDescriptor();
1564 RTC_DCHECK(fd != INVALID_SOCKET);
1565 if (fd == INVALID_SOCKET) {
1566 return;
1567 }
1568
1569 struct epoll_event event = {0};
1570 event.events = GetEpollEvents(pdispatcher->GetRequestedEvents());
1571 event.data.ptr = pdispatcher;
1572 int err = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event);
1573 RTC_DCHECK_EQ(err, 0);
1574 if (err == -1) {
1575 LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_ADD";
1576 }
1577 }
1578
1579 void PhysicalSocketServer::RemoveEpoll(Dispatcher* pdispatcher) {
1580 RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
1581 int fd = pdispatcher->GetDescriptor();
1582 RTC_DCHECK(fd != INVALID_SOCKET);
1583 if (fd == INVALID_SOCKET) {
1584 return;
1585 }
1586
1587 struct epoll_event event = {0};
1588 int err = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, &event);
1589 RTC_DCHECK(err == 0 || errno == ENOENT);
1590 if (err == -1) {
1591 if (errno == ENOENT) {
1592 // Socket has already been closed.
1593 LOG_E(LS_VERBOSE, EN, errno) << "epoll_ctl EPOLL_CTL_DEL";
1594 } else {
1595 LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_DEL";
1596 }
1597 }
1598 }
1599
1600 void PhysicalSocketServer::UpdateEpoll(Dispatcher* pdispatcher) {
1601 RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
1602 int fd = pdispatcher->GetDescriptor();
1603 RTC_DCHECK(fd != INVALID_SOCKET);
1604 if (fd == INVALID_SOCKET) {
1605 return;
1606 }
1607
1608 struct epoll_event event = {0};
1609 event.events = GetEpollEvents(pdispatcher->GetRequestedEvents());
1610 event.data.ptr = pdispatcher;
1611 int err = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd, &event);
1612 RTC_DCHECK_EQ(err, 0);
1613 if (err == -1) {
1614 LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_MOD";
1615 }
1616 }
1617
1618 bool PhysicalSocketServer::WaitEpoll(int cmsWait) {
1619 RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
1620 int64_t tvWait = -1;
1621 int64_t tvStop = -1;
1622 if (cmsWait != kForever) {
1623 tvWait = cmsWait;
1624 tvStop = TimeAfter(cmsWait);
1625 }
1626
1627 if (epoll_events_.empty()) {
1628 // The initial space to receive events is created only if epoll is used.
1629 epoll_events_.resize(kInitialEpollEvents);
1630 }
1631
1632 fWait_ = true;
1633
1634 while (fWait_) {
1635 // Wait then call handlers as appropriate
1636 // < 0 means error
1637 // 0 means timeout
1638 // > 0 means count of descriptors ready
1639 int n = epoll_wait(epoll_fd_, &epoll_events_[0],
1640 static_cast<int>(epoll_events_.size()),
1641 static_cast<int>(tvWait));
1642 if (n < 0) {
1643 if (errno != EINTR) {
1644 LOG_E(LS_ERROR, EN, errno) << "epoll";
1645 return false;
1646 }
1647 // Else ignore the error and keep going. If this EINTR was for one of the
1648 // signals managed by this PhysicalSocketServer, the
1649 // PosixSignalDeliveryDispatcher will be in the signaled state in the next
1650 // iteration.
1651 } else if (n == 0) {
1652 // If timeout, return success
1653 return true;
1654 } else {
1655 // We have signaled descriptors
1656 CritScope cr(&crit_);
1657 for (int i = 0; i < n; ++i) {
1658 const epoll_event& event = epoll_events_[i];
1659 Dispatcher* pdispatcher = static_cast<Dispatcher*>(event.data.ptr);
1660 if (dispatchers_.find(pdispatcher) == dispatchers_.end()) {
1661 // The dispatcher for this socket no longer exists.
1662 continue;
1663 }
1664
1665 bool readable = (event.events & (EPOLLIN | EPOLLPRI));
1666 bool writable = (event.events & EPOLLOUT);
1667 bool check_error = (event.events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP));
1668
1669 ProcessEvents(pdispatcher, readable, writable, check_error);
1670 }
1671 }
1672
1673 if (static_cast<size_t>(n) == epoll_events_.size() &&
1674 epoll_events_.size() < kMaxEpollEvents) {
1675 // We used the complete space to receive events, increase size for future
1676 // iterations.
1677 epoll_events_.resize(std::max(epoll_events_.size() * 2, kMaxEpollEvents));
1678 }
1679
1680 if (cmsWait != kForever) {
1681 tvWait = TimeDiff(tvStop, TimeMillis());
1682 if (tvWait < 0) {
1683 // Return success on timeout.
1684 return true;
1685 }
1686 }
1687 }
1688
1689 return true;
1690 }
1691
1692 bool PhysicalSocketServer::WaitPoll(int cmsWait, Dispatcher* dispatcher) {
1693 RTC_DCHECK(dispatcher);
1694 int64_t tvWait = -1;
1695 int64_t tvStop = -1;
1696 if (cmsWait != kForever) {
1697 tvWait = cmsWait;
1698 tvStop = TimeAfter(cmsWait);
1699 }
1700
1701 fWait_ = true;
1702
1703 struct pollfd fds = {0};
1704 int fd = dispatcher->GetDescriptor();
1705 fds.fd = fd;
1706
1707 while (fWait_) {
1708 uint32_t ff = dispatcher->GetRequestedEvents();
1709 fds.events = 0;
1710 if (ff & (DE_READ | DE_ACCEPT)) {
1711 fds.events |= POLLIN;
1712 }
1713 if (ff & (DE_WRITE | DE_CONNECT)) {
1714 fds.events |= POLLOUT;
1715 }
1716 fds.revents = 0;
1717
1718 // Wait then call handlers as appropriate
1719 // < 0 means error
1720 // 0 means timeout
1721 // > 0 means count of descriptors ready
1722 int n = poll(&fds, 1, static_cast<int>(tvWait));
1723 if (n < 0) {
1724 if (errno != EINTR) {
1725 LOG_E(LS_ERROR, EN, errno) << "poll";
1726 return false;
1727 }
1728 // Else ignore the error and keep going. If this EINTR was for one of the
1729 // signals managed by this PhysicalSocketServer, the
1730 // PosixSignalDeliveryDispatcher will be in the signaled state in the next
1731 // iteration.
1732 } else if (n == 0) {
1733 // If timeout, return success
1734 return true;
1735 } else {
1736 // We have signaled descriptors (should only be the passed dispatcher).
1737 RTC_DCHECK_EQ(n, 1);
1738 RTC_DCHECK_EQ(fds.fd, fd);
1739
1740 bool readable = (fds.revents & (POLLIN | POLLPRI));
1741 bool writable = (fds.revents & POLLOUT);
1742 bool check_error = (fds.revents & (POLLRDHUP | POLLERR | POLLHUP));
1743
1744 ProcessEvents(dispatcher, readable, writable, check_error);
1745 }
1746
1747 if (cmsWait != kForever) {
1748 tvWait = TimeDiff(tvStop, TimeMillis());
1749 if (tvWait < 0) {
1750 // Return success on timeout.
1751 return true;
1752 }
1753 }
1754 }
1755
1756 return true;
1757 }
1758
1759 #endif // WEBRTC_USE_EPOLL
1760
1375 static void GlobalSignalHandler(int signum) { 1761 static void GlobalSignalHandler(int signum) {
1376 PosixSignalHandler::Instance()->OnPosixSignalReceived(signum); 1762 PosixSignalHandler::Instance()->OnPosixSignalReceived(signum);
1377 } 1763 }
1378 1764
1379 bool PhysicalSocketServer::SetPosixSignalHandler(int signum, 1765 bool PhysicalSocketServer::SetPosixSignalHandler(int signum,
1380 void (*handler)(int)) { 1766 void (*handler)(int)) {
1381 // If handler is SIG_IGN or SIG_DFL then clear our user-level handler, 1767 // If handler is SIG_IGN or SIG_DFL then clear our user-level handler,
1382 // otherwise set one. 1768 // otherwise set one.
1383 if (handler == SIG_IGN || handler == SIG_DFL) { 1769 if (handler == SIG_IGN || handler == SIG_DFL) {
1384 if (!InstallSignal(signum, handler)) { 1770 if (!InstallSignal(signum, handler)) {
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after
1438 1824
1439 fWait_ = true; 1825 fWait_ = true;
1440 while (fWait_) { 1826 while (fWait_) {
1441 std::vector<WSAEVENT> events; 1827 std::vector<WSAEVENT> events;
1442 std::vector<Dispatcher *> event_owners; 1828 std::vector<Dispatcher *> event_owners;
1443 1829
1444 events.push_back(socket_ev_); 1830 events.push_back(socket_ev_);
1445 1831
1446 { 1832 {
1447 CritScope cr(&crit_); 1833 CritScope cr(&crit_);
1448 size_t i = 0; 1834 // TODO(jbauch): Support re-entrant waiting.
1449 iterators_.push_back(&i); 1835 RTC_DCHECK(!processing_dispatchers_);
1450 // Don't track dispatchers_.size(), because we want to pick up any new 1836
1451 // dispatchers that were added while processing the loop. 1837 // Calling "CheckSignalClose" might remove a closed dispatcher from the
1452 while (i < dispatchers_.size()) { 1838 // set. This must be deferred to prevent invalidating the iterator.
1453 Dispatcher* disp = dispatchers_[i++]; 1839 processing_dispatchers_ = true;
1840 for (Dispatcher* disp : dispatchers_) {
1454 if (!process_io && (disp != signal_wakeup_)) 1841 if (!process_io && (disp != signal_wakeup_))
1455 continue; 1842 continue;
1456 SOCKET s = disp->GetSocket(); 1843 SOCKET s = disp->GetSocket();
1457 if (disp->CheckSignalClose()) { 1844 if (disp->CheckSignalClose()) {
1458 // We just signalled close, don't poll this socket 1845 // We just signalled close, don't poll this socket
1459 } else if (s != INVALID_SOCKET) { 1846 } else if (s != INVALID_SOCKET) {
1460 WSAEventSelect(s, 1847 WSAEventSelect(s,
1461 events[0], 1848 events[0],
1462 FlagsToEvents(disp->GetRequestedEvents())); 1849 FlagsToEvents(disp->GetRequestedEvents()));
1463 } else { 1850 } else {
1464 events.push_back(disp->GetWSAEvent()); 1851 events.push_back(disp->GetWSAEvent());
1465 event_owners.push_back(disp); 1852 event_owners.push_back(disp);
1466 } 1853 }
1467 } 1854 }
1468 RTC_DCHECK(iterators_.back() == &i); 1855
1469 iterators_.pop_back(); 1856 processing_dispatchers_ = false;
1857 // Process deferred dispatchers that have been added/removed while the
1858 // events were handled above.
1859 AddRemovePendingDispatchers();
1470 } 1860 }
1471 1861
1472 // Which is shorter, the delay wait or the asked wait? 1862 // Which is shorter, the delay wait or the asked wait?
1473 1863
1474 int64_t cmsNext; 1864 int64_t cmsNext;
1475 if (cmsWait == kForever) { 1865 if (cmsWait == kForever) {
1476 cmsNext = cmsWait; 1866 cmsNext = cmsWait;
1477 } else { 1867 } else {
1478 cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed); 1868 cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed);
1479 } 1869 }
(...skipping 13 matching lines...) Expand all
1493 return false; 1883 return false;
1494 } else if (dw == WSA_WAIT_TIMEOUT) { 1884 } else if (dw == WSA_WAIT_TIMEOUT) {
1495 // Timeout? 1885 // Timeout?
1496 return true; 1886 return true;
1497 } else { 1887 } else {
1498 // Figure out which one it is and call it 1888 // Figure out which one it is and call it
1499 CritScope cr(&crit_); 1889 CritScope cr(&crit_);
1500 int index = dw - WSA_WAIT_EVENT_0; 1890 int index = dw - WSA_WAIT_EVENT_0;
1501 if (index > 0) { 1891 if (index > 0) {
1502 --index; // The first event is the socket event 1892 --index; // The first event is the socket event
1503 event_owners[index]->OnPreEvent(0); 1893 Dispatcher* disp = event_owners[index];
1504 event_owners[index]->OnEvent(0, 0); 1894 // The dispatcher could have been removed while waiting for events.
1895 if (dispatchers_.find(disp) != dispatchers_.end()) {
1896 disp->OnPreEvent(0);
1897 disp->OnEvent(0, 0);
1898 }
1505 } else if (process_io) { 1899 } else if (process_io) {
1506 size_t i = 0, end = dispatchers_.size(); 1900 processing_dispatchers_ = true;
1507 iterators_.push_back(&i); 1901 for (Dispatcher* disp : dispatchers_) {
1508 iterators_.push_back(&end); // Don't iterate over new dispatchers.
1509 while (i < end) {
1510 Dispatcher* disp = dispatchers_[i++];
1511 SOCKET s = disp->GetSocket(); 1902 SOCKET s = disp->GetSocket();
1512 if (s == INVALID_SOCKET) 1903 if (s == INVALID_SOCKET)
1513 continue; 1904 continue;
1514 1905
1515 WSANETWORKEVENTS wsaEvents; 1906 WSANETWORKEVENTS wsaEvents;
1516 int err = WSAEnumNetworkEvents(s, events[0], &wsaEvents); 1907 int err = WSAEnumNetworkEvents(s, events[0], &wsaEvents);
1517 if (err == 0) { 1908 if (err == 0) {
1518 { 1909 {
1519 if ((wsaEvents.lNetworkEvents & FD_READ) && 1910 if ((wsaEvents.lNetworkEvents & FD_READ) &&
1520 wsaEvents.iErrorCode[FD_READ_BIT] != 0) { 1911 wsaEvents.iErrorCode[FD_READ_BIT] != 0) {
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
1561 if (wsaEvents.lNetworkEvents & FD_CLOSE) { 1952 if (wsaEvents.lNetworkEvents & FD_CLOSE) {
1562 ff |= DE_CLOSE; 1953 ff |= DE_CLOSE;
1563 errcode = wsaEvents.iErrorCode[FD_CLOSE_BIT]; 1954 errcode = wsaEvents.iErrorCode[FD_CLOSE_BIT];
1564 } 1955 }
1565 if (ff != 0) { 1956 if (ff != 0) {
1566 disp->OnPreEvent(ff); 1957 disp->OnPreEvent(ff);
1567 disp->OnEvent(ff, errcode); 1958 disp->OnEvent(ff, errcode);
1568 } 1959 }
1569 } 1960 }
1570 } 1961 }
1571 RTC_DCHECK(iterators_.back() == &end); 1962
1572 iterators_.pop_back(); 1963 processing_dispatchers_ = false;
1573 RTC_DCHECK(iterators_.back() == &i); 1964 // Process deferred dispatchers that have been added/removed while the
1574 iterators_.pop_back(); 1965 // events were handled above.
1966 AddRemovePendingDispatchers();
1575 } 1967 }
1576 1968
1577 // Reset the network event until new activity occurs 1969 // Reset the network event until new activity occurs
1578 WSAResetEvent(socket_ev_); 1970 WSAResetEvent(socket_ev_);
1579 } 1971 }
1580 1972
1581 // Break? 1973 // Break?
1582 if (!fWait_) 1974 if (!fWait_)
1583 break; 1975 break;
1584 cmsElapsed = TimeSince(msStart); 1976 cmsElapsed = TimeSince(msStart);
1585 if ((cmsWait != kForever) && (cmsElapsed >= cmsWait)) { 1977 if ((cmsWait != kForever) && (cmsElapsed >= cmsWait)) {
1586 break; 1978 break;
1587 } 1979 }
1588 } 1980 }
1589 1981
1590 // Done 1982 // Done
1591 return true; 1983 return true;
1592 } 1984 }
1593 #endif // WEBRTC_WIN 1985 #endif // WEBRTC_WIN
1594 1986
1595 } // namespace rtc 1987 } // namespace rtc
OLDNEW
« no previous file with comments | « webrtc/base/physicalsocketserver.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698