Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(368)

Side by Side Diff: webrtc/api/quicdatachannel.cc

Issue 2089553002: Refactoring on QUIC (Closed) Base URL: https://chromium.googlesource.com/external/webrtc.git@master
Patch Set: Split the CL Created 4 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « webrtc/api/quicdatachannel.h ('k') | webrtc/api/quicdatachannel_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 /* 1 /*
2 * Copyright 2016 The WebRTC project authors. All Rights Reserved. 2 * Copyright 2016 The WebRTC project authors. All Rights Reserved.
3 * 3 *
4 * Use of this source code is governed by a BSD-style license 4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source 5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found 6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may 7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree. 8 * be found in the AUTHORS file in the root of the source tree.
9 */ 9 */
10 10
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after
54 << *data_channel_id; 54 << *data_channel_id;
55 return false; 55 return false;
56 } 56 }
57 size_t remaining_bytes = byte_buffer.Length(); 57 size_t remaining_bytes = byte_buffer.Length();
58 *bytes_read = len - remaining_bytes; 58 *bytes_read = len - remaining_bytes;
59 return true; 59 return true;
60 } 60 }
61 61
62 QuicDataChannel::QuicDataChannel(rtc::Thread* signaling_thread, 62 QuicDataChannel::QuicDataChannel(rtc::Thread* signaling_thread,
63 rtc::Thread* worker_thread, 63 rtc::Thread* worker_thread,
64 rtc::Thread* network_thread,
64 const std::string& label, 65 const std::string& label,
65 const DataChannelInit& config) 66 const DataChannelInit& config)
66 : signaling_thread_(signaling_thread), 67 : signaling_thread_(signaling_thread),
67 worker_thread_(worker_thread), 68 worker_thread_(worker_thread),
69 network_thread_(network_thread),
68 id_(config.id), 70 id_(config.id),
69 state_(kConnecting), 71 state_(kConnecting),
70 buffered_amount_(0), 72 buffered_amount_(0),
71 next_message_id_(0), 73 next_message_id_(0),
72 label_(label), 74 label_(label),
73 protocol_(config.protocol) {} 75 protocol_(config.protocol) {}
74 76
75 QuicDataChannel::~QuicDataChannel() {} 77 QuicDataChannel::~QuicDataChannel() {}
76 78
77 void QuicDataChannel::RegisterObserver(DataChannelObserver* observer) { 79 void QuicDataChannel::RegisterObserver(DataChannelObserver* observer) {
78 RTC_DCHECK(signaling_thread_->IsCurrent()); 80 RTC_DCHECK(signaling_thread_->IsCurrent());
79 observer_ = observer; 81 observer_ = observer;
80 } 82 }
81 83
82 void QuicDataChannel::UnregisterObserver() { 84 void QuicDataChannel::UnregisterObserver() {
83 RTC_DCHECK(signaling_thread_->IsCurrent()); 85 RTC_DCHECK(signaling_thread_->IsCurrent());
84 observer_ = nullptr; 86 observer_ = nullptr;
85 } 87 }
86 88
87 bool QuicDataChannel::Send(const DataBuffer& buffer) { 89 bool QuicDataChannel::Send(const DataBuffer& buffer) {
88 RTC_DCHECK(signaling_thread_->IsCurrent()); 90 RTC_DCHECK(signaling_thread_->IsCurrent());
89 if (state_ != kOpen) { 91 if (state_ != kOpen) {
90 LOG(LS_ERROR) << "QUIC data channel " << id_ 92 LOG(LS_ERROR) << "QUIC data channel " << id_
91 << " is not open so cannot send."; 93 << " is not open so cannot send.";
92 return false; 94 return false;
93 } 95 }
94 return worker_thread_->Invoke<bool>( 96 return network_thread_->Invoke<bool>(
95 RTC_FROM_HERE, rtc::Bind(&QuicDataChannel::Send_w, this, buffer)); 97 RTC_FROM_HERE, rtc::Bind(&QuicDataChannel::Send_n, this, buffer));
96 } 98 }
97 99
98 bool QuicDataChannel::Send_w(const DataBuffer& buffer) { 100 bool QuicDataChannel::Send_n(const DataBuffer& buffer) {
99 RTC_DCHECK(worker_thread_->IsCurrent()); 101 RTC_DCHECK(network_thread_->IsCurrent());
100 102
101 // Encode and send the header containing the data channel ID and message ID. 103 // Encode and send the header containing the data channel ID and message ID.
102 rtc::CopyOnWriteBuffer header; 104 rtc::CopyOnWriteBuffer header;
103 WriteQuicDataChannelMessageHeader(id_, ++next_message_id_, &header); 105 WriteQuicDataChannelMessageHeader(id_, ++next_message_id_, &header);
104 RTC_DCHECK(quic_transport_channel_); 106 RTC_DCHECK(quic_transport_channel_);
105 cricket::ReliableQuicStream* stream = 107 cricket::ReliableQuicStream* stream =
106 quic_transport_channel_->CreateQuicStream(); 108 quic_transport_channel_->CreateQuicStream();
107 RTC_DCHECK(stream); 109 RTC_DCHECK(stream);
108 110
109 // Send the header with a FIN if the message is empty. 111 // Send the header with a FIN if the message is empty.
(...skipping 139 matching lines...) Expand 10 before | Expand all | Expand 10 after
249 this, &QuicDataChannel::OnReadyToSend); 251 this, &QuicDataChannel::OnReadyToSend);
250 quic_transport_channel_->SignalClosed.connect( 252 quic_transport_channel_->SignalClosed.connect(
251 this, &QuicDataChannel::OnConnectionClosed); 253 this, &QuicDataChannel::OnConnectionClosed);
252 if (quic_transport_channel_->writable()) { 254 if (quic_transport_channel_->writable()) {
253 return kOpen; 255 return kOpen;
254 } 256 }
255 return kConnecting; 257 return kConnecting;
256 } 258 }
257 259
258 void QuicDataChannel::OnIncomingMessage(Message&& message) { 260 void QuicDataChannel::OnIncomingMessage(Message&& message) {
259 RTC_DCHECK(worker_thread_->IsCurrent()); 261 RTC_DCHECK(network_thread_->IsCurrent());
260 RTC_DCHECK(message.stream); 262 RTC_DCHECK(message.stream);
261 if (!observer_) { 263 if (!observer_) {
262 LOG(LS_WARNING) << "QUIC data channel " << id_ 264 LOG(LS_WARNING) << "QUIC data channel " << id_
263 << " received a message but has no observer."; 265 << " received a message but has no observer.";
264 message.stream->Close(); 266 message.stream->Close();
265 return; 267 return;
266 } 268 }
267 // A FIN is received if the message fits into a single QUIC stream frame and 269 // A FIN is received if the message fits into a single QUIC stream frame and
268 // the remote peer is done sending. 270 // the remote peer is done sending.
269 if (message.stream->fin_received()) { 271 if (message.stream->fin_received()) {
(...skipping 18 matching lines...) Expand all
288 &QuicDataChannel::OnDataReceived); 290 &QuicDataChannel::OnDataReceived);
289 // The QUIC stream will be removed from |incoming_quic_messages_| once it 291 // The QUIC stream will be removed from |incoming_quic_messages_| once it
290 // closes. 292 // closes.
291 message.stream->SignalClosed.connect( 293 message.stream->SignalClosed.connect(
292 this, &QuicDataChannel::OnIncomingQueuedStreamClosed); 294 this, &QuicDataChannel::OnIncomingQueuedStreamClosed);
293 } 295 }
294 296
295 void QuicDataChannel::OnDataReceived(net::QuicStreamId stream_id, 297 void QuicDataChannel::OnDataReceived(net::QuicStreamId stream_id,
296 const char* data, 298 const char* data,
297 size_t len) { 299 size_t len) {
298 RTC_DCHECK(worker_thread_->IsCurrent()); 300 RTC_DCHECK(network_thread_->IsCurrent());
299 RTC_DCHECK(data); 301 RTC_DCHECK(data);
300 const auto& kv = incoming_quic_messages_.find(stream_id); 302 const auto& kv = incoming_quic_messages_.find(stream_id);
301 if (kv == incoming_quic_messages_.end()) { 303 if (kv == incoming_quic_messages_.end()) {
302 RTC_DCHECK(false); 304 RTC_DCHECK(false);
303 return; 305 return;
304 } 306 }
305 Message& message = kv->second; 307 Message& message = kv->second;
306 cricket::ReliableQuicStream* stream = message.stream; 308 cricket::ReliableQuicStream* stream = message.stream;
307 rtc::CopyOnWriteBuffer& received_data = message.buffer; 309 rtc::CopyOnWriteBuffer& received_data = message.buffer;
308 // If the QUIC stream has not received a FIN, then the remote peer is not 310 // If the QUIC stream has not received a FIN, then the remote peer is not
309 // finished sending data. 311 // finished sending data.
310 if (!stream->fin_received()) { 312 if (!stream->fin_received()) {
311 received_data.AppendData(data, len); 313 received_data.AppendData(data, len);
312 return; 314 return;
313 } 315 }
314 // Otherwise we are done receiving and can provide the data channel observer 316 // Otherwise we are done receiving and can provide the data channel observer
315 // with the message. 317 // with the message.
316 LOG(LS_INFO) << "Stream " << stream_id 318 LOG(LS_INFO) << "Stream " << stream_id
317 << " has finished receiving data for QUIC data channel " << id_; 319 << " has finished receiving data for QUIC data channel " << id_;
318 received_data.AppendData(data, len); 320 received_data.AppendData(data, len);
319 DataBuffer final_message(std::move(received_data), false); 321 DataBuffer final_message(std::move(received_data), false);
320 invoker_.AsyncInvoke<void>( 322 invoker_.AsyncInvoke<void>(
321 RTC_FROM_HERE, signaling_thread_, 323 RTC_FROM_HERE, signaling_thread_,
322 rtc::Bind(&QuicDataChannel::OnMessage_s, this, std::move(final_message))); 324 rtc::Bind(&QuicDataChannel::OnMessage_s, this, std::move(final_message)));
323 // Once the stream is closed, OnDataReceived will not fire for the stream. 325 // Once the stream is closed, OnDataReceived will not fire for the stream.
324 stream->Close(); 326 stream->Close();
325 } 327 }
326 328
327 void QuicDataChannel::OnReadyToSend(cricket::TransportChannel* channel) { 329 void QuicDataChannel::OnReadyToSend(cricket::TransportChannel* channel) {
328 RTC_DCHECK(worker_thread_->IsCurrent()); 330 RTC_DCHECK(network_thread_->IsCurrent());
329 RTC_DCHECK(channel == quic_transport_channel_); 331 RTC_DCHECK(channel == quic_transport_channel_);
330 LOG(LS_INFO) << "QuicTransportChannel is ready to send"; 332 LOG(LS_INFO) << "QuicTransportChannel is ready to send";
331 invoker_.AsyncInvoke<void>( 333 invoker_.AsyncInvoke<void>(
332 RTC_FROM_HERE, signaling_thread_, 334 RTC_FROM_HERE, signaling_thread_,
333 rtc::Bind(&QuicDataChannel::SetState_s, this, kOpen)); 335 rtc::Bind(&QuicDataChannel::SetState_s, this, kOpen));
334 } 336 }
335 337
336 void QuicDataChannel::OnWriteBlockedStreamClosed(net::QuicStreamId stream_id, 338 void QuicDataChannel::OnWriteBlockedStreamClosed(net::QuicStreamId stream_id,
337 int error) { 339 int error) {
338 RTC_DCHECK(worker_thread_->IsCurrent()); 340 RTC_DCHECK(worker_thread_->IsCurrent());
339 LOG(LS_VERBOSE) << "Write blocked stream " << stream_id << " is closed."; 341 LOG(LS_VERBOSE) << "Write blocked stream " << stream_id << " is closed.";
340 write_blocked_quic_streams_.erase(stream_id); 342 write_blocked_quic_streams_.erase(stream_id);
341 } 343 }
342 344
343 void QuicDataChannel::OnIncomingQueuedStreamClosed(net::QuicStreamId stream_id, 345 void QuicDataChannel::OnIncomingQueuedStreamClosed(net::QuicStreamId stream_id,
344 int error) { 346 int error) {
345 RTC_DCHECK(worker_thread_->IsCurrent()); 347 RTC_DCHECK(network_thread_->IsCurrent());
346 LOG(LS_VERBOSE) << "Incoming queued stream " << stream_id << " is closed."; 348 LOG(LS_VERBOSE) << "Incoming queued stream " << stream_id << " is closed.";
347 incoming_quic_messages_.erase(stream_id); 349 incoming_quic_messages_.erase(stream_id);
348 } 350 }
349 351
350 void QuicDataChannel::OnConnectionClosed() { 352 void QuicDataChannel::OnConnectionClosed() {
351 RTC_DCHECK(worker_thread_->IsCurrent()); 353 RTC_DCHECK(worker_thread_->IsCurrent());
352 invoker_.AsyncInvoke<void>(RTC_FROM_HERE, signaling_thread_, 354 invoker_.AsyncInvoke<void>(RTC_FROM_HERE, signaling_thread_,
353 rtc::Bind(&QuicDataChannel::Close, this)); 355 rtc::Bind(&QuicDataChannel::Close, this));
354 } 356 }
355 357
(...skipping 29 matching lines...) Expand all
385 387
386 size_t QuicDataChannel::GetNumWriteBlockedStreams() const { 388 size_t QuicDataChannel::GetNumWriteBlockedStreams() const {
387 return write_blocked_quic_streams_.size(); 389 return write_blocked_quic_streams_.size();
388 } 390 }
389 391
390 size_t QuicDataChannel::GetNumIncomingStreams() const { 392 size_t QuicDataChannel::GetNumIncomingStreams() const {
391 return incoming_quic_messages_.size(); 393 return incoming_quic_messages_.size();
392 } 394 }
393 395
394 } // namespace webrtc 396 } // namespace webrtc
OLDNEW
« no previous file with comments | « webrtc/api/quicdatachannel.h ('k') | webrtc/api/quicdatachannel_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698