Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(181)

Unified Diff: webrtc/api/quicdatachannel.cc

Issue 1844803002: Modify PeerConnection for end-to-end QuicDataChannel usage (Closed) Base URL: https://chromium.googlesource.com/external/webrtc.git@master
Patch Set: Created 4 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: webrtc/api/quicdatachannel.cc
diff --git a/webrtc/api/quicdatachannel.cc b/webrtc/api/quicdatachannel.cc
new file mode 100644
index 0000000000000000000000000000000000000000..32984b5746a930c208ad79a99364dae86596800e
--- /dev/null
+++ b/webrtc/api/quicdatachannel.cc
@@ -0,0 +1,255 @@
+/*
+ * Copyright 2016 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "webrtc/api/quicdatachannel.h"
+
+#include "webrtc/base/bind.h"
+#include "webrtc/base/bytebuffer.h"
+#include "webrtc/base/logging.h"
+#include "webrtc/p2p/quic/quictransportchannel.h"
+#include "webrtc/p2p/quic/reliablequicstream.h"
+
+namespace webrtc {
+
+QuicDataChannel::QuicDataChannel(
+ cricket::QuicTransportChannel* quic_transport_channel,
+ rtc::Thread* signaling_thread,
+ rtc::Thread* worker_thread,
+ const std::string& label,
+ const DataChannelInit* config)
Taylor Brandstetter 2016/04/01 23:23:41 What should happen if the data channel init is som
mikescarlett 2016/04/05 19:58:50 By error do you mean something like "RTC_DCHECK(co
Taylor Brandstetter 2016/04/05 22:31:17 It definitely shouldn't crash, but the API should
+ : quic_transport_channel_(quic_transport_channel),
+ signaling_thread_(signaling_thread),
+ worker_thread_(worker_thread),
+ observer_(nullptr),
+ id_(config->id),
+ state_(quic_transport_channel_->quic_state() ==
+ cricket::QUIC_TRANSPORT_CONNECTED
+ ? kOpen
+ : kConnecting),
+ buffered_amount_(0),
+ num_sent_messages_(0),
+ label_(label),
+ protocol_(config->protocol),
+ send_open_message_(false) {
+ quic_transport_channel_->SignalWritableState.connect(
+ this, &QuicDataChannel::OnWritableState);
+ quic_transport_channel_->SignalClosed.connect(
+ this, &QuicDataChannel::OnConnectionClosed);
+ if (state_ == kOpen && send_open_message_) {
Taylor Brandstetter 2016/04/01 23:23:41 If send_open_message_ is always false I'd just rem
mikescarlett 2016/04/05 19:58:49 Removed.
+ worker_thread_->Invoke<void>(
+ rtc::Bind(&QuicDataChannel::SendOpenMessage_w, this));
+ }
+}
+
+QuicDataChannel::~QuicDataChannel() {}
+
+void QuicDataChannel::RegisterObserver(DataChannelObserver* observer) {
+ RTC_DCHECK(signaling_thread_->IsCurrent());
+ observer_ = observer;
+}
+void QuicDataChannel::UnregisterObserver() {
+ RTC_DCHECK(signaling_thread_->IsCurrent());
+ observer_ = nullptr;
+}
+
+bool QuicDataChannel::Send(const DataBuffer& buffer) {
+ RTC_DCHECK(signaling_thread_->IsCurrent());
+ return worker_thread_->Invoke<bool>(
+ rtc::Bind(&QuicDataChannel::Send_w, this, buffer));
+}
+
+bool QuicDataChannel::Send_w(const DataBuffer& buffer) {
+ RTC_DCHECK(worker_thread_->IsCurrent());
+ if (state_ != kOpen) {
+ LOG(LS_ERROR) << "QUIC data channel " << id_
+ << " is not open so cannot send.";
+ return false;
+ }
+ if (buffer.size() == 0) {
+ LOG(LS_WARNING) << "QUIC data channel " << id_
+ << " refuses to send an empty message.";
+ return true;
+ }
+ size_t max_length = buffer.size() + 15;
+ rtc::ByteBuffer byte_buffer(nullptr, max_length,
+ rtc::ByteBuffer::ByteOrder::ORDER_HOST);
+ byte_buffer.WriteVarint(id_);
+ byte_buffer.WriteVarint(++num_sent_messages_);
+ byte_buffer.WriteBytes(buffer.data.data<char>(), buffer.size());
+ return WriteData_w(byte_buffer.Data(), byte_buffer.Length());
+}
+
+bool QuicDataChannel::WriteData_w(const char* data, size_t len) {
+ RTC_DCHECK(worker_thread_->IsCurrent());
+ cricket::ReliableQuicStream* stream =
+ quic_transport_channel_->CreateQuicStream();
+ // The stream should not be NULL. Otherwise an internal error is preventing
+ // QUIC streams from being created or the data channel state is invalid.
Taylor Brandstetter 2016/04/01 23:23:41 If this can only occur due to an internal error, w
mikescarlett 2016/04/05 19:58:50 I agree now. A minor issue is this will crash if a
+ if (stream == nullptr) {
+ LOG(LS_ERROR) << "QUIC data channel " << id_
+ << " is open but failed to allocate a QUIC stream.";
+ return false;
+ }
+ // Send the message with FIN == true, which signals to the remote peer that
+ // there is no more data after this message.
+ rtc::StreamResult result = stream->Write(data, len, true);
+ if (result == rtc::SR_SUCCESS) {
+ // The message is sent and we don't need this QUIC stream.
+ LOG(INFO) << "Stream " << stream->id()
+ << " successfully wrote data for QUIC data channel " << id_;
+ stream->Close();
+ return true;
+ } else if (result == rtc::SR_BLOCK) {
+ // The QUIC stream is write blocked, so the message will be queued by the
+ // QUIC session. It might be due to the QUIC transport not being writable,
+ // or it may be due to exceeding the QUIC flow control limit.
+ LOG(LS_WARNING) << "Stream " << stream->id()
+ << " is write blocked for QUIC data channel " << id_;
+ if (observer_ != nullptr) {
+ observer_->OnBufferedAmountChange(buffered_amount_);
+ buffered_amount_ += len;
Taylor Brandstetter 2016/04/01 23:23:41 I don't see buffered_amount_ ever decrease. Also w
mikescarlett 2016/04/05 19:58:49 I implemented this by adding a new ReliableQuicStr
+ }
+ }
+ return false;
Taylor Brandstetter 2016/04/01 23:23:41 If the data is buffered successfully, this method
mikescarlett 2016/04/05 19:58:50 Okay buffering returns true now.
+}
+
+void QuicDataChannel::SendOpenMessage_w() {
+ RTC_DCHECK(worker_thread_->IsCurrent());
+ // Not implemented.
+}
+
+void QuicDataChannel::Close() {
+ RTC_DCHECK(signaling_thread_->IsCurrent());
+ if (state_ == kClosed) {
+ return;
+ }
+ LOG(LS_INFO) << "Closing QUIC data channel.";
+ SetState(kClosing);
+ for (auto& kv : incoming_quic_streams_) {
+ cricket::ReliableQuicStream* stream = kv.second;
+ stream->Close();
+ }
+ queued_data_.clear();
+ SetState(kClosed);
+}
+
+void QuicDataChannel::OnStreamClosed(net::QuicStreamId stream_id, int error) {
+ RTC_DCHECK(worker_thread_->IsCurrent());
+ LOG(LS_VERBOSE) << "Stream " << stream_id << " is closed.";
+ incoming_quic_streams_.erase(stream_id);
+}
+
+void QuicDataChannel::OnIncomingStream(cricket::ReliableQuicStream* stream,
+ rtc::ByteBuffer* remaining_bytes) {
pthatcher1 2016/03/30 20:34:48 The order should be remaining_bytes, stream, since
mikescarlett 2016/04/05 19:58:50 Done.
+ RTC_DCHECK(worker_thread_->IsCurrent());
+ // The data channel ID has already been read from the byte buffer, so we can
+ // skip to the message id.
+ uint64_t message_id;
+ remaining_bytes->ReadVarint(&message_id);
Taylor Brandstetter 2016/04/01 23:23:41 Should probably handle failures of ReadVarint.
mikescarlett 2016/04/05 19:58:49 Done. ReadVarint is in QuicDataTransport now.
+
+ if (stream->fin_received()) {
+ LOG(LS_INFO) << "Stream " << stream->id()
+ << " has finished receiving data for QUIC data channel "
+ << id_;
+ // A FIN is received if the message fits into a single QUIC stream frame and
+ // the remote peer is done sending. In this case, propagate the data to the
+ // observer and close the stream.
+ if (observer_ != nullptr) {
+ DataBuffer message(rtc::CopyOnWriteBuffer(remaining_bytes->Data(),
+ remaining_bytes->Length()),
+ false);
+ signaling_thread_->Invoke<void>(
pthatcher1 2016/03/30 20:34:48 These needs to be an AsyncInvoke
mikescarlett 2016/04/05 19:58:49 Done.
+ rtc::Bind(&QuicDataChannel::OnMessage_s, this, message));
+ OnMessage_s(message);
pthatcher1 2016/03/30 20:34:48 Why do we hop to the signalling thread and then ca
mikescarlett 2016/04/05 19:58:50 That clearly shouldn't be there. Fixed.
+ } else {
+ LOG(LS_WARNING) << "QUIC data channel " << id_
+ << " received a message but has no observer.";
+ }
+ stream->Close();
+ } else {
+ // Otherwise the message is divided across multiple QUIC stream frames, so
+ // queue the data. OnDataReceived() will be called each time the remaining
+ // QUIC stream frames arrive.
+ LOG(LS_INFO) << "QUIC data channel " << id_
+ << " is queuing incoming data for stream " << stream->id();
+ incoming_quic_streams_[stream->id()] = stream;
+ rtc::CopyOnWriteBuffer& received_data = queued_data_[stream->id()];
pthatcher1 2016/03/30 20:34:48 When does the buffer get added to the queud_data_?
pthatcher1 2016/03/30 20:34:48 I think we just want a normal rtc::Buffer, not an
Taylor Brandstetter 2016/04/01 23:23:41 Do you mean, "when does the object get constructed
mikescarlett 2016/04/05 19:58:49 I changed it to not use operator[] to create the b
mikescarlett 2016/04/05 19:58:49 It needs to be rtc::CopyOnWriteBuffer due to this
+ received_data.AppendData(remaining_bytes->Data(),
+ remaining_bytes->Length());
+ stream->SignalDataReceived.connect(this, &QuicDataChannel::OnDataReceived);
+ stream->SignalClosed.connect(this, &QuicDataChannel::OnStreamClosed);
+ }
pthatcher1 2016/03/30 20:34:48 Can you use an early return in this code? Perhaps
mikescarlett 2016/04/05 19:58:49 Done.
+}
+
+void QuicDataChannel::OnDataReceived(net::QuicStreamId stream_id,
+ const char* data,
+ size_t len) {
+ RTC_DCHECK(worker_thread_->IsCurrent());
+ cricket::ReliableQuicStream* stream = incoming_quic_streams_[stream_id];
+ // True if the QUIC stream has reeived a FIN, which indicates the remote
+ // peer is finished receiving data.
+ bool finished = stream->fin_received();
+ // Lookup or create an rtc::CopyOnWriteBuffer.
+ rtc::CopyOnWriteBuffer& received_data = queued_data_[stream_id];
pthatcher1 2016/03/30 20:34:48 We should use .find() and check to see if it's in
Taylor Brandstetter 2016/04/01 23:23:41 It always *should* be. So if you do that, put it i
mikescarlett 2016/04/05 19:58:49 I put an RTC_DCHECK. OnDataReceived is only called
+
+ if (finished) {
+ LOG(LS_INFO) << "Stream " << stream_id
+ << " has finished receiving data for QUIC data channel "
+ << id_;
+ received_data.AppendData(data, len);
+ DataBuffer message(received_data, false);
pthatcher1 2016/03/30 20:34:48 We should probably use a map of rtc::Buffer* so th
mikescarlett 2016/04/05 19:58:49 What about using std::move to avoid copying for Da
+ signaling_thread_->Invoke<void>(
+ rtc::Bind(&QuicDataChannel::OnMessage_s, this, message));
+ OnMessage_s(message);
Taylor Brandstetter 2016/04/01 23:23:41 Again, OnMessage_s shouldn't be called twice.
mikescarlett 2016/04/05 19:58:49 Removed.
+ queued_data_.erase(stream_id);
+ stream->Close();
+ } else {
+ received_data.AppendData(data, len);
+ }
pthatcher1 2016/03/30 20:34:48 Same here with early returns. Something like this
mikescarlett 2016/04/05 19:58:49 Done.
+}
pthatcher1 2016/03/30 20:34:48 Now that I see this more completely, I think that
Taylor Brandstetter 2016/04/01 23:23:41 +1
mikescarlett 2016/04/05 19:58:49 I revised it. QuicDataChannelMessage is a struct a
+
+void QuicDataChannel::OnWritableState(cricket::TransportChannel* channel) {
+ RTC_DCHECK(worker_thread_->IsCurrent());
+ RTC_DCHECK(channel == quic_transport_channel_);
+ state_ = kOpen;
+ if (send_open_message_) {
+ SendOpenMessage_w();
pthatcher1 2016/03/30 20:34:48 I though we weren't doing the open message in this
mikescarlett 2016/04/05 19:58:49 Right I'm not implementing it. Fixed
+ }
+}
+
+void QuicDataChannel::OnConnectionClosed() {
+ RTC_DCHECK(worker_thread_->IsCurrent());
+ SetState(kClosed);
+}
+
+void QuicDataChannel::OnMessage_s(const DataBuffer& received_data) {
+ RTC_DCHECK(signaling_thread_->IsCurrent());
+ if (observer_ != nullptr) {
Taylor Brandstetter 2016/04/01 23:23:41 If it's OnMessage_s that checks for an observer, y
mikescarlett 2016/04/05 19:58:49 Replaced with RTC_DCHECK.
+ observer_->OnMessage(received_data);
+ } else {
+ LOG(LS_WARNING) << "QUIC data channel " << id_
+ << " received a message but has no observer.";
+ }
pthatcher1 2016/03/30 20:34:48 Early return here please.
mikescarlett 2016/04/05 19:58:49 Done.
+}
+
+void QuicDataChannel::SetState(DataState state) {
+ if (state_ == state) {
+ return;
+ }
+ state_ = state;
+ if (observer_) {
+ observer_->OnStateChange();
+ }
+ if (state_ == kClosed) {
+ SignalClosed(this);
+ }
+}
+
+} // namespace webrtc

Powered by Google App Engine
This is Rietveld 408576698