Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(394)

Side by Side Diff: webrtc/base/socket_unittest.cc

Issue 1616153007: Stay writable after partial socket writes. (Closed) Base URL: https://chromium.googlesource.com/external/webrtc.git@master
Patch Set: More feedback. Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « webrtc/base/socket_unittest.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 /* 1 /*
2 * Copyright 2007 The WebRTC Project Authors. All rights reserved. 2 * Copyright 2007 The WebRTC Project Authors. All rights reserved.
3 * 3 *
4 * Use of this source code is governed by a BSD-style license 4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source 5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found 6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may 7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree. 8 * be found in the AUTHORS file in the root of the source tree.
9 */ 9 */
10 10
11 #include "webrtc/base/socket_unittest.h" 11 #include "webrtc/base/socket_unittest.h"
12 12
13 #include "webrtc/base/arraysize.h" 13 #include "webrtc/base/arraysize.h"
14 #include "webrtc/base/buffer.h"
14 #include "webrtc/base/asyncudpsocket.h" 15 #include "webrtc/base/asyncudpsocket.h"
15 #include "webrtc/base/gunit.h" 16 #include "webrtc/base/gunit.h"
16 #include "webrtc/base/nethelpers.h" 17 #include "webrtc/base/nethelpers.h"
17 #include "webrtc/base/socketserver.h" 18 #include "webrtc/base/socketserver.h"
18 #include "webrtc/base/testclient.h" 19 #include "webrtc/base/testclient.h"
19 #include "webrtc/base/testutils.h" 20 #include "webrtc/base/testutils.h"
20 #include "webrtc/base/thread.h" 21 #include "webrtc/base/thread.h"
21 22
22 namespace rtc { 23 namespace rtc {
23 24
25 // Data size to be used in TcpInternal tests.
26 static const size_t kTcpInternalDataSize = 1024 * 1024; // bytes
27
24 #define MAYBE_SKIP_IPV6 \ 28 #define MAYBE_SKIP_IPV6 \
25 if (!HasIPv6Enabled()) { \ 29 if (!HasIPv6Enabled()) { \
26 LOG(LS_INFO) << "No IPv6... skipping"; \ 30 LOG(LS_INFO) << "No IPv6... skipping"; \
27 return; \ 31 return; \
28 } 32 }
29 33
30 34
31 void SocketTest::TestConnectIPv4() { 35 void SocketTest::TestConnectIPv4() {
32 ConnectInternal(kIPv4Loopback); 36 ConnectInternal(kIPv4Loopback);
33 } 37 }
(...skipping 88 matching lines...) Expand 10 before | Expand all | Expand 10 after
122 void SocketTest::TestSocketServerWaitIPv4() { 126 void SocketTest::TestSocketServerWaitIPv4() {
123 SocketServerWaitInternal(kIPv4Loopback); 127 SocketServerWaitInternal(kIPv4Loopback);
124 } 128 }
125 129
126 void SocketTest::TestSocketServerWaitIPv6() { 130 void SocketTest::TestSocketServerWaitIPv6() {
127 MAYBE_SKIP_IPV6; 131 MAYBE_SKIP_IPV6;
128 SocketServerWaitInternal(kIPv6Loopback); 132 SocketServerWaitInternal(kIPv6Loopback);
129 } 133 }
130 134
131 void SocketTest::TestTcpIPv4() { 135 void SocketTest::TestTcpIPv4() {
132 TcpInternal(kIPv4Loopback); 136 TcpInternal(kIPv4Loopback, kTcpInternalDataSize, -1);
133 } 137 }
134 138
135 void SocketTest::TestTcpIPv6() { 139 void SocketTest::TestTcpIPv6() {
136 MAYBE_SKIP_IPV6; 140 MAYBE_SKIP_IPV6;
137 TcpInternal(kIPv6Loopback); 141 TcpInternal(kIPv6Loopback, kTcpInternalDataSize, -1);
138 } 142 }
139 143
140 void SocketTest::TestSingleFlowControlCallbackIPv4() { 144 void SocketTest::TestSingleFlowControlCallbackIPv4() {
141 SingleFlowControlCallbackInternal(kIPv4Loopback); 145 SingleFlowControlCallbackInternal(kIPv4Loopback);
142 } 146 }
143 147
144 void SocketTest::TestSingleFlowControlCallbackIPv6() { 148 void SocketTest::TestSingleFlowControlCallbackIPv6() {
145 MAYBE_SKIP_IPV6; 149 MAYBE_SKIP_IPV6;
146 SingleFlowControlCallbackInternal(kIPv6Loopback); 150 SingleFlowControlCallbackInternal(kIPv6Loopback);
147 } 151 }
(...skipping 516 matching lines...) Expand 10 before | Expand all | Expand 10 after
664 Sleeper sleeper; 668 Sleeper sleeper;
665 TypedMessageData<AsyncSocket*> data(client.get()); 669 TypedMessageData<AsyncSocket*> data(client.get());
666 thread->Send(&sleeper, 0, &data); 670 thread->Send(&sleeper, 0, &data);
667 EXPECT_FALSE(sink.Check(accepted.get(), testing::SSE_READ)); 671 EXPECT_FALSE(sink.Check(accepted.get(), testing::SSE_READ));
668 672
669 // But should signal when process_io is true. 673 // But should signal when process_io is true.
670 EXPECT_TRUE_WAIT((sink.Check(accepted.get(), testing::SSE_READ)), kTimeout); 674 EXPECT_TRUE_WAIT((sink.Check(accepted.get(), testing::SSE_READ)), kTimeout);
671 EXPECT_LT(0, accepted->Recv(buf, 1024)); 675 EXPECT_LT(0, accepted->Recv(buf, 1024));
672 } 676 }
673 677
674 void SocketTest::TcpInternal(const IPAddress& loopback) { 678 void SocketTest::TcpInternal(const IPAddress& loopback, size_t data_size,
679 ssize_t max_send_size) {
675 testing::StreamSink sink; 680 testing::StreamSink sink;
676 SocketAddress accept_addr; 681 SocketAddress accept_addr;
677 682
678 // Create test data. 683 // Create receiving client.
679 const size_t kDataSize = 1024 * 1024; 684 scoped_ptr<AsyncSocket> receiver(
680 scoped_ptr<char[]> send_buffer(new char[kDataSize]);
681 scoped_ptr<char[]> recv_buffer(new char[kDataSize]);
682 size_t send_pos = 0, recv_pos = 0;
683 for (size_t i = 0; i < kDataSize; ++i) {
684 send_buffer[i] = static_cast<char>(i % 256);
685 recv_buffer[i] = 0;
686 }
687
688 // Create client.
689 scoped_ptr<AsyncSocket> client(
690 ss_->CreateAsyncSocket(loopback.family(), SOCK_STREAM)); 685 ss_->CreateAsyncSocket(loopback.family(), SOCK_STREAM));
691 sink.Monitor(client.get()); 686 sink.Monitor(receiver.get());
692 687
693 // Create server and listen. 688 // Create server and listen.
694 scoped_ptr<AsyncSocket> server( 689 scoped_ptr<AsyncSocket> server(
695 ss_->CreateAsyncSocket(loopback.family(), SOCK_STREAM)); 690 ss_->CreateAsyncSocket(loopback.family(), SOCK_STREAM));
696 sink.Monitor(server.get()); 691 sink.Monitor(server.get());
697 EXPECT_EQ(0, server->Bind(SocketAddress(loopback, 0))); 692 EXPECT_EQ(0, server->Bind(SocketAddress(loopback, 0)));
698 EXPECT_EQ(0, server->Listen(5)); 693 EXPECT_EQ(0, server->Listen(5));
699 694
700 // Attempt connection. 695 // Attempt connection.
701 EXPECT_EQ(0, client->Connect(server->GetLocalAddress())); 696 EXPECT_EQ(0, receiver->Connect(server->GetLocalAddress()));
702 697
703 // Accept connection. 698 // Accept connection which will be used for sending.
704 EXPECT_TRUE_WAIT((sink.Check(server.get(), testing::SSE_READ)), kTimeout); 699 EXPECT_TRUE_WAIT((sink.Check(server.get(), testing::SSE_READ)), kTimeout);
705 scoped_ptr<AsyncSocket> accepted(server->Accept(&accept_addr)); 700 scoped_ptr<AsyncSocket> sender(server->Accept(&accept_addr));
706 ASSERT_TRUE(accepted); 701 ASSERT_TRUE(sender);
707 sink.Monitor(accepted.get()); 702 sink.Monitor(sender.get());
708 703
709 // Both sides are now connected. 704 // Both sides are now connected.
710 EXPECT_EQ_WAIT(AsyncSocket::CS_CONNECTED, client->GetState(), kTimeout); 705 EXPECT_EQ_WAIT(AsyncSocket::CS_CONNECTED, receiver->GetState(), kTimeout);
711 EXPECT_TRUE(sink.Check(client.get(), testing::SSE_OPEN)); 706 EXPECT_TRUE(sink.Check(receiver.get(), testing::SSE_OPEN));
712 EXPECT_EQ(client->GetRemoteAddress(), accepted->GetLocalAddress()); 707 EXPECT_EQ(receiver->GetRemoteAddress(), sender->GetLocalAddress());
713 EXPECT_EQ(accepted->GetRemoteAddress(), client->GetLocalAddress()); 708 EXPECT_EQ(sender->GetRemoteAddress(), receiver->GetLocalAddress());
709
710 // Create test data.
711 rtc::Buffer send_buffer(0, data_size);
712 rtc::Buffer recv_buffer(0, data_size);
713 for (size_t i = 0; i < data_size; ++i) {
714 char ch = static_cast<char>(i % 256);
715 send_buffer.AppendData(&ch, sizeof(ch));
716 }
714 717
715 // Send and receive a bunch of data. 718 // Send and receive a bunch of data.
716 bool send_waiting_for_writability = false; 719 size_t sent_size = 0;
717 bool send_expect_success = true; 720 bool writable = true;
718 bool recv_waiting_for_readability = true; 721 bool send_called = false;
719 bool recv_expect_success = false; 722 bool readable = false;
720 int data_in_flight = 0; 723 bool recv_called = false;
721 while (recv_pos < kDataSize) { 724 while (recv_buffer.size() < send_buffer.size()) {
722 // Send as much as we can if we've been cleared to send. 725 // Send as much as we can while we're cleared to send.
723 while (!send_waiting_for_writability && send_pos < kDataSize) { 726 while (writable && sent_size < send_buffer.size()) {
724 int tosend = static_cast<int>(kDataSize - send_pos); 727 int unsent_size = static_cast<int>(send_buffer.size() - sent_size);
725 int sent = accepted->Send(send_buffer.get() + send_pos, tosend); 728 int sent = sender->Send(send_buffer.data() + sent_size, unsent_size);
726 if (send_expect_success) { 729 if (!send_called) {
727 // The first Send() after connecting or getting writability should 730 // The first Send() after connecting or getting writability should
728 // succeed and send some data. 731 // succeed and send some data.
729 EXPECT_GT(sent, 0); 732 EXPECT_GT(sent, 0);
730 send_expect_success = false; 733 send_called = true;
731 } 734 }
732 if (sent >= 0) { 735 if (sent >= 0) {
733 EXPECT_LE(sent, tosend); 736 EXPECT_LE(sent, unsent_size);
734 send_pos += sent; 737 sent_size += sent;
735 data_in_flight += sent; 738 if (max_send_size >= 0) {
739 EXPECT_LE(static_cast<ssize_t>(sent), max_send_size);
740 if (sent < unsent_size) {
741 // If max_send_size is limiting the amount to send per call such
742 // that the sent amount is less than the unsent amount, we simulate
743 // that the socket is no longer writable.
744 writable = false;
745 }
746 }
736 } else { 747 } else {
737 ASSERT_TRUE(accepted->IsBlocking()); 748 ASSERT_TRUE(sender->IsBlocking());
738 send_waiting_for_writability = true; 749 writable = false;
739 } 750 }
740 } 751 }
741 752
742 // Read all the sent data. 753 // Read all the sent data.
743 while (data_in_flight > 0) { 754 while (recv_buffer.size() < sent_size) {
744 if (recv_waiting_for_readability) { 755 if (!readable) {
745 // Wait until data is available. 756 // Wait until data is available.
746 EXPECT_TRUE_WAIT(sink.Check(client.get(), testing::SSE_READ), kTimeout); 757 EXPECT_TRUE_WAIT(sink.Check(receiver.get(), testing::SSE_READ),
747 recv_waiting_for_readability = false; 758 kTimeout);
748 recv_expect_success = true; 759 readable = true;
760 recv_called = false;
749 } 761 }
750 762
751 // Receive as much as we can get in a single recv call. 763 // Receive as much as we can get in a single recv call.
752 int rcvd = client->Recv(recv_buffer.get() + recv_pos, 764 char recved_data[data_size];
753 kDataSize - recv_pos); 765 int recved_size = receiver->Recv(recved_data, data_size);
754 766
755 if (recv_expect_success) { 767 if (!recv_called) {
756 // The first Recv() after getting readability should succeed and receive 768 // The first Recv() after getting readability should succeed and receive
757 // some data. 769 // some data.
758 // TODO: The following line is disabled due to flakey pulse 770 // TODO: The following line is disabled due to flakey pulse
759 // builds. Re-enable if/when possible. 771 // builds. Re-enable if/when possible.
760 // EXPECT_GT(rcvd, 0); 772 // EXPECT_GT(recved_size, 0);
761 recv_expect_success = false; 773 recv_called = true;
762 } 774 }
763 if (rcvd >= 0) { 775 if (recved_size >= 0) {
764 EXPECT_LE(rcvd, data_in_flight); 776 EXPECT_LE(static_cast<size_t>(recved_size),
765 recv_pos += rcvd; 777 sent_size - recv_buffer.size());
766 data_in_flight -= rcvd; 778 recv_buffer.AppendData(recved_data, recved_size);
767 } else { 779 } else {
768 ASSERT_TRUE(client->IsBlocking()); 780 ASSERT_TRUE(receiver->IsBlocking());
769 recv_waiting_for_readability = true; 781 readable = false;
770 } 782 }
771 } 783 }
772 784
773 // Once all that we've sent has been rcvd, expect to be able to send again. 785 // Once all that we've sent has been received, expect to be able to send
774 if (send_waiting_for_writability) { 786 // again.
775 EXPECT_TRUE_WAIT(sink.Check(accepted.get(), testing::SSE_WRITE), 787 if (!writable) {
788 EXPECT_TRUE_WAIT(sink.Check(sender.get(), testing::SSE_WRITE),
776 kTimeout); 789 kTimeout);
777 send_waiting_for_writability = false; 790 writable = true;
778 send_expect_success = true; 791 send_called = false;
779 } 792 }
780 } 793 }
781 794
782 // The received data matches the sent data. 795 // The received data matches the sent data.
783 EXPECT_EQ(kDataSize, send_pos); 796 EXPECT_EQ(data_size, sent_size);
784 EXPECT_EQ(kDataSize, recv_pos); 797 EXPECT_EQ(data_size, recv_buffer.size());
785 EXPECT_EQ(0, memcmp(recv_buffer.get(), send_buffer.get(), kDataSize)); 798 EXPECT_EQ(recv_buffer, send_buffer);
786 799
787 // Close down. 800 // Close down.
788 accepted->Close(); 801 sender->Close();
789 EXPECT_EQ_WAIT(AsyncSocket::CS_CLOSED, client->GetState(), kTimeout); 802 EXPECT_EQ_WAIT(AsyncSocket::CS_CLOSED, receiver->GetState(), kTimeout);
790 EXPECT_TRUE(sink.Check(client.get(), testing::SSE_CLOSE)); 803 EXPECT_TRUE(sink.Check(receiver.get(), testing::SSE_CLOSE));
791 client->Close(); 804 receiver->Close();
792 } 805 }
793 806
794 void SocketTest::SingleFlowControlCallbackInternal(const IPAddress& loopback) { 807 void SocketTest::SingleFlowControlCallbackInternal(const IPAddress& loopback) {
795 testing::StreamSink sink; 808 testing::StreamSink sink;
796 SocketAddress accept_addr; 809 SocketAddress accept_addr;
797 810
798 // Create client. 811 // Create client.
799 scoped_ptr<AsyncSocket> client( 812 scoped_ptr<AsyncSocket> client(
800 ss_->CreateAsyncSocket(loopback.family(), SOCK_STREAM)); 813 ss_->CreateAsyncSocket(loopback.family(), SOCK_STREAM));
801 sink.Monitor(client.get()); 814 sink.Monitor(client.get());
(...skipping 202 matching lines...) Expand 10 before | Expand all | Expand 10 after
1004 ASSERT_EQ(-1, mtu_socket->EstimateMTU(&mtu)); 1017 ASSERT_EQ(-1, mtu_socket->EstimateMTU(&mtu));
1005 #else 1018 #else
1006 // and the behavior seems unpredictable on Linux, 1019 // and the behavior seems unpredictable on Linux,
1007 // failing on the build machine 1020 // failing on the build machine
1008 // but succeeding on my Ubiquity instance. 1021 // but succeeding on my Ubiquity instance.
1009 #endif 1022 #endif
1010 } 1023 }
1011 } 1024 }
1012 1025
1013 } // namespace rtc 1026 } // namespace rtc
OLDNEW
« no previous file with comments | « webrtc/base/socket_unittest.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698