Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(433)

Side by Side Diff: webrtc/api/quicdatachannel.cc

Issue 2146133002: Revert of Modify PeerConnection for end-to-end QuicDataChannel usage (Closed) Base URL: https://chromium.googlesource.com/external/webrtc.git@master
Patch Set: Created 4 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « webrtc/api/quicdatachannel.h ('k') | webrtc/api/quicdatachannel_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 /* 1 /*
2 * Copyright 2016 The WebRTC project authors. All Rights Reserved. 2 * Copyright 2016 The WebRTC project authors. All Rights Reserved.
3 * 3 *
4 * Use of this source code is governed by a BSD-style license 4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source 5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found 6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may 7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree. 8 * be found in the AUTHORS file in the root of the source tree.
9 */ 9 */
10 10
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after
54 << *data_channel_id; 54 << *data_channel_id;
55 return false; 55 return false;
56 } 56 }
57 size_t remaining_bytes = byte_buffer.Length(); 57 size_t remaining_bytes = byte_buffer.Length();
58 *bytes_read = len - remaining_bytes; 58 *bytes_read = len - remaining_bytes;
59 return true; 59 return true;
60 } 60 }
61 61
62 QuicDataChannel::QuicDataChannel(rtc::Thread* signaling_thread, 62 QuicDataChannel::QuicDataChannel(rtc::Thread* signaling_thread,
63 rtc::Thread* worker_thread, 63 rtc::Thread* worker_thread,
64 rtc::Thread* network_thread,
65 const std::string& label, 64 const std::string& label,
66 const DataChannelInit& config) 65 const DataChannelInit& config)
67 : signaling_thread_(signaling_thread), 66 : signaling_thread_(signaling_thread),
68 worker_thread_(worker_thread), 67 worker_thread_(worker_thread),
69 network_thread_(network_thread),
70 id_(config.id), 68 id_(config.id),
71 state_(kConnecting), 69 state_(kConnecting),
72 buffered_amount_(0), 70 buffered_amount_(0),
73 next_message_id_(0), 71 next_message_id_(0),
74 label_(label), 72 label_(label),
75 protocol_(config.protocol) {} 73 protocol_(config.protocol) {}
76 74
77 QuicDataChannel::~QuicDataChannel() {} 75 QuicDataChannel::~QuicDataChannel() {}
78 76
79 void QuicDataChannel::RegisterObserver(DataChannelObserver* observer) { 77 void QuicDataChannel::RegisterObserver(DataChannelObserver* observer) {
80 RTC_DCHECK(signaling_thread_->IsCurrent()); 78 RTC_DCHECK(signaling_thread_->IsCurrent());
81 observer_ = observer; 79 observer_ = observer;
82 } 80 }
83 81
84 void QuicDataChannel::UnregisterObserver() { 82 void QuicDataChannel::UnregisterObserver() {
85 RTC_DCHECK(signaling_thread_->IsCurrent()); 83 RTC_DCHECK(signaling_thread_->IsCurrent());
86 observer_ = nullptr; 84 observer_ = nullptr;
87 } 85 }
88 86
89 bool QuicDataChannel::Send(const DataBuffer& buffer) { 87 bool QuicDataChannel::Send(const DataBuffer& buffer) {
90 RTC_DCHECK(signaling_thread_->IsCurrent()); 88 RTC_DCHECK(signaling_thread_->IsCurrent());
91 if (state_ != kOpen) { 89 if (state_ != kOpen) {
92 LOG(LS_ERROR) << "QUIC data channel " << id_ 90 LOG(LS_ERROR) << "QUIC data channel " << id_
93 << " is not open so cannot send."; 91 << " is not open so cannot send.";
94 return false; 92 return false;
95 } 93 }
96 return network_thread_->Invoke<bool>( 94 return worker_thread_->Invoke<bool>(
97 RTC_FROM_HERE, rtc::Bind(&QuicDataChannel::Send_n, this, buffer)); 95 RTC_FROM_HERE, rtc::Bind(&QuicDataChannel::Send_w, this, buffer));
98 } 96 }
99 97
100 bool QuicDataChannel::Send_n(const DataBuffer& buffer) { 98 bool QuicDataChannel::Send_w(const DataBuffer& buffer) {
101 RTC_DCHECK(network_thread_->IsCurrent()); 99 RTC_DCHECK(worker_thread_->IsCurrent());
102 100
103 // Encode and send the header containing the data channel ID and message ID. 101 // Encode and send the header containing the data channel ID and message ID.
104 rtc::CopyOnWriteBuffer header; 102 rtc::CopyOnWriteBuffer header;
105 WriteQuicDataChannelMessageHeader(id_, ++next_message_id_, &header); 103 WriteQuicDataChannelMessageHeader(id_, ++next_message_id_, &header);
106 RTC_DCHECK(quic_transport_channel_); 104 RTC_DCHECK(quic_transport_channel_);
107 cricket::ReliableQuicStream* stream = 105 cricket::ReliableQuicStream* stream =
108 quic_transport_channel_->CreateQuicStream(); 106 quic_transport_channel_->CreateQuicStream();
109 RTC_DCHECK(stream); 107 RTC_DCHECK(stream);
110 108
111 // Send the header with a FIN if the message is empty. 109 // Send the header with a FIN if the message is empty.
(...skipping 139 matching lines...) Expand 10 before | Expand all | Expand 10 after
251 this, &QuicDataChannel::OnReadyToSend); 249 this, &QuicDataChannel::OnReadyToSend);
252 quic_transport_channel_->SignalClosed.connect( 250 quic_transport_channel_->SignalClosed.connect(
253 this, &QuicDataChannel::OnConnectionClosed); 251 this, &QuicDataChannel::OnConnectionClosed);
254 if (quic_transport_channel_->writable()) { 252 if (quic_transport_channel_->writable()) {
255 return kOpen; 253 return kOpen;
256 } 254 }
257 return kConnecting; 255 return kConnecting;
258 } 256 }
259 257
260 void QuicDataChannel::OnIncomingMessage(Message&& message) { 258 void QuicDataChannel::OnIncomingMessage(Message&& message) {
261 RTC_DCHECK(network_thread_->IsCurrent()); 259 RTC_DCHECK(worker_thread_->IsCurrent());
262 RTC_DCHECK(message.stream); 260 RTC_DCHECK(message.stream);
263 if (!observer_) { 261 if (!observer_) {
264 LOG(LS_WARNING) << "QUIC data channel " << id_ 262 LOG(LS_WARNING) << "QUIC data channel " << id_
265 << " received a message but has no observer."; 263 << " received a message but has no observer.";
266 message.stream->Close(); 264 message.stream->Close();
267 return; 265 return;
268 } 266 }
269 // A FIN is received if the message fits into a single QUIC stream frame and 267 // A FIN is received if the message fits into a single QUIC stream frame and
270 // the remote peer is done sending. 268 // the remote peer is done sending.
271 if (message.stream->fin_received()) { 269 if (message.stream->fin_received()) {
(...skipping 18 matching lines...) Expand all
290 &QuicDataChannel::OnDataReceived); 288 &QuicDataChannel::OnDataReceived);
291 // The QUIC stream will be removed from |incoming_quic_messages_| once it 289 // The QUIC stream will be removed from |incoming_quic_messages_| once it
292 // closes. 290 // closes.
293 message.stream->SignalClosed.connect( 291 message.stream->SignalClosed.connect(
294 this, &QuicDataChannel::OnIncomingQueuedStreamClosed); 292 this, &QuicDataChannel::OnIncomingQueuedStreamClosed);
295 } 293 }
296 294
297 void QuicDataChannel::OnDataReceived(net::QuicStreamId stream_id, 295 void QuicDataChannel::OnDataReceived(net::QuicStreamId stream_id,
298 const char* data, 296 const char* data,
299 size_t len) { 297 size_t len) {
300 RTC_DCHECK(network_thread_->IsCurrent()); 298 RTC_DCHECK(worker_thread_->IsCurrent());
301 RTC_DCHECK(data); 299 RTC_DCHECK(data);
302 const auto& kv = incoming_quic_messages_.find(stream_id); 300 const auto& kv = incoming_quic_messages_.find(stream_id);
303 if (kv == incoming_quic_messages_.end()) { 301 if (kv == incoming_quic_messages_.end()) {
304 RTC_DCHECK(false); 302 RTC_DCHECK(false);
305 return; 303 return;
306 } 304 }
307 Message& message = kv->second; 305 Message& message = kv->second;
308 cricket::ReliableQuicStream* stream = message.stream; 306 cricket::ReliableQuicStream* stream = message.stream;
309 rtc::CopyOnWriteBuffer& received_data = message.buffer; 307 rtc::CopyOnWriteBuffer& received_data = message.buffer;
310 // If the QUIC stream has not received a FIN, then the remote peer is not 308 // If the QUIC stream has not received a FIN, then the remote peer is not
311 // finished sending data. 309 // finished sending data.
312 if (!stream->fin_received()) { 310 if (!stream->fin_received()) {
313 received_data.AppendData(data, len); 311 received_data.AppendData(data, len);
314 return; 312 return;
315 } 313 }
316 // Otherwise we are done receiving and can provide the data channel observer 314 // Otherwise we are done receiving and can provide the data channel observer
317 // with the message. 315 // with the message.
318 LOG(LS_INFO) << "Stream " << stream_id 316 LOG(LS_INFO) << "Stream " << stream_id
319 << " has finished receiving data for QUIC data channel " << id_; 317 << " has finished receiving data for QUIC data channel " << id_;
320 received_data.AppendData(data, len); 318 received_data.AppendData(data, len);
321 DataBuffer final_message(std::move(received_data), false); 319 DataBuffer final_message(std::move(received_data), false);
322 invoker_.AsyncInvoke<void>( 320 invoker_.AsyncInvoke<void>(
323 RTC_FROM_HERE, signaling_thread_, 321 RTC_FROM_HERE, signaling_thread_,
324 rtc::Bind(&QuicDataChannel::OnMessage_s, this, std::move(final_message))); 322 rtc::Bind(&QuicDataChannel::OnMessage_s, this, std::move(final_message)));
325 // Once the stream is closed, OnDataReceived will not fire for the stream. 323 // Once the stream is closed, OnDataReceived will not fire for the stream.
326 stream->Close(); 324 stream->Close();
327 } 325 }
328 326
329 void QuicDataChannel::OnReadyToSend(cricket::TransportChannel* channel) { 327 void QuicDataChannel::OnReadyToSend(cricket::TransportChannel* channel) {
330 RTC_DCHECK(network_thread_->IsCurrent()); 328 RTC_DCHECK(worker_thread_->IsCurrent());
331 RTC_DCHECK(channel == quic_transport_channel_); 329 RTC_DCHECK(channel == quic_transport_channel_);
332 LOG(LS_INFO) << "QuicTransportChannel is ready to send"; 330 LOG(LS_INFO) << "QuicTransportChannel is ready to send";
333 invoker_.AsyncInvoke<void>( 331 invoker_.AsyncInvoke<void>(
334 RTC_FROM_HERE, signaling_thread_, 332 RTC_FROM_HERE, signaling_thread_,
335 rtc::Bind(&QuicDataChannel::SetState_s, this, kOpen)); 333 rtc::Bind(&QuicDataChannel::SetState_s, this, kOpen));
336 } 334 }
337 335
338 void QuicDataChannel::OnWriteBlockedStreamClosed(net::QuicStreamId stream_id, 336 void QuicDataChannel::OnWriteBlockedStreamClosed(net::QuicStreamId stream_id,
339 int error) { 337 int error) {
340 RTC_DCHECK(worker_thread_->IsCurrent()); 338 RTC_DCHECK(worker_thread_->IsCurrent());
341 LOG(LS_VERBOSE) << "Write blocked stream " << stream_id << " is closed."; 339 LOG(LS_VERBOSE) << "Write blocked stream " << stream_id << " is closed.";
342 write_blocked_quic_streams_.erase(stream_id); 340 write_blocked_quic_streams_.erase(stream_id);
343 } 341 }
344 342
345 void QuicDataChannel::OnIncomingQueuedStreamClosed(net::QuicStreamId stream_id, 343 void QuicDataChannel::OnIncomingQueuedStreamClosed(net::QuicStreamId stream_id,
346 int error) { 344 int error) {
347 RTC_DCHECK(network_thread_->IsCurrent()); 345 RTC_DCHECK(worker_thread_->IsCurrent());
348 LOG(LS_VERBOSE) << "Incoming queued stream " << stream_id << " is closed."; 346 LOG(LS_VERBOSE) << "Incoming queued stream " << stream_id << " is closed.";
349 incoming_quic_messages_.erase(stream_id); 347 incoming_quic_messages_.erase(stream_id);
350 } 348 }
351 349
352 void QuicDataChannel::OnConnectionClosed() { 350 void QuicDataChannel::OnConnectionClosed() {
353 RTC_DCHECK(worker_thread_->IsCurrent()); 351 RTC_DCHECK(worker_thread_->IsCurrent());
354 invoker_.AsyncInvoke<void>(RTC_FROM_HERE, signaling_thread_, 352 invoker_.AsyncInvoke<void>(RTC_FROM_HERE, signaling_thread_,
355 rtc::Bind(&QuicDataChannel::Close, this)); 353 rtc::Bind(&QuicDataChannel::Close, this));
356 } 354 }
357 355
(...skipping 29 matching lines...) Expand all
387 385
388 size_t QuicDataChannel::GetNumWriteBlockedStreams() const { 386 size_t QuicDataChannel::GetNumWriteBlockedStreams() const {
389 return write_blocked_quic_streams_.size(); 387 return write_blocked_quic_streams_.size();
390 } 388 }
391 389
392 size_t QuicDataChannel::GetNumIncomingStreams() const { 390 size_t QuicDataChannel::GetNumIncomingStreams() const {
393 return incoming_quic_messages_.size(); 391 return incoming_quic_messages_.size();
394 } 392 }
395 393
396 } // namespace webrtc 394 } // namespace webrtc
OLDNEW
« no previous file with comments | « webrtc/api/quicdatachannel.h ('k') | webrtc/api/quicdatachannel_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698