Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(81)

Side by Side Diff: webrtc/base/physicalsocketserver.cc

Issue 2880923002: Support epoll in PhysicalSocketServer. (Closed)
Patch Set: More feedback. Created 3 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « webrtc/base/physicalsocketserver.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 /* 1 /*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 * 3 *
4 * Use of this source code is governed by a BSD-style license 4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source 5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found 6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may 7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree. 8 * be found in the AUTHORS file in the root of the source tree.
9 */ 9 */
10 #include "webrtc/base/physicalsocketserver.h" 10 #include "webrtc/base/physicalsocketserver.h"
11 11
12 #if defined(_MSC_VER) && _MSC_VER < 1300 12 #if defined(_MSC_VER) && _MSC_VER < 1300
13 #pragma warning(disable:4786) 13 #pragma warning(disable:4786)
14 #endif 14 #endif
15 15
16 #ifdef MEMORY_SANITIZER 16 #ifdef MEMORY_SANITIZER
17 #include <sanitizer/msan_interface.h> 17 #include <sanitizer/msan_interface.h>
18 #endif 18 #endif
19 19
20 #if defined(WEBRTC_POSIX) 20 #if defined(WEBRTC_POSIX)
21 #include <string.h> 21 #include <string.h>
22 #include <errno.h> 22 #include <errno.h>
23 #include <fcntl.h> 23 #include <fcntl.h>
24 #if defined(WEBRTC_USE_EPOLL)
25 // "poll" will be used to wait for the signal dispatcher.
26 #include <poll.h>
27 #endif
24 #include <sys/ioctl.h> 28 #include <sys/ioctl.h>
25 #include <sys/time.h> 29 #include <sys/time.h>
26 #include <sys/select.h> 30 #include <sys/select.h>
27 #include <unistd.h> 31 #include <unistd.h>
28 #include <signal.h> 32 #include <signal.h>
29 #endif 33 #endif
30 34
31 #if defined(WEBRTC_WIN) 35 #if defined(WEBRTC_WIN)
32 #define WIN32_LEAN_AND_MEAN 36 #define WIN32_LEAN_AND_MEAN
33 #include <windows.h> 37 #include <windows.h>
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
73 77
74 int64_t GetSocketRecvTimestamp(int socket) { 78 int64_t GetSocketRecvTimestamp(int socket) {
75 return -1; 79 return -1;
76 } 80 }
77 #endif 81 #endif
78 82
79 #if defined(WEBRTC_WIN) 83 #if defined(WEBRTC_WIN)
80 typedef char* SockOptArg; 84 typedef char* SockOptArg;
81 #endif 85 #endif
82 86
87 #if defined(WEBRTC_USE_EPOLL)
88 // POLLRDHUP / EPOLLRDHUP are only defined starting with Linux 2.6.17.
89 #if !defined(POLLRDHUP)
90 #define POLLRDHUP 0x2000
91 #endif
92 #if !defined(EPOLLRDHUP)
93 #define EPOLLRDHUP 0x2000
94 #endif
95 #endif
96
83 namespace rtc { 97 namespace rtc {
84 98
85 std::unique_ptr<SocketServer> SocketServer::CreateDefault() { 99 std::unique_ptr<SocketServer> SocketServer::CreateDefault() {
86 #if defined(__native_client__) 100 #if defined(__native_client__)
87 return std::unique_ptr<SocketServer>(new rtc::NullSocketServer); 101 return std::unique_ptr<SocketServer>(new rtc::NullSocketServer);
88 #else 102 #else
89 return std::unique_ptr<SocketServer>(new rtc::PhysicalSocketServer); 103 return std::unique_ptr<SocketServer>(new rtc::PhysicalSocketServer);
90 #endif 104 #endif
91 } 105 }
92 106
(...skipping 665 matching lines...) Expand 10 before | Expand all | Expand 10 after
758 } 772 }
759 if (((ff & DE_CLOSE) != 0) && (id_ == cache_id)) { 773 if (((ff & DE_CLOSE) != 0) && (id_ == cache_id)) {
760 signal_close_ = true; 774 signal_close_ = true;
761 signal_err_ = err; 775 signal_err_ = err;
762 } 776 }
763 } 777 }
764 778
765 #elif defined(WEBRTC_POSIX) 779 #elif defined(WEBRTC_POSIX)
766 780
767 void SocketDispatcher::OnEvent(uint32_t ff, int err) { 781 void SocketDispatcher::OnEvent(uint32_t ff, int err) {
782 #if defined(WEBRTC_USE_EPOLL)
783 // Remember currently enabled events so we can combine multiple changes
784 // into one update call later.
785 SaveEnabledEvents();
786 #endif
768 // Make sure we deliver connect/accept first. Otherwise, consumers may see 787 // Make sure we deliver connect/accept first. Otherwise, consumers may see
769 // something like a READ followed by a CONNECT, which would be odd. 788 // something like a READ followed by a CONNECT, which would be odd.
770 if ((ff & DE_CONNECT) != 0) { 789 if ((ff & DE_CONNECT) != 0) {
771 DisableEvents(DE_CONNECT); 790 DisableEvents(DE_CONNECT);
772 SignalConnectEvent(this); 791 SignalConnectEvent(this);
773 } 792 }
774 if ((ff & DE_ACCEPT) != 0) { 793 if ((ff & DE_ACCEPT) != 0) {
775 DisableEvents(DE_ACCEPT); 794 DisableEvents(DE_ACCEPT);
776 SignalReadEvent(this); 795 SignalReadEvent(this);
777 } 796 }
778 if ((ff & DE_READ) != 0) { 797 if ((ff & DE_READ) != 0) {
779 DisableEvents(DE_READ); 798 DisableEvents(DE_READ);
780 SignalReadEvent(this); 799 SignalReadEvent(this);
781 } 800 }
782 if ((ff & DE_WRITE) != 0) { 801 if ((ff & DE_WRITE) != 0) {
783 DisableEvents(DE_WRITE); 802 DisableEvents(DE_WRITE);
784 SignalWriteEvent(this); 803 SignalWriteEvent(this);
785 } 804 }
786 if ((ff & DE_CLOSE) != 0) { 805 if ((ff & DE_CLOSE) != 0) {
787 // The socket is now dead to us, so stop checking it. 806 // The socket is now dead to us, so stop checking it.
788 SetEnabledEvents(0); 807 SetEnabledEvents(0);
789 SignalCloseEvent(this, err); 808 SignalCloseEvent(this, err);
790 } 809 }
810 #if defined(WEBRTC_USE_EPOLL)
811 RestoreEnabledEvents();
812 #endif
791 } 813 }
792 814
793 #endif // WEBRTC_POSIX 815 #endif // WEBRTC_POSIX
794 816
817 #if defined(WEBRTC_USE_EPOLL)
818
819 static int GetEpollEvents(uint32_t ff) {
820 int events = 0;
821 if (ff & (DE_READ | DE_ACCEPT)) {
822 events |= EPOLLIN;
823 }
824 if (ff & (DE_WRITE | DE_CONNECT)) {
825 events |= EPOLLOUT;
826 }
827 return events;
828 }
829
830 void SocketDispatcher::SaveEnabledEvents() {
831 RTC_DCHECK_EQ(saved_enabled_events_, -1);
832 saved_enabled_events_ = enabled_events();
833 }
834
835 void SocketDispatcher::RestoreEnabledEvents() {
Taylor Brandstetter 2017/05/18 21:59:57 I still think the names "Save/Restore" give the wr
836 RTC_DCHECK_NE(saved_enabled_events_, -1);
837 uint8_t old_events = static_cast<uint8_t>(saved_enabled_events_);
838 saved_enabled_events_ = -1;
839 MaybeUpdateDispatcher(old_events);
840 }
841
842 void SocketDispatcher::MaybeUpdateDispatcher(uint8_t old_events) {
843 if (GetEpollEvents(enabled_events()) != GetEpollEvents(old_events) &&
844 saved_enabled_events_ == -1) {
845 ss_->Update(this);
846 }
847 }
848
849 void SocketDispatcher::SetEnabledEvents(uint8_t events) {
850 uint8_t old_events = enabled_events();
851 PhysicalSocket::SetEnabledEvents(events);
852 MaybeUpdateDispatcher(old_events);
853 }
854
855 void SocketDispatcher::EnableEvents(uint8_t events) {
856 uint8_t old_events = enabled_events();
857 PhysicalSocket::EnableEvents(events);
858 MaybeUpdateDispatcher(old_events);
859 }
860
861 void SocketDispatcher::DisableEvents(uint8_t events) {
862 uint8_t old_events = enabled_events();
863 PhysicalSocket::DisableEvents(events);
864 MaybeUpdateDispatcher(old_events);
865 }
866
867 #endif // WEBRTC_USE_EPOLL
868
795 int SocketDispatcher::Close() { 869 int SocketDispatcher::Close() {
796 if (s_ == INVALID_SOCKET) 870 if (s_ == INVALID_SOCKET)
797 return 0; 871 return 0;
798 872
799 #if defined(WEBRTC_WIN) 873 #if defined(WEBRTC_WIN)
800 id_ = 0; 874 id_ = 0;
801 signal_close_ = false; 875 signal_close_ = false;
802 #endif 876 #endif
803 ss_->Remove(this); 877 ss_->Remove(this);
804 return PhysicalSocket::Close(); 878 return PhysicalSocket::Close();
(...skipping 308 matching lines...) Expand 10 before | Expand all | Expand 10 after
1113 if (pf_) 1187 if (pf_)
1114 *pf_ = false; 1188 *pf_ = false;
1115 } 1189 }
1116 1190
1117 private: 1191 private:
1118 bool *pf_; 1192 bool *pf_;
1119 }; 1193 };
1120 1194
1121 PhysicalSocketServer::PhysicalSocketServer() 1195 PhysicalSocketServer::PhysicalSocketServer()
1122 : fWait_(false) { 1196 : fWait_(false) {
1197 #if defined(WEBRTC_USE_EPOLL)
1198 // Since Linux 2.6.8, the size argument is ignored, but must be greater than
1199 // zero. Before that the size served as hint to the kernel for the amount of
1200 // space to initially allocate in internal data structures.
1201 epoll_fd_ = epoll_create(FD_SETSIZE);
1202 if (epoll_fd_ == -1) {
1203 // Not an error, will fall back to "select" below.
1204 LOG_E(LS_WARNING, EN, errno) << "epoll_create";
1205 epoll_fd_ = INVALID_SOCKET;
1206 }
1207 #endif
1123 signal_wakeup_ = new Signaler(this, &fWait_); 1208 signal_wakeup_ = new Signaler(this, &fWait_);
1124 #if defined(WEBRTC_WIN) 1209 #if defined(WEBRTC_WIN)
1125 socket_ev_ = WSACreateEvent(); 1210 socket_ev_ = WSACreateEvent();
1126 #endif 1211 #endif
1127 } 1212 }
1128 1213
1129 PhysicalSocketServer::~PhysicalSocketServer() { 1214 PhysicalSocketServer::~PhysicalSocketServer() {
1130 #if defined(WEBRTC_WIN) 1215 #if defined(WEBRTC_WIN)
1131 WSACloseEvent(socket_ev_); 1216 WSACloseEvent(socket_ev_);
1132 #endif 1217 #endif
1133 #if defined(WEBRTC_POSIX) 1218 #if defined(WEBRTC_POSIX)
1134 signal_dispatcher_.reset(); 1219 signal_dispatcher_.reset();
1135 #endif 1220 #endif
1136 delete signal_wakeup_; 1221 delete signal_wakeup_;
1222 #if defined(WEBRTC_USE_EPOLL)
1223 if (epoll_fd_ != INVALID_SOCKET) {
1224 close(epoll_fd_);
1225 }
1226 #endif
1137 RTC_DCHECK(dispatchers_.empty()); 1227 RTC_DCHECK(dispatchers_.empty());
1138 } 1228 }
1139 1229
1140 void PhysicalSocketServer::WakeUp() { 1230 void PhysicalSocketServer::WakeUp() {
1141 signal_wakeup_->Signal(); 1231 signal_wakeup_->Signal();
1142 } 1232 }
1143 1233
1144 Socket* PhysicalSocketServer::CreateSocket(int type) { 1234 Socket* PhysicalSocketServer::CreateSocket(int type) {
1145 return CreateSocket(AF_INET, type); 1235 return CreateSocket(AF_INET, type);
1146 } 1236 }
(...skipping 27 matching lines...) Expand all
1174 if (dispatcher->Initialize()) { 1264 if (dispatcher->Initialize()) {
1175 return dispatcher; 1265 return dispatcher;
1176 } else { 1266 } else {
1177 delete dispatcher; 1267 delete dispatcher;
1178 return nullptr; 1268 return nullptr;
1179 } 1269 }
1180 } 1270 }
1181 1271
1182 void PhysicalSocketServer::Add(Dispatcher *pdispatcher) { 1272 void PhysicalSocketServer::Add(Dispatcher *pdispatcher) {
1183 CritScope cs(&crit_); 1273 CritScope cs(&crit_);
1274 #if defined(WEBRTC_WIN)
1184 // Prevent duplicates. This can cause dead dispatchers to stick around. 1275 // Prevent duplicates. This can cause dead dispatchers to stick around.
1185 DispatcherList::iterator pos = std::find(dispatchers_.begin(), 1276 DispatcherList::iterator pos = std::find(dispatchers_.begin(),
1186 dispatchers_.end(), 1277 dispatchers_.end(),
1187 pdispatcher); 1278 pdispatcher);
1188 if (pos != dispatchers_.end()) 1279 if (pos != dispatchers_.end())
1189 return; 1280 return;
1190 dispatchers_.push_back(pdispatcher); 1281 dispatchers_.push_back(pdispatcher);
1282 #else
1283 dispatchers_.insert(pdispatcher);
1284 #endif // WEBRTC_WIN
1285 #if defined(WEBRTC_USE_EPOLL)
1286 if (epoll_fd_ != INVALID_SOCKET) {
1287 AddEpoll(pdispatcher);
1288 }
1289 #endif // WEBRTC_USE_EPOLL
1191 } 1290 }
1192 1291
1193 void PhysicalSocketServer::Remove(Dispatcher *pdispatcher) { 1292 void PhysicalSocketServer::Remove(Dispatcher *pdispatcher) {
1194 CritScope cs(&crit_); 1293 CritScope cs(&crit_);
1294 #if defined(WEBRTC_WIN)
1195 DispatcherList::iterator pos = std::find(dispatchers_.begin(), 1295 DispatcherList::iterator pos = std::find(dispatchers_.begin(),
1196 dispatchers_.end(), 1296 dispatchers_.end(),
1197 pdispatcher); 1297 pdispatcher);
1198 // We silently ignore duplicate calls to Add, so we should silently ignore 1298 // We silently ignore duplicate calls to Add, so we should silently ignore
1199 // the (expected) symmetric calls to Remove. Note that this may still hide 1299 // the (expected) symmetric calls to Remove. Note that this may still hide
1200 // a real issue, so we at least log a warning about it. 1300 // a real issue, so we at least log a warning about it.
1201 if (pos == dispatchers_.end()) { 1301 if (pos == dispatchers_.end()) {
1202 LOG(LS_WARNING) << "PhysicalSocketServer asked to remove a unknown " 1302 LOG(LS_WARNING) << "PhysicalSocketServer asked to remove a unknown "
1203 << "dispatcher, potentially from a duplicate call to Add."; 1303 << "dispatcher, potentially from a duplicate call to Add.";
1204 return; 1304 return;
1205 } 1305 }
1206 size_t index = pos - dispatchers_.begin(); 1306 size_t index = pos - dispatchers_.begin();
1207 dispatchers_.erase(pos); 1307 dispatchers_.erase(pos);
1208 for (IteratorList::iterator it = iterators_.begin(); it != iterators_.end(); 1308 for (IteratorList::iterator it = iterators_.begin(); it != iterators_.end();
1209 ++it) { 1309 ++it) {
1210 if (index < **it) { 1310 if (index < **it) {
1211 --**it; 1311 --**it;
1212 } 1312 }
1213 } 1313 }
1314 #else
1315 if (!dispatchers_.erase(pdispatcher)) {
1316 LOG(LS_WARNING) << "PhysicalSocketServer asked to remove a unknown "
1317 << "dispatcher, potentially from a duplicate call to Add.";
1318 return;
1319 }
1320 #endif // WEBRTC_WIN
1321 #if defined(WEBRTC_USE_EPOLL)
1322 if (epoll_fd_ != INVALID_SOCKET) {
1323 RemoveEpoll(pdispatcher);
1324 }
1325 #endif // WEBRTC_USE_EPOLL
1326 }
1327
1328 void PhysicalSocketServer::Update(Dispatcher *pdispatcher) {
1329 #if defined(WEBRTC_USE_EPOLL)
1330 if (epoll_fd_ == INVALID_SOCKET) {
1331 return;
1332 }
1333
1334 CritScope cs(&crit_);
1335 if (dispatchers_.find(pdispatcher) == dispatchers_.end()) {
1336 return;
1337 }
1338
1339 UpdateEpoll(pdispatcher);
1340 #endif
1214 } 1341 }
1215 1342
1216 #if defined(WEBRTC_POSIX) 1343 #if defined(WEBRTC_POSIX)
1344
1217 bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) { 1345 bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
1346 #if defined(WEBRTC_USE_EPOLL)
1347 // We don't keep a dedicated "epoll" descriptor containing only the non-IO
1348 // (i.e. signaling) dispatcher, so "poll" will be used instead of the default
1349 // "select" to support sockets larger than FD_SETSIZE.
1350 if (!process_io) {
1351 return WaitPoll(cmsWait, signal_wakeup_);
1352 } else if (epoll_fd_ != INVALID_SOCKET) {
1353 return WaitEpoll(cmsWait);
1354 }
1355 #endif
1356 return WaitSelect(cmsWait, process_io);
1357 }
1358
1359 static void ProcessEvents(Dispatcher* dispatcher, bool readable, bool writable,
1360 bool check_error) {
1361 int errcode = 0;
1362 // TODO(pthatcher): Should we set errcode if getsockopt fails?
1363 if (check_error) {
1364 socklen_t len = sizeof(errcode);
1365 ::getsockopt(dispatcher->GetDescriptor(), SOL_SOCKET, SO_ERROR, &errcode,
1366 &len);
1367 }
1368
1369 uint32_t ff = 0;
1370
1371 // Check readable descriptors. If we're waiting on an accept, signal
1372 // that. Otherwise we're waiting for data, check to see if we're
1373 // readable or really closed.
1374 // TODO(pthatcher): Only peek at TCP descriptors.
1375 if (readable) {
1376 if (dispatcher->GetRequestedEvents() & DE_ACCEPT) {
1377 ff |= DE_ACCEPT;
1378 } else if (errcode || dispatcher->IsDescriptorClosed()) {
1379 ff |= DE_CLOSE;
1380 } else {
1381 ff |= DE_READ;
1382 }
1383 }
1384
1385 // Check writable descriptors. If we're waiting on a connect, detect
1386 // success versus failure by the reaped error code.
1387 if (writable) {
1388 if (dispatcher->GetRequestedEvents() & DE_CONNECT) {
1389 if (!errcode) {
1390 ff |= DE_CONNECT;
1391 } else {
1392 ff |= DE_CLOSE;
1393 }
1394 } else {
1395 ff |= DE_WRITE;
1396 }
1397 }
1398
1399 // Tell the descriptor about the event.
1400 if (ff != 0) {
1401 dispatcher->OnPreEvent(ff);
1402 dispatcher->OnEvent(ff, errcode);
1403 }
1404 }
1405
1406 bool PhysicalSocketServer::WaitSelect(int cmsWait, bool process_io) {
1218 // Calculate timing information 1407 // Calculate timing information
1219 1408
1220 struct timeval* ptvWait = nullptr; 1409 struct timeval* ptvWait = nullptr;
1221 struct timeval tvWait; 1410 struct timeval tvWait;
1222 struct timeval tvStop; 1411 struct timeval tvStop;
1223 if (cmsWait != kForever) { 1412 if (cmsWait != kForever) {
1224 // Calculate wait timeval 1413 // Calculate wait timeval
1225 tvWait.tv_sec = cmsWait / 1000; 1414 tvWait.tv_sec = cmsWait / 1000;
1226 tvWait.tv_usec = (cmsWait % 1000) * 1000; 1415 tvWait.tv_usec = (cmsWait % 1000) * 1000;
1227 ptvWait = &tvWait; 1416 ptvWait = &tvWait;
(...skipping 22 matching lines...) Expand all
1250 __msan_unpoison(&fdsRead, sizeof(fdsRead)); 1439 __msan_unpoison(&fdsRead, sizeof(fdsRead));
1251 __msan_unpoison(&fdsWrite, sizeof(fdsWrite)); 1440 __msan_unpoison(&fdsWrite, sizeof(fdsWrite));
1252 #endif 1441 #endif
1253 1442
1254 fWait_ = true; 1443 fWait_ = true;
1255 1444
1256 while (fWait_) { 1445 while (fWait_) {
1257 int fdmax = -1; 1446 int fdmax = -1;
1258 { 1447 {
1259 CritScope cr(&crit_); 1448 CritScope cr(&crit_);
1260 for (size_t i = 0; i < dispatchers_.size(); ++i) { 1449 for (Dispatcher* pdispatcher : dispatchers_) {
1261 // Query dispatchers for read and write wait state 1450 // Query dispatchers for read and write wait state
1262 Dispatcher *pdispatcher = dispatchers_[i];
1263 RTC_DCHECK(pdispatcher); 1451 RTC_DCHECK(pdispatcher);
1264 if (!process_io && (pdispatcher != signal_wakeup_)) 1452 if (!process_io && (pdispatcher != signal_wakeup_))
1265 continue; 1453 continue;
1266 int fd = pdispatcher->GetDescriptor(); 1454 int fd = pdispatcher->GetDescriptor();
1455 // "select"ing a file descriptor that is equal to or larger than
1456 // FD_SETSIZE will result in undefined behavior.
1457 RTC_DCHECK_LT(fd, FD_SETSIZE);
1267 if (fd > fdmax) 1458 if (fd > fdmax)
1268 fdmax = fd; 1459 fdmax = fd;
1269 1460
1270 uint32_t ff = pdispatcher->GetRequestedEvents(); 1461 uint32_t ff = pdispatcher->GetRequestedEvents();
1271 if (ff & (DE_READ | DE_ACCEPT)) 1462 if (ff & (DE_READ | DE_ACCEPT))
1272 FD_SET(fd, &fdsRead); 1463 FD_SET(fd, &fdsRead);
1273 if (ff & (DE_WRITE | DE_CONNECT)) 1464 if (ff & (DE_WRITE | DE_CONNECT))
1274 FD_SET(fd, &fdsWrite); 1465 FD_SET(fd, &fdsWrite);
1275 } 1466 }
1276 } 1467 }
(...skipping 13 matching lines...) Expand all
1290 // Else ignore the error and keep going. If this EINTR was for one of the 1481 // Else ignore the error and keep going. If this EINTR was for one of the
1291 // signals managed by this PhysicalSocketServer, the 1482 // signals managed by this PhysicalSocketServer, the
1292 // PosixSignalDeliveryDispatcher will be in the signaled state in the next 1483 // PosixSignalDeliveryDispatcher will be in the signaled state in the next
1293 // iteration. 1484 // iteration.
1294 } else if (n == 0) { 1485 } else if (n == 0) {
1295 // If timeout, return success 1486 // If timeout, return success
1296 return true; 1487 return true;
1297 } else { 1488 } else {
1298 // We have signaled descriptors 1489 // We have signaled descriptors
1299 CritScope cr(&crit_); 1490 CritScope cr(&crit_);
1300 for (size_t i = 0; i < dispatchers_.size(); ++i) { 1491 for (Dispatcher *pdispatcher : dispatchers_) {
1301 Dispatcher *pdispatcher = dispatchers_[i];
1302 int fd = pdispatcher->GetDescriptor(); 1492 int fd = pdispatcher->GetDescriptor();
1303 uint32_t ff = 0;
1304 int errcode = 0;
1305 1493
1306 // Reap any error code, which can be signaled through reads or writes. 1494 bool readable = FD_ISSET(fd, &fdsRead);
1307 // TODO(pthatcher): Should we set errcode if getsockopt fails? 1495 if (readable) {
1308 if (FD_ISSET(fd, &fdsRead) || FD_ISSET(fd, &fdsWrite)) { 1496 FD_CLR(fd, &fdsRead);
1309 socklen_t len = sizeof(errcode);
1310 ::getsockopt(fd, SOL_SOCKET, SO_ERROR, &errcode, &len);
1311 } 1497 }
1312 1498
1313 // Check readable descriptors. If we're waiting on an accept, signal 1499 bool writable = FD_ISSET(fd, &fdsWrite);
1314 // that. Otherwise we're waiting for data, check to see if we're 1500 if (writable) {
1315 // readable or really closed. 1501 FD_CLR(fd, &fdsWrite);
1316 // TODO(pthatcher): Only peek at TCP descriptors.
1317 if (FD_ISSET(fd, &fdsRead)) {
1318 FD_CLR(fd, &fdsRead);
1319 if (pdispatcher->GetRequestedEvents() & DE_ACCEPT) {
1320 ff |= DE_ACCEPT;
1321 } else if (errcode || pdispatcher->IsDescriptorClosed()) {
1322 ff |= DE_CLOSE;
1323 } else {
1324 ff |= DE_READ;
1325 }
1326 } 1502 }
1327 1503
1328 // Check writable descriptors. If we're waiting on a connect, detect 1504 // The error code can be signaled through reads or writes.
1329 // success versus failure by the reaped error code. 1505 ProcessEvents(pdispatcher, readable, writable, readable || writable);
1330 if (FD_ISSET(fd, &fdsWrite)) {
1331 FD_CLR(fd, &fdsWrite);
1332 if (pdispatcher->GetRequestedEvents() & DE_CONNECT) {
1333 if (!errcode) {
1334 ff |= DE_CONNECT;
1335 } else {
1336 ff |= DE_CLOSE;
1337 }
1338 } else {
1339 ff |= DE_WRITE;
1340 }
1341 }
1342
1343 // Tell the descriptor about the event.
1344 if (ff != 0) {
1345 pdispatcher->OnPreEvent(ff);
1346 pdispatcher->OnEvent(ff, errcode);
1347 }
1348 } 1506 }
1349 } 1507 }
1350 1508
1351 // Recalc the time remaining to wait. Doing it here means it doesn't get 1509 // Recalc the time remaining to wait. Doing it here means it doesn't get
1352 // calced twice the first time through the loop 1510 // calced twice the first time through the loop
1353 if (ptvWait) { 1511 if (ptvWait) {
1354 ptvWait->tv_sec = 0; 1512 ptvWait->tv_sec = 0;
1355 ptvWait->tv_usec = 0; 1513 ptvWait->tv_usec = 0;
1356 struct timeval tvT; 1514 struct timeval tvT;
1357 gettimeofday(&tvT, nullptr); 1515 gettimeofday(&tvT, nullptr);
1358 if ((tvStop.tv_sec > tvT.tv_sec) 1516 if ((tvStop.tv_sec > tvT.tv_sec)
1359 || ((tvStop.tv_sec == tvT.tv_sec) 1517 || ((tvStop.tv_sec == tvT.tv_sec)
1360 && (tvStop.tv_usec > tvT.tv_usec))) { 1518 && (tvStop.tv_usec > tvT.tv_usec))) {
1361 ptvWait->tv_sec = tvStop.tv_sec - tvT.tv_sec; 1519 ptvWait->tv_sec = tvStop.tv_sec - tvT.tv_sec;
1362 ptvWait->tv_usec = tvStop.tv_usec - tvT.tv_usec; 1520 ptvWait->tv_usec = tvStop.tv_usec - tvT.tv_usec;
1363 if (ptvWait->tv_usec < 0) { 1521 if (ptvWait->tv_usec < 0) {
1364 RTC_DCHECK(ptvWait->tv_sec > 0); 1522 RTC_DCHECK(ptvWait->tv_sec > 0);
1365 ptvWait->tv_usec += 1000000; 1523 ptvWait->tv_usec += 1000000;
1366 ptvWait->tv_sec -= 1; 1524 ptvWait->tv_sec -= 1;
1367 } 1525 }
1368 } 1526 }
1369 } 1527 }
1370 } 1528 }
1371 1529
1372 return true; 1530 return true;
1373 } 1531 }
1374 1532
1533 #if defined(WEBRTC_USE_EPOLL)
1534
1535 // Initial number of events to process with one call to "epoll_wait".
1536 static const size_t kInitialEpollEvents = 128;
1537
1538 // Maximum number of events to process with one call to "epoll_wait".
1539 static const size_t kMaxEpollEvents = 8192;
1540
1541 void PhysicalSocketServer::AddEpoll(Dispatcher* pdispatcher) {
1542 RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
1543 int fd = pdispatcher->GetDescriptor();
1544 RTC_DCHECK(fd != INVALID_SOCKET);
1545 if (fd == INVALID_SOCKET) {
1546 return;
1547 }
1548
1549 struct epoll_event event = {0};
1550 event.events = GetEpollEvents(pdispatcher->GetRequestedEvents());
1551 event.data.ptr = pdispatcher;
1552 int err = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event);
1553 RTC_DCHECK_EQ(err, 0);
1554 if (err == -1) {
1555 LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_ADD";
1556 }
1557 }
1558
1559 void PhysicalSocketServer::RemoveEpoll(Dispatcher* pdispatcher) {
1560 RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
1561 int fd = pdispatcher->GetDescriptor();
1562 RTC_DCHECK(fd != INVALID_SOCKET);
1563 if (fd == INVALID_SOCKET) {
1564 return;
1565 }
1566
1567 struct epoll_event event = {0};
1568 int err = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, &event);
1569 RTC_DCHECK(err == 0 || errno == ENOENT);
1570 if (err == -1) {
1571 if (errno == ENOENT) {
1572 // Socket has already been closed.
1573 LOG_E(LS_VERBOSE, EN, errno) << "epoll_ctl EPOLL_CTL_DEL";
1574 } else {
1575 LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_DEL";
1576 }
1577 }
1578 }
1579
1580 void PhysicalSocketServer::UpdateEpoll(Dispatcher* pdispatcher) {
1581 RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
1582 int fd = pdispatcher->GetDescriptor();
1583 RTC_DCHECK(fd != INVALID_SOCKET);
1584 if (fd == INVALID_SOCKET) {
1585 return;
1586 }
1587
1588 struct epoll_event event = {0};
1589 event.events = GetEpollEvents(pdispatcher->GetRequestedEvents());
1590 event.data.ptr = pdispatcher;
1591 int err = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd, &event);
1592 RTC_DCHECK_EQ(err, 0);
1593 if (err == -1) {
1594 LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_MOD";
1595 }
1596 }
1597
1598 bool PhysicalSocketServer::WaitEpoll(int cmsWait) {
1599 RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
1600 int64_t tvWait = -1;
1601 int64_t tvStop = -1;
1602 if (cmsWait != kForever) {
1603 tvWait = cmsWait;
1604 tvStop = TimeAfter(cmsWait);
1605 }
1606
1607 if (epoll_events_.empty()) {
1608 // The initial space to receive events is created only if epoll is used.
1609 epoll_events_.resize(kInitialEpollEvents);
1610 }
1611
1612 fWait_ = true;
1613
1614 while (fWait_) {
1615 // Wait then call handlers as appropriate
1616 // < 0 means error
1617 // 0 means timeout
1618 // > 0 means count of descriptors ready
1619 int n = epoll_wait(epoll_fd_, &epoll_events_[0],
1620 static_cast<int>(epoll_events_.size()), static_cast<int>(tvWait));
1621 if (n < 0) {
1622 if (errno != EINTR) {
1623 LOG_E(LS_ERROR, EN, errno) << "epoll";
1624 return false;
1625 }
1626 // Else ignore the error and keep going. If this EINTR was for one of the
1627 // signals managed by this PhysicalSocketServer, the
1628 // PosixSignalDeliveryDispatcher will be in the signaled state in the next
1629 // iteration.
1630 } else if (n == 0) {
1631 // If timeout, return success
1632 return true;
1633 } else {
1634 // We have signaled descriptors
1635 CritScope cr(&crit_);
1636 for (int i = 0; i < n; ++i) {
1637 const epoll_event& event = epoll_events_[i];
1638 Dispatcher* pdispatcher = static_cast<Dispatcher*>(event.data.ptr);
1639 if (dispatchers_.find(pdispatcher) == dispatchers_.end()) {
1640 // The dispatcher for this socket no longer exists.
1641 continue;
1642 }
1643
1644 bool readable = (event.events & (EPOLLIN | EPOLLPRI));
1645 bool writable = (event.events & EPOLLOUT);
1646 bool check_error = (event.events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP));
1647
1648 ProcessEvents(pdispatcher, readable, writable, check_error);
1649 }
1650 }
1651
1652 if (static_cast<size_t>(n) == epoll_events_.size() &&
1653 epoll_events_.size() < kMaxEpollEvents) {
1654 // We used the complete space to receive events, increase size for future
1655 // iterations.
1656 epoll_events_.resize(std::max(epoll_events_.size() * 2, kMaxEpollEvents));
1657 }
1658
1659 if (cmsWait != kForever) {
1660 tvWait = TimeDiff(tvStop, TimeMillis());
1661 if (tvWait < 0) {
1662 // Return success on timeout.
1663 return true;
1664 }
1665 }
1666 }
1667
1668 return true;
1669 }
1670
1671 bool PhysicalSocketServer::WaitPoll(int cmsWait, Dispatcher* dispatcher) {
1672 RTC_DCHECK(dispatcher);
1673 int64_t tvWait = -1;
1674 int64_t tvStop = -1;
1675 if (cmsWait != kForever) {
1676 tvWait = cmsWait;
1677 tvStop = TimeAfter(cmsWait);
1678 }
1679
1680 fWait_ = true;
1681
1682 struct pollfd fds = {0};
1683 int fd = dispatcher->GetDescriptor();
1684 fds.fd = fd;
1685
1686 while (fWait_) {
1687 uint32_t ff = dispatcher->GetRequestedEvents();
1688 fds.events = 0;
1689 if (ff & (DE_READ | DE_ACCEPT)) {
1690 fds.events |= POLLIN;
1691 }
1692 if (ff & (DE_WRITE | DE_CONNECT)) {
1693 fds.events |= POLLOUT;
1694 }
1695 fds.revents = 0;
1696
1697 // Wait then call handlers as appropriate
1698 // < 0 means error
1699 // 0 means timeout
1700 // > 0 means count of descriptors ready
1701 int n = poll(&fds, 1, static_cast<int>(tvWait));
1702 if (n < 0) {
1703 if (errno != EINTR) {
1704 LOG_E(LS_ERROR, EN, errno) << "poll";
1705 return false;
1706 }
1707 // Else ignore the error and keep going. If this EINTR was for one of the
1708 // signals managed by this PhysicalSocketServer, the
1709 // PosixSignalDeliveryDispatcher will be in the signaled state in the next
1710 // iteration.
1711 } else if (n == 0) {
1712 // If timeout, return success
1713 return true;
1714 } else {
1715 // We have signaled descriptors (should only be the passed dispatcher).
1716 RTC_DCHECK_EQ(n, 1);
1717 RTC_DCHECK_EQ(fds.fd, fd);
1718
1719 bool readable = (fds.revents & (POLLIN | POLLPRI));
1720 bool writable = (fds.revents & POLLOUT);
1721 bool check_error = (fds.revents & (POLLRDHUP | POLLERR | POLLHUP));
1722
1723 ProcessEvents(dispatcher, readable, writable, check_error);
1724 }
1725
1726 if (cmsWait != kForever) {
1727 tvWait = TimeDiff(tvStop, TimeMillis());
1728 if (tvWait < 0) {
1729 // Return success on timeout.
1730 return true;
1731 }
1732 }
1733 }
1734
1735 return true;
1736 }
1737
1738 #endif // WEBRTC_USE_EPOLL
1739
1375 static void GlobalSignalHandler(int signum) { 1740 static void GlobalSignalHandler(int signum) {
1376 PosixSignalHandler::Instance()->OnPosixSignalReceived(signum); 1741 PosixSignalHandler::Instance()->OnPosixSignalReceived(signum);
1377 } 1742 }
1378 1743
1379 bool PhysicalSocketServer::SetPosixSignalHandler(int signum, 1744 bool PhysicalSocketServer::SetPosixSignalHandler(int signum,
1380 void (*handler)(int)) { 1745 void (*handler)(int)) {
1381 // If handler is SIG_IGN or SIG_DFL then clear our user-level handler, 1746 // If handler is SIG_IGN or SIG_DFL then clear our user-level handler,
1382 // otherwise set one. 1747 // otherwise set one.
1383 if (handler == SIG_IGN || handler == SIG_DFL) { 1748 if (handler == SIG_IGN || handler == SIG_DFL) {
1384 if (!InstallSignal(signum, handler)) { 1749 if (!InstallSignal(signum, handler)) {
(...skipping 201 matching lines...) Expand 10 before | Expand all | Expand 10 after
1586 break; 1951 break;
1587 } 1952 }
1588 } 1953 }
1589 1954
1590 // Done 1955 // Done
1591 return true; 1956 return true;
1592 } 1957 }
1593 #endif // WEBRTC_WIN 1958 #endif // WEBRTC_WIN
1594 1959
1595 } // namespace rtc 1960 } // namespace rtc
OLDNEW
« no previous file with comments | « webrtc/base/physicalsocketserver.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698