Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(305)

Side by Side Diff: webrtc/base/socket_unittest.cc

Issue 1616153007: Stay writable after partial socket writes. (Closed) Base URL: https://chromium.googlesource.com/external/webrtc.git@master
Patch Set: Feedback from Peter. Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « webrtc/base/socket_unittest.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 /* 1 /*
2 * Copyright 2007 The WebRTC Project Authors. All rights reserved. 2 * Copyright 2007 The WebRTC Project Authors. All rights reserved.
3 * 3 *
4 * Use of this source code is governed by a BSD-style license 4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source 5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found 6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may 7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree. 8 * be found in the AUTHORS file in the root of the source tree.
9 */ 9 */
10 10
11 #include "webrtc/base/socket_unittest.h" 11 #include "webrtc/base/socket_unittest.h"
12 12
13 #include "webrtc/base/arraysize.h" 13 #include "webrtc/base/arraysize.h"
14 #include "webrtc/base/buffer.h"
14 #include "webrtc/base/asyncudpsocket.h" 15 #include "webrtc/base/asyncudpsocket.h"
15 #include "webrtc/base/gunit.h" 16 #include "webrtc/base/gunit.h"
16 #include "webrtc/base/nethelpers.h" 17 #include "webrtc/base/nethelpers.h"
17 #include "webrtc/base/socketserver.h" 18 #include "webrtc/base/socketserver.h"
18 #include "webrtc/base/testclient.h" 19 #include "webrtc/base/testclient.h"
19 #include "webrtc/base/testutils.h" 20 #include "webrtc/base/testutils.h"
20 #include "webrtc/base/thread.h" 21 #include "webrtc/base/thread.h"
21 22
22 namespace rtc { 23 namespace rtc {
23 24
(...skipping 640 matching lines...) Expand 10 before | Expand all | Expand 10 after
664 Sleeper sleeper; 665 Sleeper sleeper;
665 TypedMessageData<AsyncSocket*> data(client.get()); 666 TypedMessageData<AsyncSocket*> data(client.get());
666 thread->Send(&sleeper, 0, &data); 667 thread->Send(&sleeper, 0, &data);
667 EXPECT_FALSE(sink.Check(accepted.get(), testing::SSE_READ)); 668 EXPECT_FALSE(sink.Check(accepted.get(), testing::SSE_READ));
668 669
669 // But should signal when process_io is true. 670 // But should signal when process_io is true.
670 EXPECT_TRUE_WAIT((sink.Check(accepted.get(), testing::SSE_READ)), kTimeout); 671 EXPECT_TRUE_WAIT((sink.Check(accepted.get(), testing::SSE_READ)), kTimeout);
671 EXPECT_LT(0, accepted->Recv(buf, 1024)); 672 EXPECT_LT(0, accepted->Recv(buf, 1024));
672 } 673 }
673 674
674 void SocketTest::TcpInternal(const IPAddress& loopback) { 675 void SocketTest::TcpInternal(const IPAddress& loopback, size_t data_size,
676 size_t check_max_sent_size) {
675 testing::StreamSink sink; 677 testing::StreamSink sink;
676 SocketAddress accept_addr; 678 SocketAddress accept_addr;
677 679
678 // Create test data. 680 // Create receiving client.
679 const size_t kDataSize = 1024 * 1024; 681 scoped_ptr<AsyncSocket> receiver(
680 scoped_ptr<char[]> send_buffer(new char[kDataSize]);
681 scoped_ptr<char[]> recv_buffer(new char[kDataSize]);
682 size_t send_pos = 0, recv_pos = 0;
683 for (size_t i = 0; i < kDataSize; ++i) {
684 send_buffer[i] = static_cast<char>(i % 256);
685 recv_buffer[i] = 0;
686 }
687
688 // Create client.
689 scoped_ptr<AsyncSocket> client(
690 ss_->CreateAsyncSocket(loopback.family(), SOCK_STREAM)); 682 ss_->CreateAsyncSocket(loopback.family(), SOCK_STREAM));
691 sink.Monitor(client.get()); 683 sink.Monitor(receiver.get());
692 684
693 // Create server and listen. 685 // Create server and listen.
694 scoped_ptr<AsyncSocket> server( 686 scoped_ptr<AsyncSocket> server(
695 ss_->CreateAsyncSocket(loopback.family(), SOCK_STREAM)); 687 ss_->CreateAsyncSocket(loopback.family(), SOCK_STREAM));
696 sink.Monitor(server.get()); 688 sink.Monitor(server.get());
697 EXPECT_EQ(0, server->Bind(SocketAddress(loopback, 0))); 689 EXPECT_EQ(0, server->Bind(SocketAddress(loopback, 0)));
698 EXPECT_EQ(0, server->Listen(5)); 690 EXPECT_EQ(0, server->Listen(5));
699 691
700 // Attempt connection. 692 // Attempt connection.
701 EXPECT_EQ(0, client->Connect(server->GetLocalAddress())); 693 EXPECT_EQ(0, receiver->Connect(server->GetLocalAddress()));
702 694
703 // Accept connection. 695 // Accept connection which will be used for sending.
704 EXPECT_TRUE_WAIT((sink.Check(server.get(), testing::SSE_READ)), kTimeout); 696 EXPECT_TRUE_WAIT((sink.Check(server.get(), testing::SSE_READ)), kTimeout);
705 scoped_ptr<AsyncSocket> accepted(server->Accept(&accept_addr)); 697 scoped_ptr<AsyncSocket> sender(server->Accept(&accept_addr));
706 ASSERT_TRUE(accepted); 698 ASSERT_TRUE(sender);
707 sink.Monitor(accepted.get()); 699 sink.Monitor(sender.get());
708 700
709 // Both sides are now connected. 701 // Both sides are now connected.
710 EXPECT_EQ_WAIT(AsyncSocket::CS_CONNECTED, client->GetState(), kTimeout); 702 EXPECT_EQ_WAIT(AsyncSocket::CS_CONNECTED, receiver->GetState(), kTimeout);
711 EXPECT_TRUE(sink.Check(client.get(), testing::SSE_OPEN)); 703 EXPECT_TRUE(sink.Check(receiver.get(), testing::SSE_OPEN));
712 EXPECT_EQ(client->GetRemoteAddress(), accepted->GetLocalAddress()); 704 EXPECT_EQ(receiver->GetRemoteAddress(), sender->GetLocalAddress());
713 EXPECT_EQ(accepted->GetRemoteAddress(), client->GetLocalAddress()); 705 EXPECT_EQ(sender->GetRemoteAddress(), receiver->GetLocalAddress());
706
707 // Create test data.
708 rtc::Buffer send_buffer(0, data_size);
709 rtc::Buffer recv_buffer(0, data_size);
710 for (size_t i = 0; i < data_size; ++i) {
711 char ch = static_cast<char>(i % 256);
712 send_buffer.AppendData(&ch, sizeof(ch));
713 }
714 714
715 // Send and receive a bunch of data. 715 // Send and receive a bunch of data.
716 bool send_waiting_for_writability = false; 716 size_t sent_size = 0;
717 bool send_expect_success = true; 717 bool writable = true;
718 bool recv_waiting_for_readability = true; 718 bool send_called = false;
719 bool recv_expect_success = false; 719 bool readable = false;
720 int data_in_flight = 0; 720 bool recv_called = false;
721 while (recv_pos < kDataSize) { 721 while (recv_buffer.size() < send_buffer.size()) {
722 // Send as much as we can if we've been cleared to send. 722 // Send as much as we can while we're cleared to send.
723 while (!send_waiting_for_writability && send_pos < kDataSize) { 723 while (writable && sent_size < send_buffer.size()) {
724 int tosend = static_cast<int>(kDataSize - send_pos); 724 int unsent_count = static_cast<int>(send_buffer.size() - sent_size);
725 int sent = accepted->Send(send_buffer.get() + send_pos, tosend); 725 int sent = sender->Send(send_buffer.data() + sent_size, unsent_count);
726 if (send_expect_success) { 726 if (!send_called) {
727 // The first Send() after connecting or getting writability should 727 // The first Send() after connecting or getting writability should
728 // succeed and send some data. 728 // succeed and send some data.
729 EXPECT_GT(sent, 0); 729 EXPECT_GT(sent, 0);
730 send_expect_success = false; 730 send_called = true;
731 } 731 }
732 if (sent >= 0) { 732 if (sent >= 0) {
733 EXPECT_LE(sent, tosend); 733 EXPECT_LE(sent, unsent_count);
734 send_pos += sent; 734 EXPECT_LE(static_cast<size_t>(sent), check_max_sent_size);
735 data_in_flight += sent; 735 sent_size += sent;
736 } else { 736 }
737 ASSERT_TRUE(accepted->IsBlocking()); 737 if (sent < unsent_count) {
738 send_waiting_for_writability = true; 738 ASSERT_TRUE(sender->IsBlocking());
739 writable = false;
739 } 740 }
740 } 741 }
741 742
742 // Read all the sent data. 743 // Read all the sent data.
743 while (data_in_flight > 0) { 744 while (recv_buffer.size() < sent_size) {
744 if (recv_waiting_for_readability) { 745 if (!readable) {
745 // Wait until data is available. 746 // Wait until data is available.
746 EXPECT_TRUE_WAIT(sink.Check(client.get(), testing::SSE_READ), kTimeout); 747 EXPECT_TRUE_WAIT(sink.Check(receiver.get(), testing::SSE_READ),
747 recv_waiting_for_readability = false; 748 kTimeout);
748 recv_expect_success = true; 749 readable = true;
750 recv_called = false;
749 } 751 }
750 752
751 // Receive as much as we can get in a single recv call. 753 // Receive as much as we can get in a single recv call.
752 int rcvd = client->Recv(recv_buffer.get() + recv_pos, 754 char recved_data[data_size];
753 kDataSize - recv_pos); 755 int recved_size = receiver->Recv(recved_data, data_size);
754 756
755 if (recv_expect_success) { 757 if (!recv_called) {
756 // The first Recv() after getting readability should succeed and receive 758 // The first Recv() after getting readability should succeed and receive
757 // some data. 759 // some data.
758 // TODO: The following line is disabled due to flakey pulse 760 // TODO: The following line is disabled due to flakey pulse
759 // builds. Re-enable if/when possible. 761 // builds. Re-enable if/when possible.
760 // EXPECT_GT(rcvd, 0); 762 // EXPECT_GT(recved_size, 0);
761 recv_expect_success = false; 763 recv_called = true;
762 } 764 }
763 if (rcvd >= 0) { 765 if (recved_size >= 0) {
764 EXPECT_LE(rcvd, data_in_flight); 766 EXPECT_LE(static_cast<size_t>(recved_size),
765 recv_pos += rcvd; 767 sent_size - recv_buffer.size());
766 data_in_flight -= rcvd; 768 recv_buffer.AppendData(recved_data, recved_size);
767 } else { 769 } else {
768 ASSERT_TRUE(client->IsBlocking()); 770 ASSERT_TRUE(receiver->IsBlocking());
769 recv_waiting_for_readability = true; 771 readable = false;
770 } 772 }
771 } 773 }
772 774
773 // Once all that we've sent has been rcvd, expect to be able to send again. 775 // Once all that we've sent has been received, expect to be able to send
774 if (send_waiting_for_writability) { 776 // again.
775 EXPECT_TRUE_WAIT(sink.Check(accepted.get(), testing::SSE_WRITE), 777 if (!writable) {
778 EXPECT_TRUE_WAIT(sink.Check(sender.get(), testing::SSE_WRITE),
776 kTimeout); 779 kTimeout);
777 send_waiting_for_writability = false; 780 writable = true;
778 send_expect_success = true; 781 send_called = false;
779 } 782 }
780 } 783 }
781 784
782 // The received data matches the sent data. 785 // The received data matches the sent data.
783 EXPECT_EQ(kDataSize, send_pos); 786 EXPECT_EQ(data_size, sent_size);
784 EXPECT_EQ(kDataSize, recv_pos); 787 EXPECT_EQ(data_size, recv_buffer.size());
785 EXPECT_EQ(0, memcmp(recv_buffer.get(), send_buffer.get(), kDataSize)); 788 EXPECT_EQ(0, memcmp(recv_buffer.data(), send_buffer.data(), data_size));
pthatcher1 2016/02/01 20:27:13 Doesn't EXPECT_EQ(recv_buffer, send_buffer) work?
joachim 2016/02/01 21:39:53 You're right, buffers implement the "==" operator.
786 789
787 // Close down. 790 // Close down.
788 accepted->Close(); 791 sender->Close();
789 EXPECT_EQ_WAIT(AsyncSocket::CS_CLOSED, client->GetState(), kTimeout); 792 EXPECT_EQ_WAIT(AsyncSocket::CS_CLOSED, receiver->GetState(), kTimeout);
790 EXPECT_TRUE(sink.Check(client.get(), testing::SSE_CLOSE)); 793 EXPECT_TRUE(sink.Check(receiver.get(), testing::SSE_CLOSE));
791 client->Close(); 794 receiver->Close();
792 } 795 }
793 796
794 void SocketTest::SingleFlowControlCallbackInternal(const IPAddress& loopback) { 797 void SocketTest::SingleFlowControlCallbackInternal(const IPAddress& loopback) {
795 testing::StreamSink sink; 798 testing::StreamSink sink;
796 SocketAddress accept_addr; 799 SocketAddress accept_addr;
797 800
798 // Create client. 801 // Create client.
799 scoped_ptr<AsyncSocket> client( 802 scoped_ptr<AsyncSocket> client(
800 ss_->CreateAsyncSocket(loopback.family(), SOCK_STREAM)); 803 ss_->CreateAsyncSocket(loopback.family(), SOCK_STREAM));
801 sink.Monitor(client.get()); 804 sink.Monitor(client.get());
(...skipping 202 matching lines...) Expand 10 before | Expand all | Expand 10 after
1004 ASSERT_EQ(-1, mtu_socket->EstimateMTU(&mtu)); 1007 ASSERT_EQ(-1, mtu_socket->EstimateMTU(&mtu));
1005 #else 1008 #else
1006 // and the behavior seems unpredictable on Linux, 1009 // and the behavior seems unpredictable on Linux,
1007 // failing on the build machine 1010 // failing on the build machine
1008 // but succeeding on my Ubiquity instance. 1011 // but succeeding on my Ubiquity instance.
1009 #endif 1012 #endif
1010 } 1013 }
1011 } 1014 }
1012 1015
1013 } // namespace rtc 1016 } // namespace rtc
OLDNEW
« no previous file with comments | « webrtc/base/socket_unittest.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698