OLD | NEW |
1 /* | 1 /* |
2 * Copyright 2016 The WebRTC project authors. All Rights Reserved. | 2 * Copyright 2016 The WebRTC project authors. All Rights Reserved. |
3 * | 3 * |
4 * Use of this source code is governed by a BSD-style license | 4 * Use of this source code is governed by a BSD-style license |
5 * that can be found in the LICENSE file in the root of the source | 5 * that can be found in the LICENSE file in the root of the source |
6 * tree. An additional intellectual property rights grant can be found | 6 * tree. An additional intellectual property rights grant can be found |
7 * in the file PATENTS. All contributing project authors may | 7 * in the file PATENTS. All contributing project authors may |
8 * be found in the AUTHORS file in the root of the source tree. | 8 * be found in the AUTHORS file in the root of the source tree. |
9 */ | 9 */ |
10 | 10 |
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
54 << *data_channel_id; | 54 << *data_channel_id; |
55 return false; | 55 return false; |
56 } | 56 } |
57 size_t remaining_bytes = byte_buffer.Length(); | 57 size_t remaining_bytes = byte_buffer.Length(); |
58 *bytes_read = len - remaining_bytes; | 58 *bytes_read = len - remaining_bytes; |
59 return true; | 59 return true; |
60 } | 60 } |
61 | 61 |
62 QuicDataChannel::QuicDataChannel(rtc::Thread* signaling_thread, | 62 QuicDataChannel::QuicDataChannel(rtc::Thread* signaling_thread, |
63 rtc::Thread* worker_thread, | 63 rtc::Thread* worker_thread, |
64 rtc::Thread* network_thread, | |
65 const std::string& label, | 64 const std::string& label, |
66 const DataChannelInit& config) | 65 const DataChannelInit& config) |
67 : signaling_thread_(signaling_thread), | 66 : signaling_thread_(signaling_thread), |
68 worker_thread_(worker_thread), | 67 worker_thread_(worker_thread), |
69 network_thread_(network_thread), | |
70 id_(config.id), | 68 id_(config.id), |
71 state_(kConnecting), | 69 state_(kConnecting), |
72 buffered_amount_(0), | 70 buffered_amount_(0), |
73 next_message_id_(0), | 71 next_message_id_(0), |
74 label_(label), | 72 label_(label), |
75 protocol_(config.protocol) {} | 73 protocol_(config.protocol) {} |
76 | 74 |
77 QuicDataChannel::~QuicDataChannel() {} | 75 QuicDataChannel::~QuicDataChannel() {} |
78 | 76 |
79 void QuicDataChannel::RegisterObserver(DataChannelObserver* observer) { | 77 void QuicDataChannel::RegisterObserver(DataChannelObserver* observer) { |
80 RTC_DCHECK(signaling_thread_->IsCurrent()); | 78 RTC_DCHECK(signaling_thread_->IsCurrent()); |
81 observer_ = observer; | 79 observer_ = observer; |
82 } | 80 } |
83 | 81 |
84 void QuicDataChannel::UnregisterObserver() { | 82 void QuicDataChannel::UnregisterObserver() { |
85 RTC_DCHECK(signaling_thread_->IsCurrent()); | 83 RTC_DCHECK(signaling_thread_->IsCurrent()); |
86 observer_ = nullptr; | 84 observer_ = nullptr; |
87 } | 85 } |
88 | 86 |
89 bool QuicDataChannel::Send(const DataBuffer& buffer) { | 87 bool QuicDataChannel::Send(const DataBuffer& buffer) { |
90 RTC_DCHECK(signaling_thread_->IsCurrent()); | 88 RTC_DCHECK(signaling_thread_->IsCurrent()); |
91 if (state_ != kOpen) { | 89 if (state_ != kOpen) { |
92 LOG(LS_ERROR) << "QUIC data channel " << id_ | 90 LOG(LS_ERROR) << "QUIC data channel " << id_ |
93 << " is not open so cannot send."; | 91 << " is not open so cannot send."; |
94 return false; | 92 return false; |
95 } | 93 } |
96 return network_thread_->Invoke<bool>( | 94 return worker_thread_->Invoke<bool>( |
97 RTC_FROM_HERE, rtc::Bind(&QuicDataChannel::Send_n, this, buffer)); | 95 RTC_FROM_HERE, rtc::Bind(&QuicDataChannel::Send_w, this, buffer)); |
98 } | 96 } |
99 | 97 |
100 bool QuicDataChannel::Send_n(const DataBuffer& buffer) { | 98 bool QuicDataChannel::Send_w(const DataBuffer& buffer) { |
101 RTC_DCHECK(network_thread_->IsCurrent()); | 99 RTC_DCHECK(worker_thread_->IsCurrent()); |
102 | 100 |
103 // Encode and send the header containing the data channel ID and message ID. | 101 // Encode and send the header containing the data channel ID and message ID. |
104 rtc::CopyOnWriteBuffer header; | 102 rtc::CopyOnWriteBuffer header; |
105 WriteQuicDataChannelMessageHeader(id_, ++next_message_id_, &header); | 103 WriteQuicDataChannelMessageHeader(id_, ++next_message_id_, &header); |
106 RTC_DCHECK(quic_transport_channel_); | 104 RTC_DCHECK(quic_transport_channel_); |
107 cricket::ReliableQuicStream* stream = | 105 cricket::ReliableQuicStream* stream = |
108 quic_transport_channel_->CreateQuicStream(); | 106 quic_transport_channel_->CreateQuicStream(); |
109 RTC_DCHECK(stream); | 107 RTC_DCHECK(stream); |
110 | 108 |
111 // Send the header with a FIN if the message is empty. | 109 // Send the header with a FIN if the message is empty. |
(...skipping 139 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
251 this, &QuicDataChannel::OnReadyToSend); | 249 this, &QuicDataChannel::OnReadyToSend); |
252 quic_transport_channel_->SignalClosed.connect( | 250 quic_transport_channel_->SignalClosed.connect( |
253 this, &QuicDataChannel::OnConnectionClosed); | 251 this, &QuicDataChannel::OnConnectionClosed); |
254 if (quic_transport_channel_->writable()) { | 252 if (quic_transport_channel_->writable()) { |
255 return kOpen; | 253 return kOpen; |
256 } | 254 } |
257 return kConnecting; | 255 return kConnecting; |
258 } | 256 } |
259 | 257 |
260 void QuicDataChannel::OnIncomingMessage(Message&& message) { | 258 void QuicDataChannel::OnIncomingMessage(Message&& message) { |
261 RTC_DCHECK(network_thread_->IsCurrent()); | 259 RTC_DCHECK(worker_thread_->IsCurrent()); |
262 RTC_DCHECK(message.stream); | 260 RTC_DCHECK(message.stream); |
263 if (!observer_) { | 261 if (!observer_) { |
264 LOG(LS_WARNING) << "QUIC data channel " << id_ | 262 LOG(LS_WARNING) << "QUIC data channel " << id_ |
265 << " received a message but has no observer."; | 263 << " received a message but has no observer."; |
266 message.stream->Close(); | 264 message.stream->Close(); |
267 return; | 265 return; |
268 } | 266 } |
269 // A FIN is received if the message fits into a single QUIC stream frame and | 267 // A FIN is received if the message fits into a single QUIC stream frame and |
270 // the remote peer is done sending. | 268 // the remote peer is done sending. |
271 if (message.stream->fin_received()) { | 269 if (message.stream->fin_received()) { |
(...skipping 18 matching lines...) Expand all Loading... |
290 &QuicDataChannel::OnDataReceived); | 288 &QuicDataChannel::OnDataReceived); |
291 // The QUIC stream will be removed from |incoming_quic_messages_| once it | 289 // The QUIC stream will be removed from |incoming_quic_messages_| once it |
292 // closes. | 290 // closes. |
293 message.stream->SignalClosed.connect( | 291 message.stream->SignalClosed.connect( |
294 this, &QuicDataChannel::OnIncomingQueuedStreamClosed); | 292 this, &QuicDataChannel::OnIncomingQueuedStreamClosed); |
295 } | 293 } |
296 | 294 |
297 void QuicDataChannel::OnDataReceived(net::QuicStreamId stream_id, | 295 void QuicDataChannel::OnDataReceived(net::QuicStreamId stream_id, |
298 const char* data, | 296 const char* data, |
299 size_t len) { | 297 size_t len) { |
300 RTC_DCHECK(network_thread_->IsCurrent()); | 298 RTC_DCHECK(worker_thread_->IsCurrent()); |
301 RTC_DCHECK(data); | 299 RTC_DCHECK(data); |
302 const auto& kv = incoming_quic_messages_.find(stream_id); | 300 const auto& kv = incoming_quic_messages_.find(stream_id); |
303 if (kv == incoming_quic_messages_.end()) { | 301 if (kv == incoming_quic_messages_.end()) { |
304 RTC_DCHECK(false); | 302 RTC_DCHECK(false); |
305 return; | 303 return; |
306 } | 304 } |
307 Message& message = kv->second; | 305 Message& message = kv->second; |
308 cricket::ReliableQuicStream* stream = message.stream; | 306 cricket::ReliableQuicStream* stream = message.stream; |
309 rtc::CopyOnWriteBuffer& received_data = message.buffer; | 307 rtc::CopyOnWriteBuffer& received_data = message.buffer; |
310 // If the QUIC stream has not received a FIN, then the remote peer is not | 308 // If the QUIC stream has not received a FIN, then the remote peer is not |
311 // finished sending data. | 309 // finished sending data. |
312 if (!stream->fin_received()) { | 310 if (!stream->fin_received()) { |
313 received_data.AppendData(data, len); | 311 received_data.AppendData(data, len); |
314 return; | 312 return; |
315 } | 313 } |
316 // Otherwise we are done receiving and can provide the data channel observer | 314 // Otherwise we are done receiving and can provide the data channel observer |
317 // with the message. | 315 // with the message. |
318 LOG(LS_INFO) << "Stream " << stream_id | 316 LOG(LS_INFO) << "Stream " << stream_id |
319 << " has finished receiving data for QUIC data channel " << id_; | 317 << " has finished receiving data for QUIC data channel " << id_; |
320 received_data.AppendData(data, len); | 318 received_data.AppendData(data, len); |
321 DataBuffer final_message(std::move(received_data), false); | 319 DataBuffer final_message(std::move(received_data), false); |
322 invoker_.AsyncInvoke<void>( | 320 invoker_.AsyncInvoke<void>( |
323 RTC_FROM_HERE, signaling_thread_, | 321 RTC_FROM_HERE, signaling_thread_, |
324 rtc::Bind(&QuicDataChannel::OnMessage_s, this, std::move(final_message))); | 322 rtc::Bind(&QuicDataChannel::OnMessage_s, this, std::move(final_message))); |
325 // Once the stream is closed, OnDataReceived will not fire for the stream. | 323 // Once the stream is closed, OnDataReceived will not fire for the stream. |
326 stream->Close(); | 324 stream->Close(); |
327 } | 325 } |
328 | 326 |
329 void QuicDataChannel::OnReadyToSend(cricket::TransportChannel* channel) { | 327 void QuicDataChannel::OnReadyToSend(cricket::TransportChannel* channel) { |
330 RTC_DCHECK(network_thread_->IsCurrent()); | 328 RTC_DCHECK(worker_thread_->IsCurrent()); |
331 RTC_DCHECK(channel == quic_transport_channel_); | 329 RTC_DCHECK(channel == quic_transport_channel_); |
332 LOG(LS_INFO) << "QuicTransportChannel is ready to send"; | 330 LOG(LS_INFO) << "QuicTransportChannel is ready to send"; |
333 invoker_.AsyncInvoke<void>( | 331 invoker_.AsyncInvoke<void>( |
334 RTC_FROM_HERE, signaling_thread_, | 332 RTC_FROM_HERE, signaling_thread_, |
335 rtc::Bind(&QuicDataChannel::SetState_s, this, kOpen)); | 333 rtc::Bind(&QuicDataChannel::SetState_s, this, kOpen)); |
336 } | 334 } |
337 | 335 |
338 void QuicDataChannel::OnWriteBlockedStreamClosed(net::QuicStreamId stream_id, | 336 void QuicDataChannel::OnWriteBlockedStreamClosed(net::QuicStreamId stream_id, |
339 int error) { | 337 int error) { |
340 RTC_DCHECK(worker_thread_->IsCurrent()); | 338 RTC_DCHECK(worker_thread_->IsCurrent()); |
341 LOG(LS_VERBOSE) << "Write blocked stream " << stream_id << " is closed."; | 339 LOG(LS_VERBOSE) << "Write blocked stream " << stream_id << " is closed."; |
342 write_blocked_quic_streams_.erase(stream_id); | 340 write_blocked_quic_streams_.erase(stream_id); |
343 } | 341 } |
344 | 342 |
345 void QuicDataChannel::OnIncomingQueuedStreamClosed(net::QuicStreamId stream_id, | 343 void QuicDataChannel::OnIncomingQueuedStreamClosed(net::QuicStreamId stream_id, |
346 int error) { | 344 int error) { |
347 RTC_DCHECK(network_thread_->IsCurrent()); | 345 RTC_DCHECK(worker_thread_->IsCurrent()); |
348 LOG(LS_VERBOSE) << "Incoming queued stream " << stream_id << " is closed."; | 346 LOG(LS_VERBOSE) << "Incoming queued stream " << stream_id << " is closed."; |
349 incoming_quic_messages_.erase(stream_id); | 347 incoming_quic_messages_.erase(stream_id); |
350 } | 348 } |
351 | 349 |
352 void QuicDataChannel::OnConnectionClosed() { | 350 void QuicDataChannel::OnConnectionClosed() { |
353 RTC_DCHECK(worker_thread_->IsCurrent()); | 351 RTC_DCHECK(worker_thread_->IsCurrent()); |
354 invoker_.AsyncInvoke<void>(RTC_FROM_HERE, signaling_thread_, | 352 invoker_.AsyncInvoke<void>(RTC_FROM_HERE, signaling_thread_, |
355 rtc::Bind(&QuicDataChannel::Close, this)); | 353 rtc::Bind(&QuicDataChannel::Close, this)); |
356 } | 354 } |
357 | 355 |
(...skipping 29 matching lines...) Expand all Loading... |
387 | 385 |
388 size_t QuicDataChannel::GetNumWriteBlockedStreams() const { | 386 size_t QuicDataChannel::GetNumWriteBlockedStreams() const { |
389 return write_blocked_quic_streams_.size(); | 387 return write_blocked_quic_streams_.size(); |
390 } | 388 } |
391 | 389 |
392 size_t QuicDataChannel::GetNumIncomingStreams() const { | 390 size_t QuicDataChannel::GetNumIncomingStreams() const { |
393 return incoming_quic_messages_.size(); | 391 return incoming_quic_messages_.size(); |
394 } | 392 } |
395 | 393 |
396 } // namespace webrtc | 394 } // namespace webrtc |
OLD | NEW |