| OLD | NEW |
| 1 /* | 1 /* |
| 2 * Copyright 2016 The WebRTC project authors. All Rights Reserved. | 2 * Copyright 2016 The WebRTC project authors. All Rights Reserved. |
| 3 * | 3 * |
| 4 * Use of this source code is governed by a BSD-style license | 4 * Use of this source code is governed by a BSD-style license |
| 5 * that can be found in the LICENSE file in the root of the source | 5 * that can be found in the LICENSE file in the root of the source |
| 6 * tree. An additional intellectual property rights grant can be found | 6 * tree. An additional intellectual property rights grant can be found |
| 7 * in the file PATENTS. All contributing project authors may | 7 * in the file PATENTS. All contributing project authors may |
| 8 * be found in the AUTHORS file in the root of the source tree. | 8 * be found in the AUTHORS file in the root of the source tree. |
| 9 */ | 9 */ |
| 10 | 10 |
| (...skipping 74 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 85 } | 85 } |
| 86 | 86 |
| 87 bool QuicDataChannel::Send(const DataBuffer& buffer) { | 87 bool QuicDataChannel::Send(const DataBuffer& buffer) { |
| 88 RTC_DCHECK(signaling_thread_->IsCurrent()); | 88 RTC_DCHECK(signaling_thread_->IsCurrent()); |
| 89 if (state_ != kOpen) { | 89 if (state_ != kOpen) { |
| 90 LOG(LS_ERROR) << "QUIC data channel " << id_ | 90 LOG(LS_ERROR) << "QUIC data channel " << id_ |
| 91 << " is not open so cannot send."; | 91 << " is not open so cannot send."; |
| 92 return false; | 92 return false; |
| 93 } | 93 } |
| 94 return worker_thread_->Invoke<bool>( | 94 return worker_thread_->Invoke<bool>( |
| 95 rtc::Bind(&QuicDataChannel::Send_w, this, buffer)); | 95 RTC_FROM_HERE, rtc::Bind(&QuicDataChannel::Send_w, this, buffer)); |
| 96 } | 96 } |
| 97 | 97 |
| 98 bool QuicDataChannel::Send_w(const DataBuffer& buffer) { | 98 bool QuicDataChannel::Send_w(const DataBuffer& buffer) { |
| 99 RTC_DCHECK(worker_thread_->IsCurrent()); | 99 RTC_DCHECK(worker_thread_->IsCurrent()); |
| 100 | 100 |
| 101 // Encode and send the header containing the data channel ID and message ID. | 101 // Encode and send the header containing the data channel ID and message ID. |
| 102 rtc::CopyOnWriteBuffer header; | 102 rtc::CopyOnWriteBuffer header; |
| 103 WriteQuicDataChannelMessageHeader(id_, ++next_message_id_, &header); | 103 WriteQuicDataChannelMessageHeader(id_, ++next_message_id_, &header); |
| 104 RTC_DCHECK(quic_transport_channel_); | 104 RTC_DCHECK(quic_transport_channel_); |
| 105 cricket::ReliableQuicStream* stream = | 105 cricket::ReliableQuicStream* stream = |
| (...skipping 74 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 180 LOG(LS_INFO) << "Stream " << stream->id() | 180 LOG(LS_INFO) << "Stream " << stream->id() |
| 181 << " successfully wrote data for QUIC data channel " << id_; | 181 << " successfully wrote data for QUIC data channel " << id_; |
| 182 stream->Close(); | 182 stream->Close(); |
| 183 } | 183 } |
| 184 } | 184 } |
| 185 | 185 |
| 186 void QuicDataChannel::SetBufferedAmount_w(uint64_t buffered_amount) { | 186 void QuicDataChannel::SetBufferedAmount_w(uint64_t buffered_amount) { |
| 187 RTC_DCHECK(worker_thread_->IsCurrent()); | 187 RTC_DCHECK(worker_thread_->IsCurrent()); |
| 188 buffered_amount_ = buffered_amount; | 188 buffered_amount_ = buffered_amount; |
| 189 invoker_.AsyncInvoke<void>( | 189 invoker_.AsyncInvoke<void>( |
| 190 signaling_thread_, rtc::Bind(&QuicDataChannel::OnBufferedAmountChange_s, | 190 RTC_FROM_HERE, signaling_thread_, |
| 191 this, buffered_amount)); | 191 rtc::Bind(&QuicDataChannel::OnBufferedAmountChange_s, this, |
| 192 buffered_amount)); |
| 192 } | 193 } |
| 193 | 194 |
| 194 void QuicDataChannel::Close() { | 195 void QuicDataChannel::Close() { |
| 195 RTC_DCHECK(signaling_thread_->IsCurrent()); | 196 RTC_DCHECK(signaling_thread_->IsCurrent()); |
| 196 if (state_ == kClosed || state_ == kClosing) { | 197 if (state_ == kClosed || state_ == kClosing) { |
| 197 return; | 198 return; |
| 198 } | 199 } |
| 199 LOG(LS_INFO) << "Closing QUIC data channel."; | 200 LOG(LS_INFO) << "Closing QUIC data channel."; |
| 200 SetState_s(kClosing); | 201 SetState_s(kClosing); |
| 201 worker_thread_->Invoke<void>(rtc::Bind(&QuicDataChannel::Close_w, this)); | 202 worker_thread_->Invoke<void>(RTC_FROM_HERE, |
| 203 rtc::Bind(&QuicDataChannel::Close_w, this)); |
| 202 SetState_s(kClosed); | 204 SetState_s(kClosed); |
| 203 } | 205 } |
| 204 | 206 |
| 205 void QuicDataChannel::Close_w() { | 207 void QuicDataChannel::Close_w() { |
| 206 RTC_DCHECK(worker_thread_->IsCurrent()); | 208 RTC_DCHECK(worker_thread_->IsCurrent()); |
| 207 for (auto& kv : incoming_quic_messages_) { | 209 for (auto& kv : incoming_quic_messages_) { |
| 208 Message& message = kv.second; | 210 Message& message = kv.second; |
| 209 cricket::ReliableQuicStream* stream = message.stream; | 211 cricket::ReliableQuicStream* stream = message.stream; |
| 210 stream->Close(); | 212 stream->Close(); |
| 211 } | 213 } |
| (...skipping 17 matching lines...) Expand all Loading... |
| 229 LOG(LS_WARNING) << "Ignoring duplicate transport channel."; | 231 LOG(LS_WARNING) << "Ignoring duplicate transport channel."; |
| 230 return true; | 232 return true; |
| 231 } | 233 } |
| 232 LOG(LS_ERROR) << "|channel| does not match existing transport channel."; | 234 LOG(LS_ERROR) << "|channel| does not match existing transport channel."; |
| 233 return false; | 235 return false; |
| 234 } | 236 } |
| 235 | 237 |
| 236 quic_transport_channel_ = channel; | 238 quic_transport_channel_ = channel; |
| 237 LOG(LS_INFO) << "Setting QuicTransportChannel for QUIC data channel " << id_; | 239 LOG(LS_INFO) << "Setting QuicTransportChannel for QUIC data channel " << id_; |
| 238 DataState data_channel_state = worker_thread_->Invoke<DataState>( | 240 DataState data_channel_state = worker_thread_->Invoke<DataState>( |
| 239 rtc::Bind(&QuicDataChannel::SetTransportChannel_w, this)); | 241 RTC_FROM_HERE, rtc::Bind(&QuicDataChannel::SetTransportChannel_w, this)); |
| 240 SetState_s(data_channel_state); | 242 SetState_s(data_channel_state); |
| 241 return true; | 243 return true; |
| 242 } | 244 } |
| 243 | 245 |
| 244 DataChannelInterface::DataState QuicDataChannel::SetTransportChannel_w() { | 246 DataChannelInterface::DataState QuicDataChannel::SetTransportChannel_w() { |
| 245 RTC_DCHECK(worker_thread_->IsCurrent()); | 247 RTC_DCHECK(worker_thread_->IsCurrent()); |
| 246 quic_transport_channel_->SignalReadyToSend.connect( | 248 quic_transport_channel_->SignalReadyToSend.connect( |
| 247 this, &QuicDataChannel::OnReadyToSend); | 249 this, &QuicDataChannel::OnReadyToSend); |
| 248 quic_transport_channel_->SignalClosed.connect( | 250 quic_transport_channel_->SignalClosed.connect( |
| 249 this, &QuicDataChannel::OnConnectionClosed); | 251 this, &QuicDataChannel::OnConnectionClosed); |
| (...skipping 12 matching lines...) Expand all Loading... |
| 262 message.stream->Close(); | 264 message.stream->Close(); |
| 263 return; | 265 return; |
| 264 } | 266 } |
| 265 // A FIN is received if the message fits into a single QUIC stream frame and | 267 // A FIN is received if the message fits into a single QUIC stream frame and |
| 266 // the remote peer is done sending. | 268 // the remote peer is done sending. |
| 267 if (message.stream->fin_received()) { | 269 if (message.stream->fin_received()) { |
| 268 LOG(LS_INFO) << "Stream " << message.stream->id() | 270 LOG(LS_INFO) << "Stream " << message.stream->id() |
| 269 << " has finished receiving data for QUIC data channel " | 271 << " has finished receiving data for QUIC data channel " |
| 270 << id_; | 272 << id_; |
| 271 DataBuffer final_message(message.buffer, false); | 273 DataBuffer final_message(message.buffer, false); |
| 272 invoker_.AsyncInvoke<void>(signaling_thread_, | 274 invoker_.AsyncInvoke<void>(RTC_FROM_HERE, signaling_thread_, |
| 273 rtc::Bind(&QuicDataChannel::OnMessage_s, this, | 275 rtc::Bind(&QuicDataChannel::OnMessage_s, this, |
| 274 std::move(final_message))); | 276 std::move(final_message))); |
| 275 message.stream->Close(); | 277 message.stream->Close(); |
| 276 return; | 278 return; |
| 277 } | 279 } |
| 278 // Otherwise the message is divided across multiple QUIC stream frames, so | 280 // Otherwise the message is divided across multiple QUIC stream frames, so |
| 279 // queue the data. OnDataReceived() will be called each time the remaining | 281 // queue the data. OnDataReceived() will be called each time the remaining |
| 280 // QUIC stream frames arrive. | 282 // QUIC stream frames arrive. |
| 281 LOG(LS_INFO) << "QUIC data channel " << id_ | 283 LOG(LS_INFO) << "QUIC data channel " << id_ |
| 282 << " is queuing incoming data for stream " | 284 << " is queuing incoming data for stream " |
| (...skipping 26 matching lines...) Expand all Loading... |
| 309 received_data.AppendData(data, len); | 311 received_data.AppendData(data, len); |
| 310 return; | 312 return; |
| 311 } | 313 } |
| 312 // Otherwise we are done receiving and can provide the data channel observer | 314 // Otherwise we are done receiving and can provide the data channel observer |
| 313 // with the message. | 315 // with the message. |
| 314 LOG(LS_INFO) << "Stream " << stream_id | 316 LOG(LS_INFO) << "Stream " << stream_id |
| 315 << " has finished receiving data for QUIC data channel " << id_; | 317 << " has finished receiving data for QUIC data channel " << id_; |
| 316 received_data.AppendData(data, len); | 318 received_data.AppendData(data, len); |
| 317 DataBuffer final_message(std::move(received_data), false); | 319 DataBuffer final_message(std::move(received_data), false); |
| 318 invoker_.AsyncInvoke<void>( | 320 invoker_.AsyncInvoke<void>( |
| 319 signaling_thread_, | 321 RTC_FROM_HERE, signaling_thread_, |
| 320 rtc::Bind(&QuicDataChannel::OnMessage_s, this, std::move(final_message))); | 322 rtc::Bind(&QuicDataChannel::OnMessage_s, this, std::move(final_message))); |
| 321 // Once the stream is closed, OnDataReceived will not fire for the stream. | 323 // Once the stream is closed, OnDataReceived will not fire for the stream. |
| 322 stream->Close(); | 324 stream->Close(); |
| 323 } | 325 } |
| 324 | 326 |
| 325 void QuicDataChannel::OnReadyToSend(cricket::TransportChannel* channel) { | 327 void QuicDataChannel::OnReadyToSend(cricket::TransportChannel* channel) { |
| 326 RTC_DCHECK(worker_thread_->IsCurrent()); | 328 RTC_DCHECK(worker_thread_->IsCurrent()); |
| 327 RTC_DCHECK(channel == quic_transport_channel_); | 329 RTC_DCHECK(channel == quic_transport_channel_); |
| 328 LOG(LS_INFO) << "QuicTransportChannel is ready to send"; | 330 LOG(LS_INFO) << "QuicTransportChannel is ready to send"; |
| 329 invoker_.AsyncInvoke<void>( | 331 invoker_.AsyncInvoke<void>( |
| 330 signaling_thread_, rtc::Bind(&QuicDataChannel::SetState_s, this, kOpen)); | 332 RTC_FROM_HERE, signaling_thread_, |
| 333 rtc::Bind(&QuicDataChannel::SetState_s, this, kOpen)); |
| 331 } | 334 } |
| 332 | 335 |
| 333 void QuicDataChannel::OnWriteBlockedStreamClosed(net::QuicStreamId stream_id, | 336 void QuicDataChannel::OnWriteBlockedStreamClosed(net::QuicStreamId stream_id, |
| 334 int error) { | 337 int error) { |
| 335 RTC_DCHECK(worker_thread_->IsCurrent()); | 338 RTC_DCHECK(worker_thread_->IsCurrent()); |
| 336 LOG(LS_VERBOSE) << "Write blocked stream " << stream_id << " is closed."; | 339 LOG(LS_VERBOSE) << "Write blocked stream " << stream_id << " is closed."; |
| 337 write_blocked_quic_streams_.erase(stream_id); | 340 write_blocked_quic_streams_.erase(stream_id); |
| 338 } | 341 } |
| 339 | 342 |
| 340 void QuicDataChannel::OnIncomingQueuedStreamClosed(net::QuicStreamId stream_id, | 343 void QuicDataChannel::OnIncomingQueuedStreamClosed(net::QuicStreamId stream_id, |
| 341 int error) { | 344 int error) { |
| 342 RTC_DCHECK(worker_thread_->IsCurrent()); | 345 RTC_DCHECK(worker_thread_->IsCurrent()); |
| 343 LOG(LS_VERBOSE) << "Incoming queued stream " << stream_id << " is closed."; | 346 LOG(LS_VERBOSE) << "Incoming queued stream " << stream_id << " is closed."; |
| 344 incoming_quic_messages_.erase(stream_id); | 347 incoming_quic_messages_.erase(stream_id); |
| 345 } | 348 } |
| 346 | 349 |
| 347 void QuicDataChannel::OnConnectionClosed() { | 350 void QuicDataChannel::OnConnectionClosed() { |
| 348 RTC_DCHECK(worker_thread_->IsCurrent()); | 351 RTC_DCHECK(worker_thread_->IsCurrent()); |
| 349 invoker_.AsyncInvoke<void>(signaling_thread_, | 352 invoker_.AsyncInvoke<void>(RTC_FROM_HERE, signaling_thread_, |
| 350 rtc::Bind(&QuicDataChannel::Close, this)); | 353 rtc::Bind(&QuicDataChannel::Close, this)); |
| 351 } | 354 } |
| 352 | 355 |
| 353 void QuicDataChannel::OnMessage_s(const DataBuffer& received_data) { | 356 void QuicDataChannel::OnMessage_s(const DataBuffer& received_data) { |
| 354 RTC_DCHECK(signaling_thread_->IsCurrent()); | 357 RTC_DCHECK(signaling_thread_->IsCurrent()); |
| 355 if (observer_) { | 358 if (observer_) { |
| 356 observer_->OnMessage(received_data); | 359 observer_->OnMessage(received_data); |
| 357 } | 360 } |
| 358 } | 361 } |
| 359 | 362 |
| (...skipping 22 matching lines...) Expand all Loading... |
| 382 | 385 |
| 383 size_t QuicDataChannel::GetNumWriteBlockedStreams() const { | 386 size_t QuicDataChannel::GetNumWriteBlockedStreams() const { |
| 384 return write_blocked_quic_streams_.size(); | 387 return write_blocked_quic_streams_.size(); |
| 385 } | 388 } |
| 386 | 389 |
| 387 size_t QuicDataChannel::GetNumIncomingStreams() const { | 390 size_t QuicDataChannel::GetNumIncomingStreams() const { |
| 388 return incoming_quic_messages_.size(); | 391 return incoming_quic_messages_.size(); |
| 389 } | 392 } |
| 390 | 393 |
| 391 } // namespace webrtc | 394 } // namespace webrtc |
| OLD | NEW |