Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(365)

Side by Side Diff: webrtc/base/physicalsocketserver.cc

Issue 2880923002: Support epoll in PhysicalSocketServer. (Closed)
Patch Set: Renamed methods, added comment. Created 3 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « webrtc/base/physicalsocketserver.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 /* 1 /*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 * 3 *
4 * Use of this source code is governed by a BSD-style license 4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source 5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found 6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may 7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree. 8 * be found in the AUTHORS file in the root of the source tree.
9 */ 9 */
10 #include "webrtc/base/physicalsocketserver.h" 10 #include "webrtc/base/physicalsocketserver.h"
11 11
12 #if defined(_MSC_VER) && _MSC_VER < 1300 12 #if defined(_MSC_VER) && _MSC_VER < 1300
13 #pragma warning(disable:4786) 13 #pragma warning(disable:4786)
14 #endif 14 #endif
15 15
16 #ifdef MEMORY_SANITIZER 16 #ifdef MEMORY_SANITIZER
17 #include <sanitizer/msan_interface.h> 17 #include <sanitizer/msan_interface.h>
18 #endif 18 #endif
19 19
20 #if defined(WEBRTC_POSIX) 20 #if defined(WEBRTC_POSIX)
21 #include <string.h> 21 #include <string.h>
22 #include <errno.h> 22 #include <errno.h>
23 #include <fcntl.h> 23 #include <fcntl.h>
24 #if defined(WEBRTC_USE_EPOLL)
25 // "poll" will be used to wait for the signal dispatcher.
26 #include <poll.h>
27 #endif
24 #include <sys/ioctl.h> 28 #include <sys/ioctl.h>
25 #include <sys/time.h> 29 #include <sys/time.h>
26 #include <sys/select.h> 30 #include <sys/select.h>
27 #include <unistd.h> 31 #include <unistd.h>
28 #include <signal.h> 32 #include <signal.h>
29 #endif 33 #endif
30 34
31 #if defined(WEBRTC_WIN) 35 #if defined(WEBRTC_WIN)
32 #define WIN32_LEAN_AND_MEAN 36 #define WIN32_LEAN_AND_MEAN
33 #include <windows.h> 37 #include <windows.h>
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
73 77
74 int64_t GetSocketRecvTimestamp(int socket) { 78 int64_t GetSocketRecvTimestamp(int socket) {
75 return -1; 79 return -1;
76 } 80 }
77 #endif 81 #endif
78 82
79 #if defined(WEBRTC_WIN) 83 #if defined(WEBRTC_WIN)
80 typedef char* SockOptArg; 84 typedef char* SockOptArg;
81 #endif 85 #endif
82 86
87 #if defined(WEBRTC_USE_EPOLL)
88 // POLLRDHUP / EPOLLRDHUP are only defined starting with Linux 2.6.17.
89 #if !defined(POLLRDHUP)
90 #define POLLRDHUP 0x2000
91 #endif
92 #if !defined(EPOLLRDHUP)
93 #define EPOLLRDHUP 0x2000
94 #endif
95 #endif
96
83 namespace rtc { 97 namespace rtc {
84 98
85 std::unique_ptr<SocketServer> SocketServer::CreateDefault() { 99 std::unique_ptr<SocketServer> SocketServer::CreateDefault() {
86 #if defined(__native_client__) 100 #if defined(__native_client__)
87 return std::unique_ptr<SocketServer>(new rtc::NullSocketServer); 101 return std::unique_ptr<SocketServer>(new rtc::NullSocketServer);
88 #else 102 #else
89 return std::unique_ptr<SocketServer>(new rtc::PhysicalSocketServer); 103 return std::unique_ptr<SocketServer>(new rtc::PhysicalSocketServer);
90 #endif 104 #endif
91 } 105 }
92 106
(...skipping 665 matching lines...) Expand 10 before | Expand all | Expand 10 after
758 } 772 }
759 if (((ff & DE_CLOSE) != 0) && (id_ == cache_id)) { 773 if (((ff & DE_CLOSE) != 0) && (id_ == cache_id)) {
760 signal_close_ = true; 774 signal_close_ = true;
761 signal_err_ = err; 775 signal_err_ = err;
762 } 776 }
763 } 777 }
764 778
765 #elif defined(WEBRTC_POSIX) 779 #elif defined(WEBRTC_POSIX)
766 780
767 void SocketDispatcher::OnEvent(uint32_t ff, int err) { 781 void SocketDispatcher::OnEvent(uint32_t ff, int err) {
782 #if defined(WEBRTC_USE_EPOLL)
783 // Remember currently enabled events so we can combine multiple changes
784 // into one update call later.
785 // The signal handlers might re-enable events disabled here, so we can't
786 // keep a list of events to disable at the end of the method. This list
787 // would not not be updated with the events enabled by the signal handlers.
Taylor Brandstetter 2017/05/19 20:52:43 nit: "not not"
joachim 2017/05/19 20:55:43 Whoops, thanks.
788 StartBatchedEventUpdates();
789 #endif
768 // Make sure we deliver connect/accept first. Otherwise, consumers may see 790 // Make sure we deliver connect/accept first. Otherwise, consumers may see
769 // something like a READ followed by a CONNECT, which would be odd. 791 // something like a READ followed by a CONNECT, which would be odd.
770 if ((ff & DE_CONNECT) != 0) { 792 if ((ff & DE_CONNECT) != 0) {
771 DisableEvents(DE_CONNECT); 793 DisableEvents(DE_CONNECT);
772 SignalConnectEvent(this); 794 SignalConnectEvent(this);
773 } 795 }
774 if ((ff & DE_ACCEPT) != 0) { 796 if ((ff & DE_ACCEPT) != 0) {
775 DisableEvents(DE_ACCEPT); 797 DisableEvents(DE_ACCEPT);
776 SignalReadEvent(this); 798 SignalReadEvent(this);
777 } 799 }
778 if ((ff & DE_READ) != 0) { 800 if ((ff & DE_READ) != 0) {
779 DisableEvents(DE_READ); 801 DisableEvents(DE_READ);
780 SignalReadEvent(this); 802 SignalReadEvent(this);
781 } 803 }
782 if ((ff & DE_WRITE) != 0) { 804 if ((ff & DE_WRITE) != 0) {
783 DisableEvents(DE_WRITE); 805 DisableEvents(DE_WRITE);
784 SignalWriteEvent(this); 806 SignalWriteEvent(this);
785 } 807 }
786 if ((ff & DE_CLOSE) != 0) { 808 if ((ff & DE_CLOSE) != 0) {
787 // The socket is now dead to us, so stop checking it. 809 // The socket is now dead to us, so stop checking it.
788 SetEnabledEvents(0); 810 SetEnabledEvents(0);
789 SignalCloseEvent(this, err); 811 SignalCloseEvent(this, err);
790 } 812 }
813 #if defined(WEBRTC_USE_EPOLL)
814 FinishBatchedEventUpdates();
815 #endif
791 } 816 }
792 817
793 #endif // WEBRTC_POSIX 818 #endif // WEBRTC_POSIX
794 819
820 #if defined(WEBRTC_USE_EPOLL)
821
822 static int GetEpollEvents(uint32_t ff) {
823 int events = 0;
824 if (ff & (DE_READ | DE_ACCEPT)) {
825 events |= EPOLLIN;
826 }
827 if (ff & (DE_WRITE | DE_CONNECT)) {
828 events |= EPOLLOUT;
829 }
830 return events;
831 }
832
833 void SocketDispatcher::StartBatchedEventUpdates() {
834 RTC_DCHECK_EQ(saved_enabled_events_, -1);
835 saved_enabled_events_ = enabled_events();
836 }
837
838 void SocketDispatcher::FinishBatchedEventUpdates() {
839 RTC_DCHECK_NE(saved_enabled_events_, -1);
840 uint8_t old_events = static_cast<uint8_t>(saved_enabled_events_);
841 saved_enabled_events_ = -1;
842 MaybeUpdateDispatcher(old_events);
843 }
844
845 void SocketDispatcher::MaybeUpdateDispatcher(uint8_t old_events) {
846 if (GetEpollEvents(enabled_events()) != GetEpollEvents(old_events) &&
847 saved_enabled_events_ == -1) {
848 ss_->Update(this);
849 }
850 }
851
852 void SocketDispatcher::SetEnabledEvents(uint8_t events) {
853 uint8_t old_events = enabled_events();
854 PhysicalSocket::SetEnabledEvents(events);
855 MaybeUpdateDispatcher(old_events);
856 }
857
858 void SocketDispatcher::EnableEvents(uint8_t events) {
859 uint8_t old_events = enabled_events();
860 PhysicalSocket::EnableEvents(events);
861 MaybeUpdateDispatcher(old_events);
862 }
863
864 void SocketDispatcher::DisableEvents(uint8_t events) {
865 uint8_t old_events = enabled_events();
866 PhysicalSocket::DisableEvents(events);
867 MaybeUpdateDispatcher(old_events);
868 }
869
870 #endif // WEBRTC_USE_EPOLL
871
795 int SocketDispatcher::Close() { 872 int SocketDispatcher::Close() {
796 if (s_ == INVALID_SOCKET) 873 if (s_ == INVALID_SOCKET)
797 return 0; 874 return 0;
798 875
799 #if defined(WEBRTC_WIN) 876 #if defined(WEBRTC_WIN)
800 id_ = 0; 877 id_ = 0;
801 signal_close_ = false; 878 signal_close_ = false;
802 #endif 879 #endif
803 ss_->Remove(this); 880 ss_->Remove(this);
804 return PhysicalSocket::Close(); 881 return PhysicalSocket::Close();
(...skipping 308 matching lines...) Expand 10 before | Expand all | Expand 10 after
1113 if (pf_) 1190 if (pf_)
1114 *pf_ = false; 1191 *pf_ = false;
1115 } 1192 }
1116 1193
1117 private: 1194 private:
1118 bool *pf_; 1195 bool *pf_;
1119 }; 1196 };
1120 1197
1121 PhysicalSocketServer::PhysicalSocketServer() 1198 PhysicalSocketServer::PhysicalSocketServer()
1122 : fWait_(false) { 1199 : fWait_(false) {
1200 #if defined(WEBRTC_USE_EPOLL)
1201 // Since Linux 2.6.8, the size argument is ignored, but must be greater than
1202 // zero. Before that the size served as hint to the kernel for the amount of
1203 // space to initially allocate in internal data structures.
1204 epoll_fd_ = epoll_create(FD_SETSIZE);
1205 if (epoll_fd_ == -1) {
1206 // Not an error, will fall back to "select" below.
1207 LOG_E(LS_WARNING, EN, errno) << "epoll_create";
1208 epoll_fd_ = INVALID_SOCKET;
1209 }
1210 #endif
1123 signal_wakeup_ = new Signaler(this, &fWait_); 1211 signal_wakeup_ = new Signaler(this, &fWait_);
1124 #if defined(WEBRTC_WIN) 1212 #if defined(WEBRTC_WIN)
1125 socket_ev_ = WSACreateEvent(); 1213 socket_ev_ = WSACreateEvent();
1126 #endif 1214 #endif
1127 } 1215 }
1128 1216
1129 PhysicalSocketServer::~PhysicalSocketServer() { 1217 PhysicalSocketServer::~PhysicalSocketServer() {
1130 #if defined(WEBRTC_WIN) 1218 #if defined(WEBRTC_WIN)
1131 WSACloseEvent(socket_ev_); 1219 WSACloseEvent(socket_ev_);
1132 #endif 1220 #endif
1133 #if defined(WEBRTC_POSIX) 1221 #if defined(WEBRTC_POSIX)
1134 signal_dispatcher_.reset(); 1222 signal_dispatcher_.reset();
1135 #endif 1223 #endif
1136 delete signal_wakeup_; 1224 delete signal_wakeup_;
1225 #if defined(WEBRTC_USE_EPOLL)
1226 if (epoll_fd_ != INVALID_SOCKET) {
1227 close(epoll_fd_);
1228 }
1229 #endif
1137 RTC_DCHECK(dispatchers_.empty()); 1230 RTC_DCHECK(dispatchers_.empty());
1138 } 1231 }
1139 1232
1140 void PhysicalSocketServer::WakeUp() { 1233 void PhysicalSocketServer::WakeUp() {
1141 signal_wakeup_->Signal(); 1234 signal_wakeup_->Signal();
1142 } 1235 }
1143 1236
1144 Socket* PhysicalSocketServer::CreateSocket(int type) { 1237 Socket* PhysicalSocketServer::CreateSocket(int type) {
1145 return CreateSocket(AF_INET, type); 1238 return CreateSocket(AF_INET, type);
1146 } 1239 }
(...skipping 27 matching lines...) Expand all
1174 if (dispatcher->Initialize()) { 1267 if (dispatcher->Initialize()) {
1175 return dispatcher; 1268 return dispatcher;
1176 } else { 1269 } else {
1177 delete dispatcher; 1270 delete dispatcher;
1178 return nullptr; 1271 return nullptr;
1179 } 1272 }
1180 } 1273 }
1181 1274
1182 void PhysicalSocketServer::Add(Dispatcher *pdispatcher) { 1275 void PhysicalSocketServer::Add(Dispatcher *pdispatcher) {
1183 CritScope cs(&crit_); 1276 CritScope cs(&crit_);
1277 #if defined(WEBRTC_WIN)
1184 // Prevent duplicates. This can cause dead dispatchers to stick around. 1278 // Prevent duplicates. This can cause dead dispatchers to stick around.
1185 DispatcherList::iterator pos = std::find(dispatchers_.begin(), 1279 DispatcherList::iterator pos = std::find(dispatchers_.begin(),
1186 dispatchers_.end(), 1280 dispatchers_.end(),
1187 pdispatcher); 1281 pdispatcher);
1188 if (pos != dispatchers_.end()) 1282 if (pos != dispatchers_.end())
1189 return; 1283 return;
1190 dispatchers_.push_back(pdispatcher); 1284 dispatchers_.push_back(pdispatcher);
1285 #else
1286 dispatchers_.insert(pdispatcher);
1287 #endif // WEBRTC_WIN
1288 #if defined(WEBRTC_USE_EPOLL)
1289 if (epoll_fd_ != INVALID_SOCKET) {
1290 AddEpoll(pdispatcher);
1291 }
1292 #endif // WEBRTC_USE_EPOLL
1191 } 1293 }
1192 1294
1193 void PhysicalSocketServer::Remove(Dispatcher *pdispatcher) { 1295 void PhysicalSocketServer::Remove(Dispatcher *pdispatcher) {
1194 CritScope cs(&crit_); 1296 CritScope cs(&crit_);
1297 #if defined(WEBRTC_WIN)
1195 DispatcherList::iterator pos = std::find(dispatchers_.begin(), 1298 DispatcherList::iterator pos = std::find(dispatchers_.begin(),
1196 dispatchers_.end(), 1299 dispatchers_.end(),
1197 pdispatcher); 1300 pdispatcher);
1198 // We silently ignore duplicate calls to Add, so we should silently ignore 1301 // We silently ignore duplicate calls to Add, so we should silently ignore
1199 // the (expected) symmetric calls to Remove. Note that this may still hide 1302 // the (expected) symmetric calls to Remove. Note that this may still hide
1200 // a real issue, so we at least log a warning about it. 1303 // a real issue, so we at least log a warning about it.
1201 if (pos == dispatchers_.end()) { 1304 if (pos == dispatchers_.end()) {
1202 LOG(LS_WARNING) << "PhysicalSocketServer asked to remove a unknown " 1305 LOG(LS_WARNING) << "PhysicalSocketServer asked to remove a unknown "
1203 << "dispatcher, potentially from a duplicate call to Add."; 1306 << "dispatcher, potentially from a duplicate call to Add.";
1204 return; 1307 return;
1205 } 1308 }
1206 size_t index = pos - dispatchers_.begin(); 1309 size_t index = pos - dispatchers_.begin();
1207 dispatchers_.erase(pos); 1310 dispatchers_.erase(pos);
1208 for (IteratorList::iterator it = iterators_.begin(); it != iterators_.end(); 1311 for (IteratorList::iterator it = iterators_.begin(); it != iterators_.end();
1209 ++it) { 1312 ++it) {
1210 if (index < **it) { 1313 if (index < **it) {
1211 --**it; 1314 --**it;
1212 } 1315 }
1213 } 1316 }
1317 #else
1318 if (!dispatchers_.erase(pdispatcher)) {
1319 LOG(LS_WARNING) << "PhysicalSocketServer asked to remove a unknown "
1320 << "dispatcher, potentially from a duplicate call to Add.";
1321 return;
1322 }
1323 #endif // WEBRTC_WIN
1324 #if defined(WEBRTC_USE_EPOLL)
1325 if (epoll_fd_ != INVALID_SOCKET) {
1326 RemoveEpoll(pdispatcher);
1327 }
1328 #endif // WEBRTC_USE_EPOLL
1329 }
1330
1331 void PhysicalSocketServer::Update(Dispatcher *pdispatcher) {
1332 #if defined(WEBRTC_USE_EPOLL)
1333 if (epoll_fd_ == INVALID_SOCKET) {
1334 return;
1335 }
1336
1337 CritScope cs(&crit_);
1338 if (dispatchers_.find(pdispatcher) == dispatchers_.end()) {
1339 return;
1340 }
1341
1342 UpdateEpoll(pdispatcher);
1343 #endif
1214 } 1344 }
1215 1345
1216 #if defined(WEBRTC_POSIX) 1346 #if defined(WEBRTC_POSIX)
1347
1217 bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) { 1348 bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
1349 #if defined(WEBRTC_USE_EPOLL)
1350 // We don't keep a dedicated "epoll" descriptor containing only the non-IO
1351 // (i.e. signaling) dispatcher, so "poll" will be used instead of the default
1352 // "select" to support sockets larger than FD_SETSIZE.
1353 if (!process_io) {
1354 return WaitPoll(cmsWait, signal_wakeup_);
1355 } else if (epoll_fd_ != INVALID_SOCKET) {
1356 return WaitEpoll(cmsWait);
1357 }
1358 #endif
1359 return WaitSelect(cmsWait, process_io);
1360 }
1361
1362 static void ProcessEvents(Dispatcher* dispatcher, bool readable, bool writable,
1363 bool check_error) {
1364 int errcode = 0;
1365 // TODO(pthatcher): Should we set errcode if getsockopt fails?
1366 if (check_error) {
1367 socklen_t len = sizeof(errcode);
1368 ::getsockopt(dispatcher->GetDescriptor(), SOL_SOCKET, SO_ERROR, &errcode,
1369 &len);
1370 }
1371
1372 uint32_t ff = 0;
1373
1374 // Check readable descriptors. If we're waiting on an accept, signal
1375 // that. Otherwise we're waiting for data, check to see if we're
1376 // readable or really closed.
1377 // TODO(pthatcher): Only peek at TCP descriptors.
1378 if (readable) {
1379 if (dispatcher->GetRequestedEvents() & DE_ACCEPT) {
1380 ff |= DE_ACCEPT;
1381 } else if (errcode || dispatcher->IsDescriptorClosed()) {
1382 ff |= DE_CLOSE;
1383 } else {
1384 ff |= DE_READ;
1385 }
1386 }
1387
1388 // Check writable descriptors. If we're waiting on a connect, detect
1389 // success versus failure by the reaped error code.
1390 if (writable) {
1391 if (dispatcher->GetRequestedEvents() & DE_CONNECT) {
1392 if (!errcode) {
1393 ff |= DE_CONNECT;
1394 } else {
1395 ff |= DE_CLOSE;
1396 }
1397 } else {
1398 ff |= DE_WRITE;
1399 }
1400 }
1401
1402 // Tell the descriptor about the event.
1403 if (ff != 0) {
1404 dispatcher->OnPreEvent(ff);
1405 dispatcher->OnEvent(ff, errcode);
1406 }
1407 }
1408
1409 bool PhysicalSocketServer::WaitSelect(int cmsWait, bool process_io) {
1218 // Calculate timing information 1410 // Calculate timing information
1219 1411
1220 struct timeval* ptvWait = nullptr; 1412 struct timeval* ptvWait = nullptr;
1221 struct timeval tvWait; 1413 struct timeval tvWait;
1222 struct timeval tvStop; 1414 struct timeval tvStop;
1223 if (cmsWait != kForever) { 1415 if (cmsWait != kForever) {
1224 // Calculate wait timeval 1416 // Calculate wait timeval
1225 tvWait.tv_sec = cmsWait / 1000; 1417 tvWait.tv_sec = cmsWait / 1000;
1226 tvWait.tv_usec = (cmsWait % 1000) * 1000; 1418 tvWait.tv_usec = (cmsWait % 1000) * 1000;
1227 ptvWait = &tvWait; 1419 ptvWait = &tvWait;
(...skipping 22 matching lines...) Expand all
1250 __msan_unpoison(&fdsRead, sizeof(fdsRead)); 1442 __msan_unpoison(&fdsRead, sizeof(fdsRead));
1251 __msan_unpoison(&fdsWrite, sizeof(fdsWrite)); 1443 __msan_unpoison(&fdsWrite, sizeof(fdsWrite));
1252 #endif 1444 #endif
1253 1445
1254 fWait_ = true; 1446 fWait_ = true;
1255 1447
1256 while (fWait_) { 1448 while (fWait_) {
1257 int fdmax = -1; 1449 int fdmax = -1;
1258 { 1450 {
1259 CritScope cr(&crit_); 1451 CritScope cr(&crit_);
1260 for (size_t i = 0; i < dispatchers_.size(); ++i) { 1452 for (Dispatcher* pdispatcher : dispatchers_) {
1261 // Query dispatchers for read and write wait state 1453 // Query dispatchers for read and write wait state
1262 Dispatcher *pdispatcher = dispatchers_[i];
1263 RTC_DCHECK(pdispatcher); 1454 RTC_DCHECK(pdispatcher);
1264 if (!process_io && (pdispatcher != signal_wakeup_)) 1455 if (!process_io && (pdispatcher != signal_wakeup_))
1265 continue; 1456 continue;
1266 int fd = pdispatcher->GetDescriptor(); 1457 int fd = pdispatcher->GetDescriptor();
1458 // "select"ing a file descriptor that is equal to or larger than
1459 // FD_SETSIZE will result in undefined behavior.
1460 RTC_DCHECK_LT(fd, FD_SETSIZE);
1267 if (fd > fdmax) 1461 if (fd > fdmax)
1268 fdmax = fd; 1462 fdmax = fd;
1269 1463
1270 uint32_t ff = pdispatcher->GetRequestedEvents(); 1464 uint32_t ff = pdispatcher->GetRequestedEvents();
1271 if (ff & (DE_READ | DE_ACCEPT)) 1465 if (ff & (DE_READ | DE_ACCEPT))
1272 FD_SET(fd, &fdsRead); 1466 FD_SET(fd, &fdsRead);
1273 if (ff & (DE_WRITE | DE_CONNECT)) 1467 if (ff & (DE_WRITE | DE_CONNECT))
1274 FD_SET(fd, &fdsWrite); 1468 FD_SET(fd, &fdsWrite);
1275 } 1469 }
1276 } 1470 }
(...skipping 13 matching lines...) Expand all
1290 // Else ignore the error and keep going. If this EINTR was for one of the 1484 // Else ignore the error and keep going. If this EINTR was for one of the
1291 // signals managed by this PhysicalSocketServer, the 1485 // signals managed by this PhysicalSocketServer, the
1292 // PosixSignalDeliveryDispatcher will be in the signaled state in the next 1486 // PosixSignalDeliveryDispatcher will be in the signaled state in the next
1293 // iteration. 1487 // iteration.
1294 } else if (n == 0) { 1488 } else if (n == 0) {
1295 // If timeout, return success 1489 // If timeout, return success
1296 return true; 1490 return true;
1297 } else { 1491 } else {
1298 // We have signaled descriptors 1492 // We have signaled descriptors
1299 CritScope cr(&crit_); 1493 CritScope cr(&crit_);
1300 for (size_t i = 0; i < dispatchers_.size(); ++i) { 1494 for (Dispatcher *pdispatcher : dispatchers_) {
1301 Dispatcher *pdispatcher = dispatchers_[i];
1302 int fd = pdispatcher->GetDescriptor(); 1495 int fd = pdispatcher->GetDescriptor();
1303 uint32_t ff = 0;
1304 int errcode = 0;
1305 1496
1306 // Reap any error code, which can be signaled through reads or writes. 1497 bool readable = FD_ISSET(fd, &fdsRead);
1307 // TODO(pthatcher): Should we set errcode if getsockopt fails? 1498 if (readable) {
1308 if (FD_ISSET(fd, &fdsRead) || FD_ISSET(fd, &fdsWrite)) { 1499 FD_CLR(fd, &fdsRead);
1309 socklen_t len = sizeof(errcode);
1310 ::getsockopt(fd, SOL_SOCKET, SO_ERROR, &errcode, &len);
1311 } 1500 }
1312 1501
1313 // Check readable descriptors. If we're waiting on an accept, signal 1502 bool writable = FD_ISSET(fd, &fdsWrite);
1314 // that. Otherwise we're waiting for data, check to see if we're 1503 if (writable) {
1315 // readable or really closed. 1504 FD_CLR(fd, &fdsWrite);
1316 // TODO(pthatcher): Only peek at TCP descriptors.
1317 if (FD_ISSET(fd, &fdsRead)) {
1318 FD_CLR(fd, &fdsRead);
1319 if (pdispatcher->GetRequestedEvents() & DE_ACCEPT) {
1320 ff |= DE_ACCEPT;
1321 } else if (errcode || pdispatcher->IsDescriptorClosed()) {
1322 ff |= DE_CLOSE;
1323 } else {
1324 ff |= DE_READ;
1325 }
1326 } 1505 }
1327 1506
1328 // Check writable descriptors. If we're waiting on a connect, detect 1507 // The error code can be signaled through reads or writes.
1329 // success versus failure by the reaped error code. 1508 ProcessEvents(pdispatcher, readable, writable, readable || writable);
1330 if (FD_ISSET(fd, &fdsWrite)) {
1331 FD_CLR(fd, &fdsWrite);
1332 if (pdispatcher->GetRequestedEvents() & DE_CONNECT) {
1333 if (!errcode) {
1334 ff |= DE_CONNECT;
1335 } else {
1336 ff |= DE_CLOSE;
1337 }
1338 } else {
1339 ff |= DE_WRITE;
1340 }
1341 }
1342
1343 // Tell the descriptor about the event.
1344 if (ff != 0) {
1345 pdispatcher->OnPreEvent(ff);
1346 pdispatcher->OnEvent(ff, errcode);
1347 }
1348 } 1509 }
1349 } 1510 }
1350 1511
1351 // Recalc the time remaining to wait. Doing it here means it doesn't get 1512 // Recalc the time remaining to wait. Doing it here means it doesn't get
1352 // calced twice the first time through the loop 1513 // calced twice the first time through the loop
1353 if (ptvWait) { 1514 if (ptvWait) {
1354 ptvWait->tv_sec = 0; 1515 ptvWait->tv_sec = 0;
1355 ptvWait->tv_usec = 0; 1516 ptvWait->tv_usec = 0;
1356 struct timeval tvT; 1517 struct timeval tvT;
1357 gettimeofday(&tvT, nullptr); 1518 gettimeofday(&tvT, nullptr);
1358 if ((tvStop.tv_sec > tvT.tv_sec) 1519 if ((tvStop.tv_sec > tvT.tv_sec)
1359 || ((tvStop.tv_sec == tvT.tv_sec) 1520 || ((tvStop.tv_sec == tvT.tv_sec)
1360 && (tvStop.tv_usec > tvT.tv_usec))) { 1521 && (tvStop.tv_usec > tvT.tv_usec))) {
1361 ptvWait->tv_sec = tvStop.tv_sec - tvT.tv_sec; 1522 ptvWait->tv_sec = tvStop.tv_sec - tvT.tv_sec;
1362 ptvWait->tv_usec = tvStop.tv_usec - tvT.tv_usec; 1523 ptvWait->tv_usec = tvStop.tv_usec - tvT.tv_usec;
1363 if (ptvWait->tv_usec < 0) { 1524 if (ptvWait->tv_usec < 0) {
1364 RTC_DCHECK(ptvWait->tv_sec > 0); 1525 RTC_DCHECK(ptvWait->tv_sec > 0);
1365 ptvWait->tv_usec += 1000000; 1526 ptvWait->tv_usec += 1000000;
1366 ptvWait->tv_sec -= 1; 1527 ptvWait->tv_sec -= 1;
1367 } 1528 }
1368 } 1529 }
1369 } 1530 }
1370 } 1531 }
1371 1532
1372 return true; 1533 return true;
1373 } 1534 }
1374 1535
1536 #if defined(WEBRTC_USE_EPOLL)
1537
1538 // Initial number of events to process with one call to "epoll_wait".
1539 static const size_t kInitialEpollEvents = 128;
1540
1541 // Maximum number of events to process with one call to "epoll_wait".
1542 static const size_t kMaxEpollEvents = 8192;
1543
1544 void PhysicalSocketServer::AddEpoll(Dispatcher* pdispatcher) {
1545 RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
1546 int fd = pdispatcher->GetDescriptor();
1547 RTC_DCHECK(fd != INVALID_SOCKET);
1548 if (fd == INVALID_SOCKET) {
1549 return;
1550 }
1551
1552 struct epoll_event event = {0};
1553 event.events = GetEpollEvents(pdispatcher->GetRequestedEvents());
1554 event.data.ptr = pdispatcher;
1555 int err = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event);
1556 RTC_DCHECK_EQ(err, 0);
1557 if (err == -1) {
1558 LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_ADD";
1559 }
1560 }
1561
1562 void PhysicalSocketServer::RemoveEpoll(Dispatcher* pdispatcher) {
1563 RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
1564 int fd = pdispatcher->GetDescriptor();
1565 RTC_DCHECK(fd != INVALID_SOCKET);
1566 if (fd == INVALID_SOCKET) {
1567 return;
1568 }
1569
1570 struct epoll_event event = {0};
1571 int err = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, &event);
1572 RTC_DCHECK(err == 0 || errno == ENOENT);
1573 if (err == -1) {
1574 if (errno == ENOENT) {
1575 // Socket has already been closed.
1576 LOG_E(LS_VERBOSE, EN, errno) << "epoll_ctl EPOLL_CTL_DEL";
1577 } else {
1578 LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_DEL";
1579 }
1580 }
1581 }
1582
1583 void PhysicalSocketServer::UpdateEpoll(Dispatcher* pdispatcher) {
1584 RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
1585 int fd = pdispatcher->GetDescriptor();
1586 RTC_DCHECK(fd != INVALID_SOCKET);
1587 if (fd == INVALID_SOCKET) {
1588 return;
1589 }
1590
1591 struct epoll_event event = {0};
1592 event.events = GetEpollEvents(pdispatcher->GetRequestedEvents());
1593 event.data.ptr = pdispatcher;
1594 int err = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd, &event);
1595 RTC_DCHECK_EQ(err, 0);
1596 if (err == -1) {
1597 LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_MOD";
1598 }
1599 }
1600
1601 bool PhysicalSocketServer::WaitEpoll(int cmsWait) {
1602 RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
1603 int64_t tvWait = -1;
1604 int64_t tvStop = -1;
1605 if (cmsWait != kForever) {
1606 tvWait = cmsWait;
1607 tvStop = TimeAfter(cmsWait);
1608 }
1609
1610 if (epoll_events_.empty()) {
1611 // The initial space to receive events is created only if epoll is used.
1612 epoll_events_.resize(kInitialEpollEvents);
1613 }
1614
1615 fWait_ = true;
1616
1617 while (fWait_) {
1618 // Wait then call handlers as appropriate
1619 // < 0 means error
1620 // 0 means timeout
1621 // > 0 means count of descriptors ready
1622 int n = epoll_wait(epoll_fd_, &epoll_events_[0],
1623 static_cast<int>(epoll_events_.size()), static_cast<int>(tvWait));
1624 if (n < 0) {
1625 if (errno != EINTR) {
1626 LOG_E(LS_ERROR, EN, errno) << "epoll";
1627 return false;
1628 }
1629 // Else ignore the error and keep going. If this EINTR was for one of the
1630 // signals managed by this PhysicalSocketServer, the
1631 // PosixSignalDeliveryDispatcher will be in the signaled state in the next
1632 // iteration.
1633 } else if (n == 0) {
1634 // If timeout, return success
1635 return true;
1636 } else {
1637 // We have signaled descriptors
1638 CritScope cr(&crit_);
1639 for (int i = 0; i < n; ++i) {
1640 const epoll_event& event = epoll_events_[i];
1641 Dispatcher* pdispatcher = static_cast<Dispatcher*>(event.data.ptr);
1642 if (dispatchers_.find(pdispatcher) == dispatchers_.end()) {
1643 // The dispatcher for this socket no longer exists.
1644 continue;
1645 }
1646
1647 bool readable = (event.events & (EPOLLIN | EPOLLPRI));
1648 bool writable = (event.events & EPOLLOUT);
1649 bool check_error = (event.events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP));
1650
1651 ProcessEvents(pdispatcher, readable, writable, check_error);
1652 }
1653 }
1654
1655 if (static_cast<size_t>(n) == epoll_events_.size() &&
1656 epoll_events_.size() < kMaxEpollEvents) {
1657 // We used the complete space to receive events, increase size for future
1658 // iterations.
1659 epoll_events_.resize(std::max(epoll_events_.size() * 2, kMaxEpollEvents));
1660 }
1661
1662 if (cmsWait != kForever) {
1663 tvWait = TimeDiff(tvStop, TimeMillis());
1664 if (tvWait < 0) {
1665 // Return success on timeout.
1666 return true;
1667 }
1668 }
1669 }
1670
1671 return true;
1672 }
1673
1674 bool PhysicalSocketServer::WaitPoll(int cmsWait, Dispatcher* dispatcher) {
1675 RTC_DCHECK(dispatcher);
1676 int64_t tvWait = -1;
1677 int64_t tvStop = -1;
1678 if (cmsWait != kForever) {
1679 tvWait = cmsWait;
1680 tvStop = TimeAfter(cmsWait);
1681 }
1682
1683 fWait_ = true;
1684
1685 struct pollfd fds = {0};
1686 int fd = dispatcher->GetDescriptor();
1687 fds.fd = fd;
1688
1689 while (fWait_) {
1690 uint32_t ff = dispatcher->GetRequestedEvents();
1691 fds.events = 0;
1692 if (ff & (DE_READ | DE_ACCEPT)) {
1693 fds.events |= POLLIN;
1694 }
1695 if (ff & (DE_WRITE | DE_CONNECT)) {
1696 fds.events |= POLLOUT;
1697 }
1698 fds.revents = 0;
1699
1700 // Wait then call handlers as appropriate
1701 // < 0 means error
1702 // 0 means timeout
1703 // > 0 means count of descriptors ready
1704 int n = poll(&fds, 1, static_cast<int>(tvWait));
1705 if (n < 0) {
1706 if (errno != EINTR) {
1707 LOG_E(LS_ERROR, EN, errno) << "poll";
1708 return false;
1709 }
1710 // Else ignore the error and keep going. If this EINTR was for one of the
1711 // signals managed by this PhysicalSocketServer, the
1712 // PosixSignalDeliveryDispatcher will be in the signaled state in the next
1713 // iteration.
1714 } else if (n == 0) {
1715 // If timeout, return success
1716 return true;
1717 } else {
1718 // We have signaled descriptors (should only be the passed dispatcher).
1719 RTC_DCHECK_EQ(n, 1);
1720 RTC_DCHECK_EQ(fds.fd, fd);
1721
1722 bool readable = (fds.revents & (POLLIN | POLLPRI));
1723 bool writable = (fds.revents & POLLOUT);
1724 bool check_error = (fds.revents & (POLLRDHUP | POLLERR | POLLHUP));
1725
1726 ProcessEvents(dispatcher, readable, writable, check_error);
1727 }
1728
1729 if (cmsWait != kForever) {
1730 tvWait = TimeDiff(tvStop, TimeMillis());
1731 if (tvWait < 0) {
1732 // Return success on timeout.
1733 return true;
1734 }
1735 }
1736 }
1737
1738 return true;
1739 }
1740
1741 #endif // WEBRTC_USE_EPOLL
1742
1375 static void GlobalSignalHandler(int signum) { 1743 static void GlobalSignalHandler(int signum) {
1376 PosixSignalHandler::Instance()->OnPosixSignalReceived(signum); 1744 PosixSignalHandler::Instance()->OnPosixSignalReceived(signum);
1377 } 1745 }
1378 1746
1379 bool PhysicalSocketServer::SetPosixSignalHandler(int signum, 1747 bool PhysicalSocketServer::SetPosixSignalHandler(int signum,
1380 void (*handler)(int)) { 1748 void (*handler)(int)) {
1381 // If handler is SIG_IGN or SIG_DFL then clear our user-level handler, 1749 // If handler is SIG_IGN or SIG_DFL then clear our user-level handler,
1382 // otherwise set one. 1750 // otherwise set one.
1383 if (handler == SIG_IGN || handler == SIG_DFL) { 1751 if (handler == SIG_IGN || handler == SIG_DFL) {
1384 if (!InstallSignal(signum, handler)) { 1752 if (!InstallSignal(signum, handler)) {
(...skipping 201 matching lines...) Expand 10 before | Expand all | Expand 10 after
1586 break; 1954 break;
1587 } 1955 }
1588 } 1956 }
1589 1957
1590 // Done 1958 // Done
1591 return true; 1959 return true;
1592 } 1960 }
1593 #endif // WEBRTC_WIN 1961 #endif // WEBRTC_WIN
1594 1962
1595 } // namespace rtc 1963 } // namespace rtc
OLDNEW
« no previous file with comments | « webrtc/base/physicalsocketserver.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698