Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(29)

Side by Side Diff: webrtc/base/physicalsocketserver.cc

Issue 2880923002: Support epoll in PhysicalSocketServer. (Closed)
Patch Set: Feedback from Taylor. Created 3 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « webrtc/base/physicalsocketserver.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 /* 1 /*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 * 3 *
4 * Use of this source code is governed by a BSD-style license 4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source 5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found 6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may 7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree. 8 * be found in the AUTHORS file in the root of the source tree.
9 */ 9 */
10 #include "webrtc/base/physicalsocketserver.h" 10 #include "webrtc/base/physicalsocketserver.h"
(...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after
73 73
74 int64_t GetSocketRecvTimestamp(int socket) { 74 int64_t GetSocketRecvTimestamp(int socket) {
75 return -1; 75 return -1;
76 } 76 }
77 #endif 77 #endif
78 78
79 #if defined(WEBRTC_WIN) 79 #if defined(WEBRTC_WIN)
80 typedef char* SockOptArg; 80 typedef char* SockOptArg;
81 #endif 81 #endif
82 82
83 #if defined(WEBRTC_USE_EPOLL)
84 // EPOLLRDHUP is only defined starting with Linux 2.6.17.
85 #if !defined(EPOLLRDHUP)
86 #define EPOLLRDHUP 0x2000
87 #endif
88 #endif
89
83 namespace rtc { 90 namespace rtc {
84 91
85 std::unique_ptr<SocketServer> SocketServer::CreateDefault() { 92 std::unique_ptr<SocketServer> SocketServer::CreateDefault() {
86 #if defined(__native_client__) 93 #if defined(__native_client__)
87 return std::unique_ptr<SocketServer>(new rtc::NullSocketServer); 94 return std::unique_ptr<SocketServer>(new rtc::NullSocketServer);
88 #else 95 #else
89 return std::unique_ptr<SocketServer>(new rtc::PhysicalSocketServer); 96 return std::unique_ptr<SocketServer>(new rtc::PhysicalSocketServer);
90 #endif 97 #endif
91 } 98 }
92 99
(...skipping 665 matching lines...) Expand 10 before | Expand all | Expand 10 after
758 } 765 }
759 if (((ff & DE_CLOSE) != 0) && (id_ == cache_id)) { 766 if (((ff & DE_CLOSE) != 0) && (id_ == cache_id)) {
760 signal_close_ = true; 767 signal_close_ = true;
761 signal_err_ = err; 768 signal_err_ = err;
762 } 769 }
763 } 770 }
764 771
765 #elif defined(WEBRTC_POSIX) 772 #elif defined(WEBRTC_POSIX)
766 773
767 void SocketDispatcher::OnEvent(uint32_t ff, int err) { 774 void SocketDispatcher::OnEvent(uint32_t ff, int err) {
775 #if defined(WEBRTC_USE_EPOLL)
776 // Remember currently enabled events so we can combine multiple changes
777 // into one update call later.
778 SaveEnabledEvents();
779 #endif
768 // Make sure we deliver connect/accept first. Otherwise, consumers may see 780 // Make sure we deliver connect/accept first. Otherwise, consumers may see
769 // something like a READ followed by a CONNECT, which would be odd. 781 // something like a READ followed by a CONNECT, which would be odd.
770 if ((ff & DE_CONNECT) != 0) { 782 if ((ff & DE_CONNECT) != 0) {
771 DisableEvents(DE_CONNECT); 783 DisableEvents(DE_CONNECT);
772 SignalConnectEvent(this); 784 SignalConnectEvent(this);
773 } 785 }
774 if ((ff & DE_ACCEPT) != 0) { 786 if ((ff & DE_ACCEPT) != 0) {
775 DisableEvents(DE_ACCEPT); 787 DisableEvents(DE_ACCEPT);
776 SignalReadEvent(this); 788 SignalReadEvent(this);
777 } 789 }
778 if ((ff & DE_READ) != 0) { 790 if ((ff & DE_READ) != 0) {
779 DisableEvents(DE_READ); 791 DisableEvents(DE_READ);
780 SignalReadEvent(this); 792 SignalReadEvent(this);
781 } 793 }
782 if ((ff & DE_WRITE) != 0) { 794 if ((ff & DE_WRITE) != 0) {
783 DisableEvents(DE_WRITE); 795 DisableEvents(DE_WRITE);
784 SignalWriteEvent(this); 796 SignalWriteEvent(this);
785 } 797 }
786 if ((ff & DE_CLOSE) != 0) { 798 if ((ff & DE_CLOSE) != 0) {
787 // The socket is now dead to us, so stop checking it. 799 // The socket is now dead to us, so stop checking it.
788 SetEnabledEvents(0); 800 SetEnabledEvents(0);
789 SignalCloseEvent(this, err); 801 SignalCloseEvent(this, err);
790 } 802 }
803 #if defined(WEBRTC_USE_EPOLL)
804 RestoreEnabledEvents();
Taylor Brandstetter 2017/05/17 23:19:02 This name isn't very intuitive; it sounds like it
joachim 2017/05/18 12:41:49 Keeping a bitmask of events to disable won't work,
Taylor Brandstetter 2017/05/18 21:59:56 Ah, that's what I was missing. Can you mention it
joachim 2017/05/19 20:46:23 Done.
805 #endif
791 } 806 }
792 807
793 #endif // WEBRTC_POSIX 808 #endif // WEBRTC_POSIX
794 809
810 #if defined(WEBRTC_USE_EPOLL)
811
812 void SocketDispatcher::SaveEnabledEvents() {
813 RTC_DCHECK_EQ(saved_enabled_events_, -1);
814 saved_enabled_events_ = enabled_events();
815 }
816
817 void SocketDispatcher::RestoreEnabledEvents() {
818 RTC_DCHECK_NE(saved_enabled_events_, -1);
819 uint8_t old_events = static_cast<uint8_t>(saved_enabled_events_);
820 saved_enabled_events_ = -1;
821 MaybeUpdateDispatcher(old_events);
822 }
823
824 void SocketDispatcher::MaybeUpdateDispatcher(uint8_t old_events) {
825 if (enabled_events() != old_events && saved_enabled_events_ == -1) {
Taylor Brandstetter 2017/05/17 23:19:02 Since DE_READ and DE_ACCEPT both map to EPOLLIN: s
joachim 2017/05/18 12:41:49 Right, thanks for spotting.
826 ss_->Update(this);
827 }
828 }
829
830 void SocketDispatcher::SetEnabledEvents(uint8_t events) {
831 uint8_t old_events = enabled_events();
832 PhysicalSocket::SetEnabledEvents(events);
833 MaybeUpdateDispatcher(old_events);
834 }
835
836 void SocketDispatcher::EnableEvents(uint8_t events) {
837 uint8_t old_events = enabled_events();
838 PhysicalSocket::EnableEvents(events);
839 MaybeUpdateDispatcher(old_events);
840 }
841
842 void SocketDispatcher::DisableEvents(uint8_t events) {
843 uint8_t old_events = enabled_events();
844 PhysicalSocket::DisableEvents(events);
845 MaybeUpdateDispatcher(old_events);
846 }
847
848 #endif // WEBRTC_USE_EPOLL
849
795 int SocketDispatcher::Close() { 850 int SocketDispatcher::Close() {
796 if (s_ == INVALID_SOCKET) 851 if (s_ == INVALID_SOCKET)
797 return 0; 852 return 0;
798 853
799 #if defined(WEBRTC_WIN) 854 #if defined(WEBRTC_WIN)
800 id_ = 0; 855 id_ = 0;
801 signal_close_ = false; 856 signal_close_ = false;
802 #endif 857 #endif
803 ss_->Remove(this); 858 ss_->Remove(this);
804 return PhysicalSocket::Close(); 859 return PhysicalSocket::Close();
(...skipping 308 matching lines...) Expand 10 before | Expand all | Expand 10 after
1113 if (pf_) 1168 if (pf_)
1114 *pf_ = false; 1169 *pf_ = false;
1115 } 1170 }
1116 1171
1117 private: 1172 private:
1118 bool *pf_; 1173 bool *pf_;
1119 }; 1174 };
1120 1175
1121 PhysicalSocketServer::PhysicalSocketServer() 1176 PhysicalSocketServer::PhysicalSocketServer()
1122 : fWait_(false) { 1177 : fWait_(false) {
1178 #if defined(WEBRTC_USE_EPOLL)
1179 // Since Linux 2.6.8, the size argument is ignored, but must be greater than
1180 // zero. Before that the size served as hint to the kernel for the amount of
1181 // space to initially allocate in internal data structures.
1182 epoll_fd_ = epoll_create(FD_SETSIZE);
1183 if (epoll_fd_ == -1) {
1184 // Not an error, will fall back to "select" below.
1185 LOG_E(LS_WARNING, EN, errno) << "epoll_create";
1186 epoll_fd_ = INVALID_SOCKET;
1187 }
1188 #endif
1123 signal_wakeup_ = new Signaler(this, &fWait_); 1189 signal_wakeup_ = new Signaler(this, &fWait_);
1124 #if defined(WEBRTC_WIN) 1190 #if defined(WEBRTC_WIN)
1125 socket_ev_ = WSACreateEvent(); 1191 socket_ev_ = WSACreateEvent();
1126 #endif 1192 #endif
1127 } 1193 }
1128 1194
1129 PhysicalSocketServer::~PhysicalSocketServer() { 1195 PhysicalSocketServer::~PhysicalSocketServer() {
1130 #if defined(WEBRTC_WIN) 1196 #if defined(WEBRTC_WIN)
1131 WSACloseEvent(socket_ev_); 1197 WSACloseEvent(socket_ev_);
1132 #endif 1198 #endif
1133 #if defined(WEBRTC_POSIX) 1199 #if defined(WEBRTC_POSIX)
1134 signal_dispatcher_.reset(); 1200 signal_dispatcher_.reset();
1135 #endif 1201 #endif
1136 delete signal_wakeup_; 1202 delete signal_wakeup_;
1203 #if defined(WEBRTC_USE_EPOLL)
1204 if (epoll_fd_ != INVALID_SOCKET) {
1205 close(epoll_fd_);
1206 }
1207 #endif
1137 RTC_DCHECK(dispatchers_.empty()); 1208 RTC_DCHECK(dispatchers_.empty());
1138 } 1209 }
1139 1210
1140 void PhysicalSocketServer::WakeUp() { 1211 void PhysicalSocketServer::WakeUp() {
1141 signal_wakeup_->Signal(); 1212 signal_wakeup_->Signal();
1142 } 1213 }
1143 1214
1144 Socket* PhysicalSocketServer::CreateSocket(int type) { 1215 Socket* PhysicalSocketServer::CreateSocket(int type) {
1145 return CreateSocket(AF_INET, type); 1216 return CreateSocket(AF_INET, type);
1146 } 1217 }
(...skipping 27 matching lines...) Expand all
1174 if (dispatcher->Initialize()) { 1245 if (dispatcher->Initialize()) {
1175 return dispatcher; 1246 return dispatcher;
1176 } else { 1247 } else {
1177 delete dispatcher; 1248 delete dispatcher;
1178 return nullptr; 1249 return nullptr;
1179 } 1250 }
1180 } 1251 }
1181 1252
1182 void PhysicalSocketServer::Add(Dispatcher *pdispatcher) { 1253 void PhysicalSocketServer::Add(Dispatcher *pdispatcher) {
1183 CritScope cs(&crit_); 1254 CritScope cs(&crit_);
1255 #if defined(WEBRTC_WIN)
1184 // Prevent duplicates. This can cause dead dispatchers to stick around. 1256 // Prevent duplicates. This can cause dead dispatchers to stick around.
1185 DispatcherList::iterator pos = std::find(dispatchers_.begin(), 1257 DispatcherList::iterator pos = std::find(dispatchers_.begin(),
1186 dispatchers_.end(), 1258 dispatchers_.end(),
1187 pdispatcher); 1259 pdispatcher);
1188 if (pos != dispatchers_.end()) 1260 if (pos != dispatchers_.end())
1189 return; 1261 return;
1190 dispatchers_.push_back(pdispatcher); 1262 dispatchers_.push_back(pdispatcher);
1263 #else
1264 dispatchers_.insert(pdispatcher);
1265 #endif // WEBRTC_WIN
1266 #if defined(WEBRTC_USE_EPOLL)
1267 if (epoll_fd_ != INVALID_SOCKET) {
1268 AddEpoll(pdispatcher);
1269 }
1270 #endif // WEBRTC_USE_EPOLL
1191 } 1271 }
1192 1272
1193 void PhysicalSocketServer::Remove(Dispatcher *pdispatcher) { 1273 void PhysicalSocketServer::Remove(Dispatcher *pdispatcher) {
1194 CritScope cs(&crit_); 1274 CritScope cs(&crit_);
1275 #if defined(WEBRTC_WIN)
1195 DispatcherList::iterator pos = std::find(dispatchers_.begin(), 1276 DispatcherList::iterator pos = std::find(dispatchers_.begin(),
1196 dispatchers_.end(), 1277 dispatchers_.end(),
1197 pdispatcher); 1278 pdispatcher);
1198 // We silently ignore duplicate calls to Add, so we should silently ignore 1279 // We silently ignore duplicate calls to Add, so we should silently ignore
1199 // the (expected) symmetric calls to Remove. Note that this may still hide 1280 // the (expected) symmetric calls to Remove. Note that this may still hide
1200 // a real issue, so we at least log a warning about it. 1281 // a real issue, so we at least log a warning about it.
1201 if (pos == dispatchers_.end()) { 1282 if (pos == dispatchers_.end()) {
1202 LOG(LS_WARNING) << "PhysicalSocketServer asked to remove a unknown " 1283 LOG(LS_WARNING) << "PhysicalSocketServer asked to remove a unknown "
1203 << "dispatcher, potentially from a duplicate call to Add."; 1284 << "dispatcher, potentially from a duplicate call to Add.";
1204 return; 1285 return;
1205 } 1286 }
1206 size_t index = pos - dispatchers_.begin(); 1287 size_t index = pos - dispatchers_.begin();
1207 dispatchers_.erase(pos); 1288 dispatchers_.erase(pos);
1208 for (IteratorList::iterator it = iterators_.begin(); it != iterators_.end(); 1289 for (IteratorList::iterator it = iterators_.begin(); it != iterators_.end();
1209 ++it) { 1290 ++it) {
1210 if (index < **it) { 1291 if (index < **it) {
1211 --**it; 1292 --**it;
1212 } 1293 }
1213 } 1294 }
1295 #else
1296 if (!dispatchers_.erase(pdispatcher)) {
1297 LOG(LS_WARNING) << "PhysicalSocketServer asked to remove a unknown "
1298 << "dispatcher, potentially from a duplicate call to Add.";
1299 return;
1300 }
1301 #endif // WEBRTC_WIN
1302 #if defined(WEBRTC_USE_EPOLL)
1303 if (epoll_fd_ != INVALID_SOCKET) {
1304 RemoveEpoll(pdispatcher);
1305 }
1306 #endif // WEBRTC_USE_EPOLL
1307 }
1308
1309 void PhysicalSocketServer::Update(Dispatcher *pdispatcher) {
1310 #if defined(WEBRTC_USE_EPOLL)
1311 if (epoll_fd_ == INVALID_SOCKET) {
1312 return;
1313 }
1314
1315 CritScope cs(&crit_);
1316 if (dispatchers_.find(pdispatcher) == dispatchers_.end()) {
1317 return;
1318 }
1319
1320 UpdateEpoll(pdispatcher);
1321 #endif
1214 } 1322 }
1215 1323
1216 #if defined(WEBRTC_POSIX) 1324 #if defined(WEBRTC_POSIX)
1325
1217 bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) { 1326 bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
1327 #if defined(WEBRTC_USE_EPOLL)
1328 // We don't keep a dedicated epoll descriptor containing only non-IO (i.e.
1329 // signaling) dispatchers, so the default "select" loop will be used in that
1330 // case.
1331 if (epoll_fd_ != INVALID_SOCKET && process_io) {
1332 return WaitEpoll(cmsWait);
1333 }
1334 #endif
1335 return WaitSelect(cmsWait, process_io);
1336 }
zhaoyanfeng 2017/05/18 07:46:17 If process_io == false, still use select? That see
joachim 2017/05/18 12:41:49 The wakeup dispatcher is created early during cons
zhaoyanfeng 2017/05/18 14:08:37 I have tested it in our server but failed, our ser
joachim 2017/05/18 19:31:53 I changed it now to use "poll" to wait for the sig
1337
1338 bool PhysicalSocketServer::WaitSelect(int cmsWait, bool process_io) {
1218 // Calculate timing information 1339 // Calculate timing information
1219 1340
1220 struct timeval* ptvWait = nullptr; 1341 struct timeval* ptvWait = nullptr;
1221 struct timeval tvWait; 1342 struct timeval tvWait;
1222 struct timeval tvStop; 1343 struct timeval tvStop;
1223 if (cmsWait != kForever) { 1344 if (cmsWait != kForever) {
1224 // Calculate wait timeval 1345 // Calculate wait timeval
1225 tvWait.tv_sec = cmsWait / 1000; 1346 tvWait.tv_sec = cmsWait / 1000;
1226 tvWait.tv_usec = (cmsWait % 1000) * 1000; 1347 tvWait.tv_usec = (cmsWait % 1000) * 1000;
1227 ptvWait = &tvWait; 1348 ptvWait = &tvWait;
(...skipping 22 matching lines...) Expand all
1250 __msan_unpoison(&fdsRead, sizeof(fdsRead)); 1371 __msan_unpoison(&fdsRead, sizeof(fdsRead));
1251 __msan_unpoison(&fdsWrite, sizeof(fdsWrite)); 1372 __msan_unpoison(&fdsWrite, sizeof(fdsWrite));
1252 #endif 1373 #endif
1253 1374
1254 fWait_ = true; 1375 fWait_ = true;
1255 1376
1256 while (fWait_) { 1377 while (fWait_) {
1257 int fdmax = -1; 1378 int fdmax = -1;
1258 { 1379 {
1259 CritScope cr(&crit_); 1380 CritScope cr(&crit_);
1260 for (size_t i = 0; i < dispatchers_.size(); ++i) { 1381 for (Dispatcher* pdispatcher : dispatchers_) {
1261 // Query dispatchers for read and write wait state 1382 // Query dispatchers for read and write wait state
1262 Dispatcher *pdispatcher = dispatchers_[i];
1263 RTC_DCHECK(pdispatcher); 1383 RTC_DCHECK(pdispatcher);
1264 if (!process_io && (pdispatcher != signal_wakeup_)) 1384 if (!process_io && (pdispatcher != signal_wakeup_))
1265 continue; 1385 continue;
1266 int fd = pdispatcher->GetDescriptor(); 1386 int fd = pdispatcher->GetDescriptor();
1387 // "select"ing a file descriptor that is equal to or larger than
1388 // FD_SETSIZE will result in undefined behavior.
1389 RTC_DCHECK_LT(fd, FD_SETSIZE);
1267 if (fd > fdmax) 1390 if (fd > fdmax)
1268 fdmax = fd; 1391 fdmax = fd;
1269 1392
1270 uint32_t ff = pdispatcher->GetRequestedEvents(); 1393 uint32_t ff = pdispatcher->GetRequestedEvents();
1271 if (ff & (DE_READ | DE_ACCEPT)) 1394 if (ff & (DE_READ | DE_ACCEPT))
1272 FD_SET(fd, &fdsRead); 1395 FD_SET(fd, &fdsRead);
1273 if (ff & (DE_WRITE | DE_CONNECT)) 1396 if (ff & (DE_WRITE | DE_CONNECT))
1274 FD_SET(fd, &fdsWrite); 1397 FD_SET(fd, &fdsWrite);
1275 } 1398 }
1276 } 1399 }
(...skipping 13 matching lines...) Expand all
1290 // Else ignore the error and keep going. If this EINTR was for one of the 1413 // Else ignore the error and keep going. If this EINTR was for one of the
1291 // signals managed by this PhysicalSocketServer, the 1414 // signals managed by this PhysicalSocketServer, the
1292 // PosixSignalDeliveryDispatcher will be in the signaled state in the next 1415 // PosixSignalDeliveryDispatcher will be in the signaled state in the next
1293 // iteration. 1416 // iteration.
1294 } else if (n == 0) { 1417 } else if (n == 0) {
1295 // If timeout, return success 1418 // If timeout, return success
1296 return true; 1419 return true;
1297 } else { 1420 } else {
1298 // We have signaled descriptors 1421 // We have signaled descriptors
1299 CritScope cr(&crit_); 1422 CritScope cr(&crit_);
1300 for (size_t i = 0; i < dispatchers_.size(); ++i) { 1423 for (Dispatcher *pdispatcher : dispatchers_) {
1301 Dispatcher *pdispatcher = dispatchers_[i];
1302 int fd = pdispatcher->GetDescriptor(); 1424 int fd = pdispatcher->GetDescriptor();
1303 uint32_t ff = 0; 1425 uint32_t ff = 0;
1304 int errcode = 0; 1426 int errcode = 0;
1305 1427
1306 // Reap any error code, which can be signaled through reads or writes. 1428 // Reap any error code, which can be signaled through reads or writes.
1307 // TODO(pthatcher): Should we set errcode if getsockopt fails? 1429 // TODO(pthatcher): Should we set errcode if getsockopt fails?
1308 if (FD_ISSET(fd, &fdsRead) || FD_ISSET(fd, &fdsWrite)) { 1430 if (FD_ISSET(fd, &fdsRead) || FD_ISSET(fd, &fdsWrite)) {
1309 socklen_t len = sizeof(errcode); 1431 socklen_t len = sizeof(errcode);
1310 ::getsockopt(fd, SOL_SOCKET, SO_ERROR, &errcode, &len); 1432 ::getsockopt(fd, SOL_SOCKET, SO_ERROR, &errcode, &len);
1311 } 1433 }
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after
1365 ptvWait->tv_usec += 1000000; 1487 ptvWait->tv_usec += 1000000;
1366 ptvWait->tv_sec -= 1; 1488 ptvWait->tv_sec -= 1;
1367 } 1489 }
1368 } 1490 }
1369 } 1491 }
1370 } 1492 }
1371 1493
1372 return true; 1494 return true;
1373 } 1495 }
1374 1496
1497 #if defined(WEBRTC_USE_EPOLL)
1498
1499 // Initial number of events to process with one call to "epoll_wait".
1500 static const size_t kInitialEpollEvents = 128;
1501
1502 // Maximum number of events to process with one call to "epoll_wait".
1503 static const size_t kMaxEpollEvents = 8192;
1504
1505 static int GetEpollEvents(Dispatcher* pdispatcher) {
1506 int events = 0;
1507 uint32_t ff = pdispatcher->GetRequestedEvents();
1508 if (ff & (DE_READ | DE_ACCEPT)) {
1509 events |= EPOLLIN;
1510 }
1511 if (ff & (DE_WRITE | DE_CONNECT)) {
1512 events |= EPOLLOUT;
1513 }
1514 return events;
1515 }
1516
1517 void PhysicalSocketServer::AddEpoll(Dispatcher* pdispatcher) {
1518 RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
1519 int fd = pdispatcher->GetDescriptor();
1520 RTC_DCHECK(fd != INVALID_SOCKET);
1521 if (fd == INVALID_SOCKET) {
1522 return;
1523 }
1524
1525 struct epoll_event event = {0};
1526 event.events = GetEpollEvents(pdispatcher);
1527 event.data.ptr = pdispatcher;
1528 int err = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event);
1529 RTC_DCHECK_EQ(err, 0);
1530 if (err == -1) {
1531 LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_ADD";
1532 }
1533 }
1534
1535 void PhysicalSocketServer::RemoveEpoll(Dispatcher* pdispatcher) {
1536 RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
1537 int fd = pdispatcher->GetDescriptor();
1538 RTC_DCHECK(fd != INVALID_SOCKET);
1539 if (fd == INVALID_SOCKET) {
1540 return;
1541 }
1542
1543 struct epoll_event event = {0};
1544 int err = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, &event);
1545 RTC_DCHECK(err == 0 || errno == ENOENT);
1546 if (err == -1) {
1547 if (errno == ENOENT) {
1548 // Socket has already been closed.
1549 LOG_E(LS_VERBOSE, EN, errno) << "epoll_ctl EPOLL_CTL_DEL";
1550 } else {
1551 LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_DEL";
1552 }
1553 }
1554 }
1555
1556 void PhysicalSocketServer::UpdateEpoll(Dispatcher* pdispatcher) {
1557 RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
1558 int fd = pdispatcher->GetDescriptor();
1559 RTC_DCHECK(fd != INVALID_SOCKET);
1560 if (fd == INVALID_SOCKET) {
1561 return;
1562 }
1563
1564 struct epoll_event event = {0};
1565 event.events = GetEpollEvents(pdispatcher);
1566 event.data.ptr = pdispatcher;
1567 int err = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd, &event);
1568 RTC_DCHECK_EQ(err, 0);
1569 if (err == -1) {
1570 LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_MOD";
1571 }
1572 }
1573
1574 bool PhysicalSocketServer::WaitEpoll(int cmsWait) {
1575 RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
1576 int64_t tvWait = -1;
1577 int64_t tvStop = -1;
1578 if (cmsWait != kForever) {
1579 tvWait = cmsWait;
1580 tvStop = TimeAfter(cmsWait);
1581 }
1582
1583 if (epoll_events_.empty()) {
1584 // The initial space to receive events is created only if epoll is used.
1585 epoll_events_.resize(kInitialEpollEvents);
1586 }
1587
1588 fWait_ = true;
1589
1590 while (fWait_) {
1591 // Wait then call handlers as appropriate
1592 // < 0 means error
1593 // 0 means timeout
1594 // > 0 means count of descriptors ready
1595 int n = epoll_wait(epoll_fd_, &epoll_events_[0],
1596 static_cast<int>(epoll_events_.size()), static_cast<int>(tvWait));
1597 if (n < 0) {
1598 if (errno != EINTR) {
1599 LOG_E(LS_ERROR, EN, errno) << "epoll";
1600 return false;
1601 }
1602 // Else ignore the error and keep going. If this EINTR was for one of the
1603 // signals managed by this PhysicalSocketServer, the
1604 // PosixSignalDeliveryDispatcher will be in the signaled state in the next
1605 // iteration.
1606 } else if (n == 0) {
1607 // If timeout, return success
1608 return true;
1609 } else {
1610 // We have signaled descriptors
1611 CritScope cr(&crit_);
1612 for (int i = 0; i < n; ++i) {
1613 const epoll_event& event = epoll_events_[i];
1614 Dispatcher* pdispatcher = static_cast<Dispatcher*>(event.data.ptr);
1615 if (dispatchers_.find(pdispatcher) == dispatchers_.end()) {
1616 // The dispatcher for this socket no longer exists.
1617 continue;
1618 }
1619
1620 int fd = pdispatcher->GetDescriptor();
1621 uint32_t ff = 0;
1622 int errcode = 0;
1623
1624 // Reap any error code.
1625 // TODO(pthatcher): Should we set errcode if getsockopt fails?
1626 if (event.events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP)) {
1627 socklen_t len = sizeof(errcode);
1628 ::getsockopt(fd, SOL_SOCKET, SO_ERROR, &errcode, &len);
1629 }
1630
1631 // Check readable descriptors. If we're waiting on an accept, signal
1632 // that. Otherwise we're waiting for data, check to see if we're
1633 // readable or really closed.
1634 // TODO(pthatcher): Only peek at TCP descriptors.
1635 if ((event.events & (EPOLLIN | EPOLLPRI))) {
1636 if (pdispatcher->GetRequestedEvents() & DE_ACCEPT) {
1637 ff |= DE_ACCEPT;
1638 } else if (errcode || pdispatcher->IsDescriptorClosed()) {
1639 ff |= DE_CLOSE;
1640 } else {
1641 ff |= DE_READ;
1642 }
1643 }
1644
1645 // Check writable descriptors. If we're waiting on a connect, detect
1646 // success versus failure by the reaped error code.
1647 if ((event.events & EPOLLOUT)) {
1648 if (pdispatcher->GetRequestedEvents() & DE_CONNECT) {
1649 if (!errcode) {
1650 ff |= DE_CONNECT;
1651 } else {
1652 ff |= DE_CLOSE;
1653 }
1654 } else {
1655 ff |= DE_WRITE;
1656 }
1657 }
1658
1659 // Tell the descriptor about the event.
1660 if (ff != 0) {
1661 pdispatcher->OnPreEvent(ff);
1662 pdispatcher->OnEvent(ff, errcode);
1663 }
1664 }
1665 }
1666
1667 if (static_cast<size_t>(n) == epoll_events_.size() &&
1668 epoll_events_.size() < kMaxEpollEvents) {
1669 // We used the complete space to receive events, increase size for future
1670 // iterations.
1671 epoll_events_.resize(std::max(epoll_events_.size() * 2, kMaxEpollEvents));
1672 }
1673
1674 if (cmsWait != kForever) {
1675 tvWait = TimeDiff(tvStop, TimeMillis());
1676 if (tvWait < 0) {
1677 // Return success on timeout.
1678 return true;
1679 }
1680 }
1681 }
1682
1683 return true;
1684 }
1685
1686 #endif // WEBRTC_USE_EPOLL
1687
1375 static void GlobalSignalHandler(int signum) { 1688 static void GlobalSignalHandler(int signum) {
1376 PosixSignalHandler::Instance()->OnPosixSignalReceived(signum); 1689 PosixSignalHandler::Instance()->OnPosixSignalReceived(signum);
1377 } 1690 }
1378 1691
1379 bool PhysicalSocketServer::SetPosixSignalHandler(int signum, 1692 bool PhysicalSocketServer::SetPosixSignalHandler(int signum,
1380 void (*handler)(int)) { 1693 void (*handler)(int)) {
1381 // If handler is SIG_IGN or SIG_DFL then clear our user-level handler, 1694 // If handler is SIG_IGN or SIG_DFL then clear our user-level handler,
1382 // otherwise set one. 1695 // otherwise set one.
1383 if (handler == SIG_IGN || handler == SIG_DFL) { 1696 if (handler == SIG_IGN || handler == SIG_DFL) {
1384 if (!InstallSignal(signum, handler)) { 1697 if (!InstallSignal(signum, handler)) {
(...skipping 201 matching lines...) Expand 10 before | Expand all | Expand 10 after
1586 break; 1899 break;
1587 } 1900 }
1588 } 1901 }
1589 1902
1590 // Done 1903 // Done
1591 return true; 1904 return true;
1592 } 1905 }
1593 #endif // WEBRTC_WIN 1906 #endif // WEBRTC_WIN
1594 1907
1595 } // namespace rtc 1908 } // namespace rtc
OLDNEW
« no previous file with comments | « webrtc/base/physicalsocketserver.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698