OLD | NEW |
1 /* | 1 /* |
2 * Copyright 2016 The WebRTC project authors. All Rights Reserved. | 2 * Copyright 2016 The WebRTC project authors. All Rights Reserved. |
3 * | 3 * |
4 * Use of this source code is governed by a BSD-style license | 4 * Use of this source code is governed by a BSD-style license |
5 * that can be found in the LICENSE file in the root of the source | 5 * that can be found in the LICENSE file in the root of the source |
6 * tree. An additional intellectual property rights grant can be found | 6 * tree. An additional intellectual property rights grant can be found |
7 * in the file PATENTS. All contributing project authors may | 7 * in the file PATENTS. All contributing project authors may |
8 * be found in the AUTHORS file in the root of the source tree. | 8 * be found in the AUTHORS file in the root of the source tree. |
9 */ | 9 */ |
10 | 10 |
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
54 << *data_channel_id; | 54 << *data_channel_id; |
55 return false; | 55 return false; |
56 } | 56 } |
57 size_t remaining_bytes = byte_buffer.Length(); | 57 size_t remaining_bytes = byte_buffer.Length(); |
58 *bytes_read = len - remaining_bytes; | 58 *bytes_read = len - remaining_bytes; |
59 return true; | 59 return true; |
60 } | 60 } |
61 | 61 |
62 QuicDataChannel::QuicDataChannel(rtc::Thread* signaling_thread, | 62 QuicDataChannel::QuicDataChannel(rtc::Thread* signaling_thread, |
63 rtc::Thread* worker_thread, | 63 rtc::Thread* worker_thread, |
| 64 rtc::Thread* network_thread, |
64 const std::string& label, | 65 const std::string& label, |
65 const DataChannelInit& config) | 66 const DataChannelInit& config) |
66 : signaling_thread_(signaling_thread), | 67 : signaling_thread_(signaling_thread), |
67 worker_thread_(worker_thread), | 68 worker_thread_(worker_thread), |
| 69 network_thread_(network_thread), |
68 id_(config.id), | 70 id_(config.id), |
69 state_(kConnecting), | 71 state_(kConnecting), |
70 buffered_amount_(0), | 72 buffered_amount_(0), |
71 next_message_id_(0), | 73 next_message_id_(0), |
72 label_(label), | 74 label_(label), |
73 protocol_(config.protocol) {} | 75 protocol_(config.protocol) {} |
74 | 76 |
75 QuicDataChannel::~QuicDataChannel() {} | 77 QuicDataChannel::~QuicDataChannel() {} |
76 | 78 |
77 void QuicDataChannel::RegisterObserver(DataChannelObserver* observer) { | 79 void QuicDataChannel::RegisterObserver(DataChannelObserver* observer) { |
78 RTC_DCHECK(signaling_thread_->IsCurrent()); | 80 RTC_DCHECK(signaling_thread_->IsCurrent()); |
79 observer_ = observer; | 81 observer_ = observer; |
80 } | 82 } |
81 | 83 |
82 void QuicDataChannel::UnregisterObserver() { | 84 void QuicDataChannel::UnregisterObserver() { |
83 RTC_DCHECK(signaling_thread_->IsCurrent()); | 85 RTC_DCHECK(signaling_thread_->IsCurrent()); |
84 observer_ = nullptr; | 86 observer_ = nullptr; |
85 } | 87 } |
86 | 88 |
87 bool QuicDataChannel::Send(const DataBuffer& buffer) { | 89 bool QuicDataChannel::Send(const DataBuffer& buffer) { |
88 RTC_DCHECK(signaling_thread_->IsCurrent()); | 90 RTC_DCHECK(signaling_thread_->IsCurrent()); |
89 if (state_ != kOpen) { | 91 if (state_ != kOpen) { |
90 LOG(LS_ERROR) << "QUIC data channel " << id_ | 92 LOG(LS_ERROR) << "QUIC data channel " << id_ |
91 << " is not open so cannot send."; | 93 << " is not open so cannot send."; |
92 return false; | 94 return false; |
93 } | 95 } |
94 return worker_thread_->Invoke<bool>( | 96 return network_thread_->Invoke<bool>( |
95 RTC_FROM_HERE, rtc::Bind(&QuicDataChannel::Send_w, this, buffer)); | 97 RTC_FROM_HERE, rtc::Bind(&QuicDataChannel::Send_n, this, buffer)); |
96 } | 98 } |
97 | 99 |
98 bool QuicDataChannel::Send_w(const DataBuffer& buffer) { | 100 bool QuicDataChannel::Send_n(const DataBuffer& buffer) { |
99 RTC_DCHECK(worker_thread_->IsCurrent()); | 101 RTC_DCHECK(network_thread_->IsCurrent()); |
100 | 102 |
101 // Encode and send the header containing the data channel ID and message ID. | 103 // Encode and send the header containing the data channel ID and message ID. |
102 rtc::CopyOnWriteBuffer header; | 104 rtc::CopyOnWriteBuffer header; |
103 WriteQuicDataChannelMessageHeader(id_, ++next_message_id_, &header); | 105 WriteQuicDataChannelMessageHeader(id_, ++next_message_id_, &header); |
104 RTC_DCHECK(quic_transport_channel_); | 106 RTC_DCHECK(quic_transport_channel_); |
105 cricket::ReliableQuicStream* stream = | 107 cricket::ReliableQuicStream* stream = |
106 quic_transport_channel_->CreateQuicStream(); | 108 quic_transport_channel_->CreateQuicStream(); |
107 RTC_DCHECK(stream); | 109 RTC_DCHECK(stream); |
108 | 110 |
109 // Send the header with a FIN if the message is empty. | 111 // Send the header with a FIN if the message is empty. |
(...skipping 139 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
249 this, &QuicDataChannel::OnReadyToSend); | 251 this, &QuicDataChannel::OnReadyToSend); |
250 quic_transport_channel_->SignalClosed.connect( | 252 quic_transport_channel_->SignalClosed.connect( |
251 this, &QuicDataChannel::OnConnectionClosed); | 253 this, &QuicDataChannel::OnConnectionClosed); |
252 if (quic_transport_channel_->writable()) { | 254 if (quic_transport_channel_->writable()) { |
253 return kOpen; | 255 return kOpen; |
254 } | 256 } |
255 return kConnecting; | 257 return kConnecting; |
256 } | 258 } |
257 | 259 |
258 void QuicDataChannel::OnIncomingMessage(Message&& message) { | 260 void QuicDataChannel::OnIncomingMessage(Message&& message) { |
259 RTC_DCHECK(worker_thread_->IsCurrent()); | 261 RTC_DCHECK(network_thread_->IsCurrent()); |
260 RTC_DCHECK(message.stream); | 262 RTC_DCHECK(message.stream); |
261 if (!observer_) { | 263 if (!observer_) { |
262 LOG(LS_WARNING) << "QUIC data channel " << id_ | 264 LOG(LS_WARNING) << "QUIC data channel " << id_ |
263 << " received a message but has no observer."; | 265 << " received a message but has no observer."; |
264 message.stream->Close(); | 266 message.stream->Close(); |
265 return; | 267 return; |
266 } | 268 } |
267 // A FIN is received if the message fits into a single QUIC stream frame and | 269 // A FIN is received if the message fits into a single QUIC stream frame and |
268 // the remote peer is done sending. | 270 // the remote peer is done sending. |
269 if (message.stream->fin_received()) { | 271 if (message.stream->fin_received()) { |
(...skipping 18 matching lines...) Expand all Loading... |
288 &QuicDataChannel::OnDataReceived); | 290 &QuicDataChannel::OnDataReceived); |
289 // The QUIC stream will be removed from |incoming_quic_messages_| once it | 291 // The QUIC stream will be removed from |incoming_quic_messages_| once it |
290 // closes. | 292 // closes. |
291 message.stream->SignalClosed.connect( | 293 message.stream->SignalClosed.connect( |
292 this, &QuicDataChannel::OnIncomingQueuedStreamClosed); | 294 this, &QuicDataChannel::OnIncomingQueuedStreamClosed); |
293 } | 295 } |
294 | 296 |
295 void QuicDataChannel::OnDataReceived(net::QuicStreamId stream_id, | 297 void QuicDataChannel::OnDataReceived(net::QuicStreamId stream_id, |
296 const char* data, | 298 const char* data, |
297 size_t len) { | 299 size_t len) { |
298 RTC_DCHECK(worker_thread_->IsCurrent()); | 300 RTC_DCHECK(network_thread_->IsCurrent()); |
299 RTC_DCHECK(data); | 301 RTC_DCHECK(data); |
300 const auto& kv = incoming_quic_messages_.find(stream_id); | 302 const auto& kv = incoming_quic_messages_.find(stream_id); |
301 if (kv == incoming_quic_messages_.end()) { | 303 if (kv == incoming_quic_messages_.end()) { |
302 RTC_DCHECK(false); | 304 RTC_DCHECK(false); |
303 return; | 305 return; |
304 } | 306 } |
305 Message& message = kv->second; | 307 Message& message = kv->second; |
306 cricket::ReliableQuicStream* stream = message.stream; | 308 cricket::ReliableQuicStream* stream = message.stream; |
307 rtc::CopyOnWriteBuffer& received_data = message.buffer; | 309 rtc::CopyOnWriteBuffer& received_data = message.buffer; |
308 // If the QUIC stream has not received a FIN, then the remote peer is not | 310 // If the QUIC stream has not received a FIN, then the remote peer is not |
309 // finished sending data. | 311 // finished sending data. |
310 if (!stream->fin_received()) { | 312 if (!stream->fin_received()) { |
311 received_data.AppendData(data, len); | 313 received_data.AppendData(data, len); |
312 return; | 314 return; |
313 } | 315 } |
314 // Otherwise we are done receiving and can provide the data channel observer | 316 // Otherwise we are done receiving and can provide the data channel observer |
315 // with the message. | 317 // with the message. |
316 LOG(LS_INFO) << "Stream " << stream_id | 318 LOG(LS_INFO) << "Stream " << stream_id |
317 << " has finished receiving data for QUIC data channel " << id_; | 319 << " has finished receiving data for QUIC data channel " << id_; |
318 received_data.AppendData(data, len); | 320 received_data.AppendData(data, len); |
319 DataBuffer final_message(std::move(received_data), false); | 321 DataBuffer final_message(std::move(received_data), false); |
320 invoker_.AsyncInvoke<void>( | 322 invoker_.AsyncInvoke<void>( |
321 RTC_FROM_HERE, signaling_thread_, | 323 RTC_FROM_HERE, signaling_thread_, |
322 rtc::Bind(&QuicDataChannel::OnMessage_s, this, std::move(final_message))); | 324 rtc::Bind(&QuicDataChannel::OnMessage_s, this, std::move(final_message))); |
323 // Once the stream is closed, OnDataReceived will not fire for the stream. | 325 // Once the stream is closed, OnDataReceived will not fire for the stream. |
324 stream->Close(); | 326 stream->Close(); |
325 } | 327 } |
326 | 328 |
327 void QuicDataChannel::OnReadyToSend(cricket::TransportChannel* channel) { | 329 void QuicDataChannel::OnReadyToSend(cricket::TransportChannel* channel) { |
328 RTC_DCHECK(worker_thread_->IsCurrent()); | 330 RTC_DCHECK(network_thread_->IsCurrent()); |
329 RTC_DCHECK(channel == quic_transport_channel_); | 331 RTC_DCHECK(channel == quic_transport_channel_); |
330 LOG(LS_INFO) << "QuicTransportChannel is ready to send"; | 332 LOG(LS_INFO) << "QuicTransportChannel is ready to send"; |
331 invoker_.AsyncInvoke<void>( | 333 invoker_.AsyncInvoke<void>( |
332 RTC_FROM_HERE, signaling_thread_, | 334 RTC_FROM_HERE, signaling_thread_, |
333 rtc::Bind(&QuicDataChannel::SetState_s, this, kOpen)); | 335 rtc::Bind(&QuicDataChannel::SetState_s, this, kOpen)); |
334 } | 336 } |
335 | 337 |
336 void QuicDataChannel::OnWriteBlockedStreamClosed(net::QuicStreamId stream_id, | 338 void QuicDataChannel::OnWriteBlockedStreamClosed(net::QuicStreamId stream_id, |
337 int error) { | 339 int error) { |
338 RTC_DCHECK(worker_thread_->IsCurrent()); | 340 RTC_DCHECK(worker_thread_->IsCurrent()); |
339 LOG(LS_VERBOSE) << "Write blocked stream " << stream_id << " is closed."; | 341 LOG(LS_VERBOSE) << "Write blocked stream " << stream_id << " is closed."; |
340 write_blocked_quic_streams_.erase(stream_id); | 342 write_blocked_quic_streams_.erase(stream_id); |
341 } | 343 } |
342 | 344 |
343 void QuicDataChannel::OnIncomingQueuedStreamClosed(net::QuicStreamId stream_id, | 345 void QuicDataChannel::OnIncomingQueuedStreamClosed(net::QuicStreamId stream_id, |
344 int error) { | 346 int error) { |
345 RTC_DCHECK(worker_thread_->IsCurrent()); | 347 RTC_DCHECK(network_thread_->IsCurrent()); |
346 LOG(LS_VERBOSE) << "Incoming queued stream " << stream_id << " is closed."; | 348 LOG(LS_VERBOSE) << "Incoming queued stream " << stream_id << " is closed."; |
347 incoming_quic_messages_.erase(stream_id); | 349 incoming_quic_messages_.erase(stream_id); |
348 } | 350 } |
349 | 351 |
350 void QuicDataChannel::OnConnectionClosed() { | 352 void QuicDataChannel::OnConnectionClosed() { |
351 RTC_DCHECK(worker_thread_->IsCurrent()); | 353 RTC_DCHECK(worker_thread_->IsCurrent()); |
352 invoker_.AsyncInvoke<void>(RTC_FROM_HERE, signaling_thread_, | 354 invoker_.AsyncInvoke<void>(RTC_FROM_HERE, signaling_thread_, |
353 rtc::Bind(&QuicDataChannel::Close, this)); | 355 rtc::Bind(&QuicDataChannel::Close, this)); |
354 } | 356 } |
355 | 357 |
(...skipping 29 matching lines...) Expand all Loading... |
385 | 387 |
386 size_t QuicDataChannel::GetNumWriteBlockedStreams() const { | 388 size_t QuicDataChannel::GetNumWriteBlockedStreams() const { |
387 return write_blocked_quic_streams_.size(); | 389 return write_blocked_quic_streams_.size(); |
388 } | 390 } |
389 | 391 |
390 size_t QuicDataChannel::GetNumIncomingStreams() const { | 392 size_t QuicDataChannel::GetNumIncomingStreams() const { |
391 return incoming_quic_messages_.size(); | 393 return incoming_quic_messages_.size(); |
392 } | 394 } |
393 | 395 |
394 } // namespace webrtc | 396 } // namespace webrtc |
OLD | NEW |