Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(310)

Side by Side Diff: webrtc/base/physicalsocketserver.cc

Issue 2880923002: Support epoll in PhysicalSocketServer. (Closed)
Patch Set: Defer adding/removing of dispatchers while processing events. Created 3 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 /* 1 /*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 * 3 *
4 * Use of this source code is governed by a BSD-style license 4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source 5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found 6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may 7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree. 8 * be found in the AUTHORS file in the root of the source tree.
9 */ 9 */
10 #include "webrtc/base/physicalsocketserver.h" 10 #include "webrtc/base/physicalsocketserver.h"
11 11
12 #if defined(_MSC_VER) && _MSC_VER < 1300 12 #if defined(_MSC_VER) && _MSC_VER < 1300
13 #pragma warning(disable:4786) 13 #pragma warning(disable:4786)
14 #endif 14 #endif
15 15
16 #ifdef MEMORY_SANITIZER 16 #ifdef MEMORY_SANITIZER
17 #include <sanitizer/msan_interface.h> 17 #include <sanitizer/msan_interface.h>
18 #endif 18 #endif
19 19
20 #if defined(WEBRTC_POSIX) 20 #if defined(WEBRTC_POSIX)
21 #include <string.h> 21 #include <string.h>
22 #include <errno.h> 22 #include <errno.h>
23 #include <fcntl.h> 23 #include <fcntl.h>
24 #if defined(WEBRTC_USE_EPOLL)
25 // "poll" will be used to wait for the signal dispatcher.
26 #include <poll.h>
27 #endif
24 #include <sys/ioctl.h> 28 #include <sys/ioctl.h>
25 #include <sys/time.h> 29 #include <sys/time.h>
26 #include <sys/select.h> 30 #include <sys/select.h>
27 #include <unistd.h> 31 #include <unistd.h>
28 #include <signal.h> 32 #include <signal.h>
29 #endif 33 #endif
30 34
31 #if defined(WEBRTC_WIN) 35 #if defined(WEBRTC_WIN)
32 #define WIN32_LEAN_AND_MEAN 36 #define WIN32_LEAN_AND_MEAN
33 #include <windows.h> 37 #include <windows.h>
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
73 77
74 int64_t GetSocketRecvTimestamp(int socket) { 78 int64_t GetSocketRecvTimestamp(int socket) {
75 return -1; 79 return -1;
76 } 80 }
77 #endif 81 #endif
78 82
79 #if defined(WEBRTC_WIN) 83 #if defined(WEBRTC_WIN)
80 typedef char* SockOptArg; 84 typedef char* SockOptArg;
81 #endif 85 #endif
82 86
87 #if defined(WEBRTC_USE_EPOLL)
88 // POLLRDHUP / EPOLLRDHUP are only defined starting with Linux 2.6.17.
89 #if !defined(POLLRDHUP)
90 #define POLLRDHUP 0x2000
91 #endif
92 #if !defined(EPOLLRDHUP)
93 #define EPOLLRDHUP 0x2000
94 #endif
95 #endif
96
83 namespace rtc { 97 namespace rtc {
84 98
85 std::unique_ptr<SocketServer> SocketServer::CreateDefault() { 99 std::unique_ptr<SocketServer> SocketServer::CreateDefault() {
86 #if defined(__native_client__) 100 #if defined(__native_client__)
87 return std::unique_ptr<SocketServer>(new rtc::NullSocketServer); 101 return std::unique_ptr<SocketServer>(new rtc::NullSocketServer);
88 #else 102 #else
89 return std::unique_ptr<SocketServer>(new rtc::PhysicalSocketServer); 103 return std::unique_ptr<SocketServer>(new rtc::PhysicalSocketServer);
90 #endif 104 #endif
91 } 105 }
92 106
(...skipping 665 matching lines...) Expand 10 before | Expand all | Expand 10 after
758 } 772 }
759 if (((ff & DE_CLOSE) != 0) && (id_ == cache_id)) { 773 if (((ff & DE_CLOSE) != 0) && (id_ == cache_id)) {
760 signal_close_ = true; 774 signal_close_ = true;
761 signal_err_ = err; 775 signal_err_ = err;
762 } 776 }
763 } 777 }
764 778
765 #elif defined(WEBRTC_POSIX) 779 #elif defined(WEBRTC_POSIX)
766 780
767 void SocketDispatcher::OnEvent(uint32_t ff, int err) { 781 void SocketDispatcher::OnEvent(uint32_t ff, int err) {
782 #if defined(WEBRTC_USE_EPOLL)
783 // Remember currently enabled events so we can combine multiple changes
784 // into one update call later.
785 // The signal handlers might re-enable events disabled here, so we can't
786 // keep a list of events to disable at the end of the method. This list
787 // would not be updated with the events enabled by the signal handlers.
788 StartBatchedEventUpdates();
789 #endif
768 // Make sure we deliver connect/accept first. Otherwise, consumers may see 790 // Make sure we deliver connect/accept first. Otherwise, consumers may see
769 // something like a READ followed by a CONNECT, which would be odd. 791 // something like a READ followed by a CONNECT, which would be odd.
770 if ((ff & DE_CONNECT) != 0) { 792 if ((ff & DE_CONNECT) != 0) {
771 DisableEvents(DE_CONNECT); 793 DisableEvents(DE_CONNECT);
772 SignalConnectEvent(this); 794 SignalConnectEvent(this);
773 } 795 }
774 if ((ff & DE_ACCEPT) != 0) { 796 if ((ff & DE_ACCEPT) != 0) {
775 DisableEvents(DE_ACCEPT); 797 DisableEvents(DE_ACCEPT);
776 SignalReadEvent(this); 798 SignalReadEvent(this);
777 } 799 }
778 if ((ff & DE_READ) != 0) { 800 if ((ff & DE_READ) != 0) {
779 DisableEvents(DE_READ); 801 DisableEvents(DE_READ);
780 SignalReadEvent(this); 802 SignalReadEvent(this);
781 } 803 }
782 if ((ff & DE_WRITE) != 0) { 804 if ((ff & DE_WRITE) != 0) {
783 DisableEvents(DE_WRITE); 805 DisableEvents(DE_WRITE);
784 SignalWriteEvent(this); 806 SignalWriteEvent(this);
785 } 807 }
786 if ((ff & DE_CLOSE) != 0) { 808 if ((ff & DE_CLOSE) != 0) {
787 // The socket is now dead to us, so stop checking it. 809 // The socket is now dead to us, so stop checking it.
788 SetEnabledEvents(0); 810 SetEnabledEvents(0);
789 SignalCloseEvent(this, err); 811 SignalCloseEvent(this, err);
790 } 812 }
813 #if defined(WEBRTC_USE_EPOLL)
814 FinishBatchedEventUpdates();
815 #endif
791 } 816 }
792 817
793 #endif // WEBRTC_POSIX 818 #endif // WEBRTC_POSIX
794 819
820 #if defined(WEBRTC_USE_EPOLL)
821
822 static int GetEpollEvents(uint32_t ff) {
823 int events = 0;
824 if (ff & (DE_READ | DE_ACCEPT)) {
825 events |= EPOLLIN;
826 }
827 if (ff & (DE_WRITE | DE_CONNECT)) {
828 events |= EPOLLOUT;
829 }
830 return events;
831 }
832
833 void SocketDispatcher::StartBatchedEventUpdates() {
834 RTC_DCHECK_EQ(saved_enabled_events_, -1);
835 saved_enabled_events_ = enabled_events();
836 }
837
838 void SocketDispatcher::FinishBatchedEventUpdates() {
839 RTC_DCHECK_NE(saved_enabled_events_, -1);
840 uint8_t old_events = static_cast<uint8_t>(saved_enabled_events_);
841 saved_enabled_events_ = -1;
842 MaybeUpdateDispatcher(old_events);
843 }
844
845 void SocketDispatcher::MaybeUpdateDispatcher(uint8_t old_events) {
846 if (GetEpollEvents(enabled_events()) != GetEpollEvents(old_events) &&
847 saved_enabled_events_ == -1) {
848 ss_->Update(this);
849 }
850 }
851
852 void SocketDispatcher::SetEnabledEvents(uint8_t events) {
853 uint8_t old_events = enabled_events();
854 PhysicalSocket::SetEnabledEvents(events);
855 MaybeUpdateDispatcher(old_events);
856 }
857
858 void SocketDispatcher::EnableEvents(uint8_t events) {
859 uint8_t old_events = enabled_events();
860 PhysicalSocket::EnableEvents(events);
861 MaybeUpdateDispatcher(old_events);
862 }
863
864 void SocketDispatcher::DisableEvents(uint8_t events) {
865 uint8_t old_events = enabled_events();
866 PhysicalSocket::DisableEvents(events);
867 MaybeUpdateDispatcher(old_events);
868 }
869
870 #endif // WEBRTC_USE_EPOLL
871
795 int SocketDispatcher::Close() { 872 int SocketDispatcher::Close() {
796 if (s_ == INVALID_SOCKET) 873 if (s_ == INVALID_SOCKET)
797 return 0; 874 return 0;
798 875
799 #if defined(WEBRTC_WIN) 876 #if defined(WEBRTC_WIN)
800 id_ = 0; 877 id_ = 0;
801 signal_close_ = false; 878 signal_close_ = false;
802 #endif 879 #endif
803 ss_->Remove(this); 880 ss_->Remove(this);
804 return PhysicalSocket::Close(); 881 return PhysicalSocket::Close();
(...skipping 308 matching lines...) Expand 10 before | Expand all | Expand 10 after
1113 if (pf_) 1190 if (pf_)
1114 *pf_ = false; 1191 *pf_ = false;
1115 } 1192 }
1116 1193
1117 private: 1194 private:
1118 bool *pf_; 1195 bool *pf_;
1119 }; 1196 };
1120 1197
1121 PhysicalSocketServer::PhysicalSocketServer() 1198 PhysicalSocketServer::PhysicalSocketServer()
1122 : fWait_(false) { 1199 : fWait_(false) {
1200 #if defined(WEBRTC_USE_EPOLL)
1201 // Since Linux 2.6.8, the size argument is ignored, but must be greater than
1202 // zero. Before that the size served as hint to the kernel for the amount of
1203 // space to initially allocate in internal data structures.
1204 epoll_fd_ = epoll_create(FD_SETSIZE);
1205 if (epoll_fd_ == -1) {
1206 // Not an error, will fall back to "select" below.
1207 LOG_E(LS_WARNING, EN, errno) << "epoll_create";
1208 epoll_fd_ = INVALID_SOCKET;
1209 }
1210 #endif
1123 signal_wakeup_ = new Signaler(this, &fWait_); 1211 signal_wakeup_ = new Signaler(this, &fWait_);
1124 #if defined(WEBRTC_WIN) 1212 #if defined(WEBRTC_WIN)
1125 socket_ev_ = WSACreateEvent(); 1213 socket_ev_ = WSACreateEvent();
1126 #endif 1214 #endif
1127 } 1215 }
1128 1216
1129 PhysicalSocketServer::~PhysicalSocketServer() { 1217 PhysicalSocketServer::~PhysicalSocketServer() {
1130 #if defined(WEBRTC_WIN) 1218 #if defined(WEBRTC_WIN)
1131 WSACloseEvent(socket_ev_); 1219 WSACloseEvent(socket_ev_);
1132 #endif 1220 #endif
1133 #if defined(WEBRTC_POSIX) 1221 #if defined(WEBRTC_POSIX)
1134 signal_dispatcher_.reset(); 1222 signal_dispatcher_.reset();
1135 #endif 1223 #endif
1136 delete signal_wakeup_; 1224 delete signal_wakeup_;
1225 #if defined(WEBRTC_USE_EPOLL)
1226 if (epoll_fd_ != INVALID_SOCKET) {
1227 close(epoll_fd_);
1228 }
1229 #endif
1137 RTC_DCHECK(dispatchers_.empty()); 1230 RTC_DCHECK(dispatchers_.empty());
1138 } 1231 }
1139 1232
1140 void PhysicalSocketServer::WakeUp() { 1233 void PhysicalSocketServer::WakeUp() {
1141 signal_wakeup_->Signal(); 1234 signal_wakeup_->Signal();
1142 } 1235 }
1143 1236
1144 Socket* PhysicalSocketServer::CreateSocket(int type) { 1237 Socket* PhysicalSocketServer::CreateSocket(int type) {
1145 return CreateSocket(AF_INET, type); 1238 return CreateSocket(AF_INET, type);
1146 } 1239 }
(...skipping 27 matching lines...) Expand all
1174 if (dispatcher->Initialize()) { 1267 if (dispatcher->Initialize()) {
1175 return dispatcher; 1268 return dispatcher;
1176 } else { 1269 } else {
1177 delete dispatcher; 1270 delete dispatcher;
1178 return nullptr; 1271 return nullptr;
1179 } 1272 }
1180 } 1273 }
1181 1274
1182 void PhysicalSocketServer::Add(Dispatcher *pdispatcher) { 1275 void PhysicalSocketServer::Add(Dispatcher *pdispatcher) {
1183 CritScope cs(&crit_); 1276 CritScope cs(&crit_);
1277 #if defined(WEBRTC_WIN)
1184 // Prevent duplicates. This can cause dead dispatchers to stick around. 1278 // Prevent duplicates. This can cause dead dispatchers to stick around.
1185 DispatcherList::iterator pos = std::find(dispatchers_.begin(), 1279 DispatcherList::iterator pos = std::find(dispatchers_.begin(),
1186 dispatchers_.end(), 1280 dispatchers_.end(),
1187 pdispatcher); 1281 pdispatcher);
1188 if (pos != dispatchers_.end()) 1282 if (pos != dispatchers_.end())
1189 return; 1283 return;
1190 dispatchers_.push_back(pdispatcher); 1284 dispatchers_.push_back(pdispatcher);
1285 #else
1286 if (processing_dispatchers_) {
1287 // A dispatcher is being added while a "Wait" call is processing the
1288 // list of socket events.
1289 // Defer adding to "dispatchers_" set until processing is done to avoid
1290 // invalidating the iterator in "Wait".
1291 pending_remove_dispatchers_.erase(pdispatcher);
1292 pending_add_dispatchers_.insert(pdispatcher);
1293 } else {
1294 dispatchers_.insert(pdispatcher);
1295 }
1296 #endif // WEBRTC_WIN
1297 #if defined(WEBRTC_USE_EPOLL)
1298 if (epoll_fd_ != INVALID_SOCKET) {
1299 AddEpoll(pdispatcher);
1300 }
1301 #endif // WEBRTC_USE_EPOLL
1191 } 1302 }
1192 1303
1193 void PhysicalSocketServer::Remove(Dispatcher *pdispatcher) { 1304 void PhysicalSocketServer::Remove(Dispatcher *pdispatcher) {
1194 CritScope cs(&crit_); 1305 CritScope cs(&crit_);
1306 #if defined(WEBRTC_WIN)
1195 DispatcherList::iterator pos = std::find(dispatchers_.begin(), 1307 DispatcherList::iterator pos = std::find(dispatchers_.begin(),
1196 dispatchers_.end(), 1308 dispatchers_.end(),
1197 pdispatcher); 1309 pdispatcher);
1198 // We silently ignore duplicate calls to Add, so we should silently ignore 1310 // We silently ignore duplicate calls to Add, so we should silently ignore
1199 // the (expected) symmetric calls to Remove. Note that this may still hide 1311 // the (expected) symmetric calls to Remove. Note that this may still hide
1200 // a real issue, so we at least log a warning about it. 1312 // a real issue, so we at least log a warning about it.
1201 if (pos == dispatchers_.end()) { 1313 if (pos == dispatchers_.end()) {
1202 LOG(LS_WARNING) << "PhysicalSocketServer asked to remove a unknown " 1314 LOG(LS_WARNING) << "PhysicalSocketServer asked to remove a unknown "
1203 << "dispatcher, potentially from a duplicate call to Add."; 1315 << "dispatcher, potentially from a duplicate call to Add.";
1204 return; 1316 return;
1205 } 1317 }
1206 size_t index = pos - dispatchers_.begin(); 1318 size_t index = pos - dispatchers_.begin();
1207 dispatchers_.erase(pos); 1319 dispatchers_.erase(pos);
1208 for (IteratorList::iterator it = iterators_.begin(); it != iterators_.end(); 1320 for (IteratorList::iterator it = iterators_.begin(); it != iterators_.end();
1209 ++it) { 1321 ++it) {
1210 if (index < **it) { 1322 if (index < **it) {
1211 --**it; 1323 --**it;
1212 } 1324 }
1213 } 1325 }
1326 #else
1327 if (processing_dispatchers_) {
1328 // A dispatcher is being removed while a "Wait" call is processing the
1329 // list of socket events.
1330 // Defer removal from "dispatchers_" set until processing is done to avoid
1331 // invalidating the iterator in "Wait".
1332 if (!pending_add_dispatchers_.erase(pdispatcher) &&
1333 dispatchers_.find(pdispatcher) == dispatchers_.end()) {
1334 LOG(LS_WARNING) << "PhysicalSocketServer asked to remove a unknown "
1335 << "dispatcher, potentially from a duplicate call to "
1336 << "Add.";
1337 return;
1338 }
1339
1340 pending_remove_dispatchers_.insert(pdispatcher);
1341 } else if (!dispatchers_.erase(pdispatcher)) {
1342 LOG(LS_WARNING) << "PhysicalSocketServer asked to remove a unknown "
1343 << "dispatcher, potentially from a duplicate call to Add.";
1344 return;
1345 }
1346 #endif // WEBRTC_WIN
1347 #if defined(WEBRTC_USE_EPOLL)
1348 if (epoll_fd_ != INVALID_SOCKET) {
1349 RemoveEpoll(pdispatcher);
1350 }
1351 #endif // WEBRTC_USE_EPOLL
1352 }
1353
1354 void PhysicalSocketServer::Update(Dispatcher *pdispatcher) {
1355 #if defined(WEBRTC_USE_EPOLL)
1356 if (epoll_fd_ == INVALID_SOCKET) {
1357 return;
1358 }
1359
1360 CritScope cs(&crit_);
1361 if (dispatchers_.find(pdispatcher) == dispatchers_.end()) {
1362 return;
1363 }
1364
1365 UpdateEpoll(pdispatcher);
1366 #endif
1214 } 1367 }
1215 1368
1216 #if defined(WEBRTC_POSIX) 1369 #if defined(WEBRTC_POSIX)
1370
1217 bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) { 1371 bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
1372 #if defined(WEBRTC_USE_EPOLL)
1373 // We don't keep a dedicated "epoll" descriptor containing only the non-IO
1374 // (i.e. signaling) dispatcher, so "poll" will be used instead of the default
1375 // "select" to support sockets larger than FD_SETSIZE.
1376 if (!process_io) {
1377 return WaitPoll(cmsWait, signal_wakeup_);
1378 } else if (epoll_fd_ != INVALID_SOCKET) {
1379 return WaitEpoll(cmsWait);
1380 }
1381 #endif
1382 return WaitSelect(cmsWait, process_io);
1383 }
1384
1385 static void ProcessEvents(Dispatcher* dispatcher, bool readable, bool writable,
1386 bool check_error) {
1387 int errcode = 0;
1388 // TODO(pthatcher): Should we set errcode if getsockopt fails?
1389 if (check_error) {
1390 socklen_t len = sizeof(errcode);
1391 ::getsockopt(dispatcher->GetDescriptor(), SOL_SOCKET, SO_ERROR, &errcode,
1392 &len);
1393 }
1394
1395 uint32_t ff = 0;
1396
1397 // Check readable descriptors. If we're waiting on an accept, signal
1398 // that. Otherwise we're waiting for data, check to see if we're
1399 // readable or really closed.
1400 // TODO(pthatcher): Only peek at TCP descriptors.
1401 if (readable) {
1402 if (dispatcher->GetRequestedEvents() & DE_ACCEPT) {
1403 ff |= DE_ACCEPT;
1404 } else if (errcode || dispatcher->IsDescriptorClosed()) {
1405 ff |= DE_CLOSE;
1406 } else {
1407 ff |= DE_READ;
1408 }
1409 }
1410
1411 // Check writable descriptors. If we're waiting on a connect, detect
1412 // success versus failure by the reaped error code.
1413 if (writable) {
1414 if (dispatcher->GetRequestedEvents() & DE_CONNECT) {
1415 if (!errcode) {
1416 ff |= DE_CONNECT;
1417 } else {
1418 ff |= DE_CLOSE;
1419 }
1420 } else {
1421 ff |= DE_WRITE;
1422 }
1423 }
1424
1425 // Tell the descriptor about the event.
1426 if (ff != 0) {
1427 dispatcher->OnPreEvent(ff);
1428 dispatcher->OnEvent(ff, errcode);
1429 }
1430 }
1431
1432 void PhysicalSocketServer::AddRemovePendingDispatchers() {
1433 if (!pending_add_dispatchers_.empty()) {
1434 for (Dispatcher *pdispatcher : pending_add_dispatchers_) {
Taylor Brandstetter 2017/05/30 22:05:49 Should be "Dispatcher* pdispatcher"; can you run "
joachim 2017/05/31 17:18:02 Done.
1435 dispatchers_.insert(pdispatcher);
1436 }
1437 pending_add_dispatchers_.clear();
1438 }
1439
1440 if (!pending_remove_dispatchers_.empty()) {
1441 for (Dispatcher *pdispatcher : pending_remove_dispatchers_) {
1442 dispatchers_.erase(pdispatcher);
1443 }
1444 pending_remove_dispatchers_.clear();
1445 }
1446 }
1447
1448 bool PhysicalSocketServer::WaitSelect(int cmsWait, bool process_io) {
1218 // Calculate timing information 1449 // Calculate timing information
1219 1450
1220 struct timeval* ptvWait = nullptr; 1451 struct timeval* ptvWait = nullptr;
1221 struct timeval tvWait; 1452 struct timeval tvWait;
1222 struct timeval tvStop; 1453 struct timeval tvStop;
1223 if (cmsWait != kForever) { 1454 if (cmsWait != kForever) {
1224 // Calculate wait timeval 1455 // Calculate wait timeval
1225 tvWait.tv_sec = cmsWait / 1000; 1456 tvWait.tv_sec = cmsWait / 1000;
1226 tvWait.tv_usec = (cmsWait % 1000) * 1000; 1457 tvWait.tv_usec = (cmsWait % 1000) * 1000;
1227 ptvWait = &tvWait; 1458 ptvWait = &tvWait;
(...skipping 22 matching lines...) Expand all
1250 __msan_unpoison(&fdsRead, sizeof(fdsRead)); 1481 __msan_unpoison(&fdsRead, sizeof(fdsRead));
1251 __msan_unpoison(&fdsWrite, sizeof(fdsWrite)); 1482 __msan_unpoison(&fdsWrite, sizeof(fdsWrite));
1252 #endif 1483 #endif
1253 1484
1254 fWait_ = true; 1485 fWait_ = true;
1255 1486
1256 while (fWait_) { 1487 while (fWait_) {
1257 int fdmax = -1; 1488 int fdmax = -1;
1258 { 1489 {
1259 CritScope cr(&crit_); 1490 CritScope cr(&crit_);
1260 for (size_t i = 0; i < dispatchers_.size(); ++i) { 1491 // TODO(jbauch): Support re-entrant waiting.
1492 RTC_DCHECK(!processing_dispatchers_);
1493 for (Dispatcher* pdispatcher : dispatchers_) {
1261 // Query dispatchers for read and write wait state 1494 // Query dispatchers for read and write wait state
1262 Dispatcher *pdispatcher = dispatchers_[i];
1263 RTC_DCHECK(pdispatcher); 1495 RTC_DCHECK(pdispatcher);
1264 if (!process_io && (pdispatcher != signal_wakeup_)) 1496 if (!process_io && (pdispatcher != signal_wakeup_))
1265 continue; 1497 continue;
1266 int fd = pdispatcher->GetDescriptor(); 1498 int fd = pdispatcher->GetDescriptor();
1499 // "select"ing a file descriptor that is equal to or larger than
1500 // FD_SETSIZE will result in undefined behavior.
1501 RTC_DCHECK_LT(fd, FD_SETSIZE);
1267 if (fd > fdmax) 1502 if (fd > fdmax)
1268 fdmax = fd; 1503 fdmax = fd;
1269 1504
1270 uint32_t ff = pdispatcher->GetRequestedEvents(); 1505 uint32_t ff = pdispatcher->GetRequestedEvents();
1271 if (ff & (DE_READ | DE_ACCEPT)) 1506 if (ff & (DE_READ | DE_ACCEPT))
1272 FD_SET(fd, &fdsRead); 1507 FD_SET(fd, &fdsRead);
1273 if (ff & (DE_WRITE | DE_CONNECT)) 1508 if (ff & (DE_WRITE | DE_CONNECT))
1274 FD_SET(fd, &fdsWrite); 1509 FD_SET(fd, &fdsWrite);
1275 } 1510 }
1276 } 1511 }
(...skipping 13 matching lines...) Expand all
1290 // Else ignore the error and keep going. If this EINTR was for one of the 1525 // Else ignore the error and keep going. If this EINTR was for one of the
1291 // signals managed by this PhysicalSocketServer, the 1526 // signals managed by this PhysicalSocketServer, the
1292 // PosixSignalDeliveryDispatcher will be in the signaled state in the next 1527 // PosixSignalDeliveryDispatcher will be in the signaled state in the next
1293 // iteration. 1528 // iteration.
1294 } else if (n == 0) { 1529 } else if (n == 0) {
1295 // If timeout, return success 1530 // If timeout, return success
1296 return true; 1531 return true;
1297 } else { 1532 } else {
1298 // We have signaled descriptors 1533 // We have signaled descriptors
1299 CritScope cr(&crit_); 1534 CritScope cr(&crit_);
1300 for (size_t i = 0; i < dispatchers_.size(); ++i) { 1535 processing_dispatchers_ = true;
1301 Dispatcher *pdispatcher = dispatchers_[i]; 1536 for (Dispatcher *pdispatcher : dispatchers_) {
1302 int fd = pdispatcher->GetDescriptor(); 1537 int fd = pdispatcher->GetDescriptor();
1303 uint32_t ff = 0;
1304 int errcode = 0;
1305 1538
1306 // Reap any error code, which can be signaled through reads or writes. 1539 bool readable = FD_ISSET(fd, &fdsRead);
1307 // TODO(pthatcher): Should we set errcode if getsockopt fails? 1540 if (readable) {
1308 if (FD_ISSET(fd, &fdsRead) || FD_ISSET(fd, &fdsWrite)) { 1541 FD_CLR(fd, &fdsRead);
1309 socklen_t len = sizeof(errcode);
1310 ::getsockopt(fd, SOL_SOCKET, SO_ERROR, &errcode, &len);
1311 } 1542 }
1312 1543
1313 // Check readable descriptors. If we're waiting on an accept, signal 1544 bool writable = FD_ISSET(fd, &fdsWrite);
1314 // that. Otherwise we're waiting for data, check to see if we're 1545 if (writable) {
1315 // readable or really closed. 1546 FD_CLR(fd, &fdsWrite);
1316 // TODO(pthatcher): Only peek at TCP descriptors.
1317 if (FD_ISSET(fd, &fdsRead)) {
1318 FD_CLR(fd, &fdsRead);
1319 if (pdispatcher->GetRequestedEvents() & DE_ACCEPT) {
1320 ff |= DE_ACCEPT;
1321 } else if (errcode || pdispatcher->IsDescriptorClosed()) {
1322 ff |= DE_CLOSE;
1323 } else {
1324 ff |= DE_READ;
1325 }
1326 } 1547 }
1327 1548
1328 // Check writable descriptors. If we're waiting on a connect, detect 1549 // The error code can be signaled through reads or writes.
1329 // success versus failure by the reaped error code. 1550 ProcessEvents(pdispatcher, readable, writable, readable || writable);
1330 if (FD_ISSET(fd, &fdsWrite)) { 1551 }
1331 FD_CLR(fd, &fdsWrite);
1332 if (pdispatcher->GetRequestedEvents() & DE_CONNECT) {
1333 if (!errcode) {
1334 ff |= DE_CONNECT;
1335 } else {
1336 ff |= DE_CLOSE;
1337 }
1338 } else {
1339 ff |= DE_WRITE;
1340 }
1341 }
1342 1552
1343 // Tell the descriptor about the event. 1553 processing_dispatchers_ = false;
1344 if (ff != 0) { 1554 // Process deferred dispatchers that have been added/removed while the
1345 pdispatcher->OnPreEvent(ff); 1555 // event were handled above.
Taylor Brandstetter 2017/05/30 22:05:49 "event" -> "events"
joachim 2017/05/31 17:18:02 Done.
1346 pdispatcher->OnEvent(ff, errcode); 1556 AddRemovePendingDispatchers();
1347 }
1348 }
1349 } 1557 }
1350 1558
1351 // Recalc the time remaining to wait. Doing it here means it doesn't get 1559 // Recalc the time remaining to wait. Doing it here means it doesn't get
1352 // calced twice the first time through the loop 1560 // calced twice the first time through the loop
1353 if (ptvWait) { 1561 if (ptvWait) {
1354 ptvWait->tv_sec = 0; 1562 ptvWait->tv_sec = 0;
1355 ptvWait->tv_usec = 0; 1563 ptvWait->tv_usec = 0;
1356 struct timeval tvT; 1564 struct timeval tvT;
1357 gettimeofday(&tvT, nullptr); 1565 gettimeofday(&tvT, nullptr);
1358 if ((tvStop.tv_sec > tvT.tv_sec) 1566 if ((tvStop.tv_sec > tvT.tv_sec)
1359 || ((tvStop.tv_sec == tvT.tv_sec) 1567 || ((tvStop.tv_sec == tvT.tv_sec)
1360 && (tvStop.tv_usec > tvT.tv_usec))) { 1568 && (tvStop.tv_usec > tvT.tv_usec))) {
1361 ptvWait->tv_sec = tvStop.tv_sec - tvT.tv_sec; 1569 ptvWait->tv_sec = tvStop.tv_sec - tvT.tv_sec;
1362 ptvWait->tv_usec = tvStop.tv_usec - tvT.tv_usec; 1570 ptvWait->tv_usec = tvStop.tv_usec - tvT.tv_usec;
1363 if (ptvWait->tv_usec < 0) { 1571 if (ptvWait->tv_usec < 0) {
1364 RTC_DCHECK(ptvWait->tv_sec > 0); 1572 RTC_DCHECK(ptvWait->tv_sec > 0);
1365 ptvWait->tv_usec += 1000000; 1573 ptvWait->tv_usec += 1000000;
1366 ptvWait->tv_sec -= 1; 1574 ptvWait->tv_sec -= 1;
1367 } 1575 }
1368 } 1576 }
1369 } 1577 }
1370 } 1578 }
1371 1579
1372 return true; 1580 return true;
1373 } 1581 }
1374 1582
1583 #if defined(WEBRTC_USE_EPOLL)
1584
1585 // Initial number of events to process with one call to "epoll_wait".
1586 static const size_t kInitialEpollEvents = 128;
1587
1588 // Maximum number of events to process with one call to "epoll_wait".
1589 static const size_t kMaxEpollEvents = 8192;
1590
1591 void PhysicalSocketServer::AddEpoll(Dispatcher* pdispatcher) {
1592 RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
1593 int fd = pdispatcher->GetDescriptor();
1594 RTC_DCHECK(fd != INVALID_SOCKET);
1595 if (fd == INVALID_SOCKET) {
1596 return;
1597 }
1598
1599 struct epoll_event event = {0};
1600 event.events = GetEpollEvents(pdispatcher->GetRequestedEvents());
1601 event.data.ptr = pdispatcher;
1602 int err = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event);
1603 RTC_DCHECK_EQ(err, 0);
1604 if (err == -1) {
1605 LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_ADD";
1606 }
1607 }
1608
1609 void PhysicalSocketServer::RemoveEpoll(Dispatcher* pdispatcher) {
1610 RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
1611 int fd = pdispatcher->GetDescriptor();
1612 RTC_DCHECK(fd != INVALID_SOCKET);
1613 if (fd == INVALID_SOCKET) {
1614 return;
1615 }
1616
1617 struct epoll_event event = {0};
1618 int err = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, &event);
1619 RTC_DCHECK(err == 0 || errno == ENOENT);
1620 if (err == -1) {
1621 if (errno == ENOENT) {
1622 // Socket has already been closed.
1623 LOG_E(LS_VERBOSE, EN, errno) << "epoll_ctl EPOLL_CTL_DEL";
1624 } else {
1625 LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_DEL";
1626 }
1627 }
1628 }
1629
1630 void PhysicalSocketServer::UpdateEpoll(Dispatcher* pdispatcher) {
1631 RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
1632 int fd = pdispatcher->GetDescriptor();
1633 RTC_DCHECK(fd != INVALID_SOCKET);
1634 if (fd == INVALID_SOCKET) {
1635 return;
1636 }
1637
1638 struct epoll_event event = {0};
1639 event.events = GetEpollEvents(pdispatcher->GetRequestedEvents());
1640 event.data.ptr = pdispatcher;
1641 int err = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd, &event);
1642 RTC_DCHECK_EQ(err, 0);
1643 if (err == -1) {
1644 LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_MOD";
1645 }
1646 }
1647
1648 bool PhysicalSocketServer::WaitEpoll(int cmsWait) {
1649 RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
1650 int64_t tvWait = -1;
1651 int64_t tvStop = -1;
1652 if (cmsWait != kForever) {
1653 tvWait = cmsWait;
1654 tvStop = TimeAfter(cmsWait);
1655 }
1656
1657 if (epoll_events_.empty()) {
1658 // The initial space to receive events is created only if epoll is used.
1659 epoll_events_.resize(kInitialEpollEvents);
1660 }
1661
1662 fWait_ = true;
1663
1664 while (fWait_) {
1665 // Wait then call handlers as appropriate
1666 // < 0 means error
1667 // 0 means timeout
1668 // > 0 means count of descriptors ready
1669 int n = epoll_wait(epoll_fd_, &epoll_events_[0],
1670 static_cast<int>(epoll_events_.size()), static_cast<int>(tvWait));
1671 if (n < 0) {
1672 if (errno != EINTR) {
1673 LOG_E(LS_ERROR, EN, errno) << "epoll";
1674 return false;
1675 }
1676 // Else ignore the error and keep going. If this EINTR was for one of the
1677 // signals managed by this PhysicalSocketServer, the
1678 // PosixSignalDeliveryDispatcher will be in the signaled state in the next
1679 // iteration.
1680 } else if (n == 0) {
1681 // If timeout, return success
1682 return true;
1683 } else {
1684 // We have signaled descriptors
1685 CritScope cr(&crit_);
1686 for (int i = 0; i < n; ++i) {
1687 const epoll_event& event = epoll_events_[i];
1688 Dispatcher* pdispatcher = static_cast<Dispatcher*>(event.data.ptr);
1689 if (dispatchers_.find(pdispatcher) == dispatchers_.end()) {
1690 // The dispatcher for this socket no longer exists.
1691 continue;
1692 }
1693
1694 bool readable = (event.events & (EPOLLIN | EPOLLPRI));
1695 bool writable = (event.events & EPOLLOUT);
1696 bool check_error = (event.events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP));
1697
1698 ProcessEvents(pdispatcher, readable, writable, check_error);
1699 }
1700 }
1701
1702 if (static_cast<size_t>(n) == epoll_events_.size() &&
1703 epoll_events_.size() < kMaxEpollEvents) {
1704 // We used the complete space to receive events, increase size for future
1705 // iterations.
1706 epoll_events_.resize(std::max(epoll_events_.size() * 2, kMaxEpollEvents));
1707 }
1708
1709 if (cmsWait != kForever) {
1710 tvWait = TimeDiff(tvStop, TimeMillis());
1711 if (tvWait < 0) {
1712 // Return success on timeout.
1713 return true;
1714 }
1715 }
1716 }
1717
1718 return true;
1719 }
1720
1721 bool PhysicalSocketServer::WaitPoll(int cmsWait, Dispatcher* dispatcher) {
1722 RTC_DCHECK(dispatcher);
1723 int64_t tvWait = -1;
1724 int64_t tvStop = -1;
1725 if (cmsWait != kForever) {
1726 tvWait = cmsWait;
1727 tvStop = TimeAfter(cmsWait);
1728 }
1729
1730 fWait_ = true;
1731
1732 struct pollfd fds = {0};
1733 int fd = dispatcher->GetDescriptor();
1734 fds.fd = fd;
1735
1736 while (fWait_) {
1737 uint32_t ff = dispatcher->GetRequestedEvents();
1738 fds.events = 0;
1739 if (ff & (DE_READ | DE_ACCEPT)) {
1740 fds.events |= POLLIN;
1741 }
1742 if (ff & (DE_WRITE | DE_CONNECT)) {
1743 fds.events |= POLLOUT;
1744 }
1745 fds.revents = 0;
1746
1747 // Wait then call handlers as appropriate
1748 // < 0 means error
1749 // 0 means timeout
1750 // > 0 means count of descriptors ready
1751 int n = poll(&fds, 1, static_cast<int>(tvWait));
1752 if (n < 0) {
1753 if (errno != EINTR) {
1754 LOG_E(LS_ERROR, EN, errno) << "poll";
1755 return false;
1756 }
1757 // Else ignore the error and keep going. If this EINTR was for one of the
1758 // signals managed by this PhysicalSocketServer, the
1759 // PosixSignalDeliveryDispatcher will be in the signaled state in the next
1760 // iteration.
1761 } else if (n == 0) {
1762 // If timeout, return success
1763 return true;
1764 } else {
1765 // We have signaled descriptors (should only be the passed dispatcher).
1766 RTC_DCHECK_EQ(n, 1);
1767 RTC_DCHECK_EQ(fds.fd, fd);
1768
1769 bool readable = (fds.revents & (POLLIN | POLLPRI));
1770 bool writable = (fds.revents & POLLOUT);
1771 bool check_error = (fds.revents & (POLLRDHUP | POLLERR | POLLHUP));
1772
1773 ProcessEvents(dispatcher, readable, writable, check_error);
1774 }
1775
1776 if (cmsWait != kForever) {
1777 tvWait = TimeDiff(tvStop, TimeMillis());
1778 if (tvWait < 0) {
1779 // Return success on timeout.
1780 return true;
1781 }
1782 }
1783 }
1784
1785 return true;
1786 }
1787
1788 #endif // WEBRTC_USE_EPOLL
1789
1375 static void GlobalSignalHandler(int signum) { 1790 static void GlobalSignalHandler(int signum) {
1376 PosixSignalHandler::Instance()->OnPosixSignalReceived(signum); 1791 PosixSignalHandler::Instance()->OnPosixSignalReceived(signum);
1377 } 1792 }
1378 1793
1379 bool PhysicalSocketServer::SetPosixSignalHandler(int signum, 1794 bool PhysicalSocketServer::SetPosixSignalHandler(int signum,
1380 void (*handler)(int)) { 1795 void (*handler)(int)) {
1381 // If handler is SIG_IGN or SIG_DFL then clear our user-level handler, 1796 // If handler is SIG_IGN or SIG_DFL then clear our user-level handler,
1382 // otherwise set one. 1797 // otherwise set one.
1383 if (handler == SIG_IGN || handler == SIG_DFL) { 1798 if (handler == SIG_IGN || handler == SIG_DFL) {
1384 if (!InstallSignal(signum, handler)) { 1799 if (!InstallSignal(signum, handler)) {
(...skipping 201 matching lines...) Expand 10 before | Expand all | Expand 10 after
1586 break; 2001 break;
1587 } 2002 }
1588 } 2003 }
1589 2004
1590 // Done 2005 // Done
1591 return true; 2006 return true;
1592 } 2007 }
1593 #endif // WEBRTC_WIN 2008 #endif // WEBRTC_WIN
1594 2009
1595 } // namespace rtc 2010 } // namespace rtc
OLDNEW
« webrtc/base/physicalsocketserver.h ('K') | « webrtc/base/physicalsocketserver.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698