OLD | NEW |
1 /* | 1 /* |
2 * Copyright 2016 The WebRTC project authors. All Rights Reserved. | 2 * Copyright 2016 The WebRTC project authors. All Rights Reserved. |
3 * | 3 * |
4 * Use of this source code is governed by a BSD-style license | 4 * Use of this source code is governed by a BSD-style license |
5 * that can be found in the LICENSE file in the root of the source | 5 * that can be found in the LICENSE file in the root of the source |
6 * tree. An additional intellectual property rights grant can be found | 6 * tree. An additional intellectual property rights grant can be found |
7 * in the file PATENTS. All contributing project authors may | 7 * in the file PATENTS. All contributing project authors may |
8 * be found in the AUTHORS file in the root of the source tree. | 8 * be found in the AUTHORS file in the root of the source tree. |
9 */ | 9 */ |
10 | 10 |
(...skipping 74 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
85 } | 85 } |
86 | 86 |
87 bool QuicDataChannel::Send(const DataBuffer& buffer) { | 87 bool QuicDataChannel::Send(const DataBuffer& buffer) { |
88 RTC_DCHECK(signaling_thread_->IsCurrent()); | 88 RTC_DCHECK(signaling_thread_->IsCurrent()); |
89 if (state_ != kOpen) { | 89 if (state_ != kOpen) { |
90 LOG(LS_ERROR) << "QUIC data channel " << id_ | 90 LOG(LS_ERROR) << "QUIC data channel " << id_ |
91 << " is not open so cannot send."; | 91 << " is not open so cannot send."; |
92 return false; | 92 return false; |
93 } | 93 } |
94 return worker_thread_->Invoke<bool>( | 94 return worker_thread_->Invoke<bool>( |
95 rtc::Bind(&QuicDataChannel::Send_w, this, buffer)); | 95 RTC_FROM_HERE, rtc::Bind(&QuicDataChannel::Send_w, this, buffer)); |
96 } | 96 } |
97 | 97 |
98 bool QuicDataChannel::Send_w(const DataBuffer& buffer) { | 98 bool QuicDataChannel::Send_w(const DataBuffer& buffer) { |
99 RTC_DCHECK(worker_thread_->IsCurrent()); | 99 RTC_DCHECK(worker_thread_->IsCurrent()); |
100 | 100 |
101 // Encode and send the header containing the data channel ID and message ID. | 101 // Encode and send the header containing the data channel ID and message ID. |
102 rtc::CopyOnWriteBuffer header; | 102 rtc::CopyOnWriteBuffer header; |
103 WriteQuicDataChannelMessageHeader(id_, ++next_message_id_, &header); | 103 WriteQuicDataChannelMessageHeader(id_, ++next_message_id_, &header); |
104 RTC_DCHECK(quic_transport_channel_); | 104 RTC_DCHECK(quic_transport_channel_); |
105 cricket::ReliableQuicStream* stream = | 105 cricket::ReliableQuicStream* stream = |
(...skipping 74 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
180 LOG(LS_INFO) << "Stream " << stream->id() | 180 LOG(LS_INFO) << "Stream " << stream->id() |
181 << " successfully wrote data for QUIC data channel " << id_; | 181 << " successfully wrote data for QUIC data channel " << id_; |
182 stream->Close(); | 182 stream->Close(); |
183 } | 183 } |
184 } | 184 } |
185 | 185 |
186 void QuicDataChannel::SetBufferedAmount_w(uint64_t buffered_amount) { | 186 void QuicDataChannel::SetBufferedAmount_w(uint64_t buffered_amount) { |
187 RTC_DCHECK(worker_thread_->IsCurrent()); | 187 RTC_DCHECK(worker_thread_->IsCurrent()); |
188 buffered_amount_ = buffered_amount; | 188 buffered_amount_ = buffered_amount; |
189 invoker_.AsyncInvoke<void>( | 189 invoker_.AsyncInvoke<void>( |
190 signaling_thread_, rtc::Bind(&QuicDataChannel::OnBufferedAmountChange_s, | 190 RTC_FROM_HERE, signaling_thread_, |
191 this, buffered_amount)); | 191 rtc::Bind(&QuicDataChannel::OnBufferedAmountChange_s, this, |
| 192 buffered_amount)); |
192 } | 193 } |
193 | 194 |
194 void QuicDataChannel::Close() { | 195 void QuicDataChannel::Close() { |
195 RTC_DCHECK(signaling_thread_->IsCurrent()); | 196 RTC_DCHECK(signaling_thread_->IsCurrent()); |
196 if (state_ == kClosed || state_ == kClosing) { | 197 if (state_ == kClosed || state_ == kClosing) { |
197 return; | 198 return; |
198 } | 199 } |
199 LOG(LS_INFO) << "Closing QUIC data channel."; | 200 LOG(LS_INFO) << "Closing QUIC data channel."; |
200 SetState_s(kClosing); | 201 SetState_s(kClosing); |
201 worker_thread_->Invoke<void>(rtc::Bind(&QuicDataChannel::Close_w, this)); | 202 worker_thread_->Invoke<void>(RTC_FROM_HERE, |
| 203 rtc::Bind(&QuicDataChannel::Close_w, this)); |
202 SetState_s(kClosed); | 204 SetState_s(kClosed); |
203 } | 205 } |
204 | 206 |
205 void QuicDataChannel::Close_w() { | 207 void QuicDataChannel::Close_w() { |
206 RTC_DCHECK(worker_thread_->IsCurrent()); | 208 RTC_DCHECK(worker_thread_->IsCurrent()); |
207 for (auto& kv : incoming_quic_messages_) { | 209 for (auto& kv : incoming_quic_messages_) { |
208 Message& message = kv.second; | 210 Message& message = kv.second; |
209 cricket::ReliableQuicStream* stream = message.stream; | 211 cricket::ReliableQuicStream* stream = message.stream; |
210 stream->Close(); | 212 stream->Close(); |
211 } | 213 } |
(...skipping 17 matching lines...) Expand all Loading... |
229 LOG(LS_WARNING) << "Ignoring duplicate transport channel."; | 231 LOG(LS_WARNING) << "Ignoring duplicate transport channel."; |
230 return true; | 232 return true; |
231 } | 233 } |
232 LOG(LS_ERROR) << "|channel| does not match existing transport channel."; | 234 LOG(LS_ERROR) << "|channel| does not match existing transport channel."; |
233 return false; | 235 return false; |
234 } | 236 } |
235 | 237 |
236 quic_transport_channel_ = channel; | 238 quic_transport_channel_ = channel; |
237 LOG(LS_INFO) << "Setting QuicTransportChannel for QUIC data channel " << id_; | 239 LOG(LS_INFO) << "Setting QuicTransportChannel for QUIC data channel " << id_; |
238 DataState data_channel_state = worker_thread_->Invoke<DataState>( | 240 DataState data_channel_state = worker_thread_->Invoke<DataState>( |
239 rtc::Bind(&QuicDataChannel::SetTransportChannel_w, this)); | 241 RTC_FROM_HERE, rtc::Bind(&QuicDataChannel::SetTransportChannel_w, this)); |
240 SetState_s(data_channel_state); | 242 SetState_s(data_channel_state); |
241 return true; | 243 return true; |
242 } | 244 } |
243 | 245 |
244 DataChannelInterface::DataState QuicDataChannel::SetTransportChannel_w() { | 246 DataChannelInterface::DataState QuicDataChannel::SetTransportChannel_w() { |
245 RTC_DCHECK(worker_thread_->IsCurrent()); | 247 RTC_DCHECK(worker_thread_->IsCurrent()); |
246 quic_transport_channel_->SignalReadyToSend.connect( | 248 quic_transport_channel_->SignalReadyToSend.connect( |
247 this, &QuicDataChannel::OnReadyToSend); | 249 this, &QuicDataChannel::OnReadyToSend); |
248 quic_transport_channel_->SignalClosed.connect( | 250 quic_transport_channel_->SignalClosed.connect( |
249 this, &QuicDataChannel::OnConnectionClosed); | 251 this, &QuicDataChannel::OnConnectionClosed); |
(...skipping 12 matching lines...) Expand all Loading... |
262 message.stream->Close(); | 264 message.stream->Close(); |
263 return; | 265 return; |
264 } | 266 } |
265 // A FIN is received if the message fits into a single QUIC stream frame and | 267 // A FIN is received if the message fits into a single QUIC stream frame and |
266 // the remote peer is done sending. | 268 // the remote peer is done sending. |
267 if (message.stream->fin_received()) { | 269 if (message.stream->fin_received()) { |
268 LOG(LS_INFO) << "Stream " << message.stream->id() | 270 LOG(LS_INFO) << "Stream " << message.stream->id() |
269 << " has finished receiving data for QUIC data channel " | 271 << " has finished receiving data for QUIC data channel " |
270 << id_; | 272 << id_; |
271 DataBuffer final_message(message.buffer, false); | 273 DataBuffer final_message(message.buffer, false); |
272 invoker_.AsyncInvoke<void>(signaling_thread_, | 274 invoker_.AsyncInvoke<void>(RTC_FROM_HERE, signaling_thread_, |
273 rtc::Bind(&QuicDataChannel::OnMessage_s, this, | 275 rtc::Bind(&QuicDataChannel::OnMessage_s, this, |
274 std::move(final_message))); | 276 std::move(final_message))); |
275 message.stream->Close(); | 277 message.stream->Close(); |
276 return; | 278 return; |
277 } | 279 } |
278 // Otherwise the message is divided across multiple QUIC stream frames, so | 280 // Otherwise the message is divided across multiple QUIC stream frames, so |
279 // queue the data. OnDataReceived() will be called each time the remaining | 281 // queue the data. OnDataReceived() will be called each time the remaining |
280 // QUIC stream frames arrive. | 282 // QUIC stream frames arrive. |
281 LOG(LS_INFO) << "QUIC data channel " << id_ | 283 LOG(LS_INFO) << "QUIC data channel " << id_ |
282 << " is queuing incoming data for stream " | 284 << " is queuing incoming data for stream " |
(...skipping 26 matching lines...) Expand all Loading... |
309 received_data.AppendData(data, len); | 311 received_data.AppendData(data, len); |
310 return; | 312 return; |
311 } | 313 } |
312 // Otherwise we are done receiving and can provide the data channel observer | 314 // Otherwise we are done receiving and can provide the data channel observer |
313 // with the message. | 315 // with the message. |
314 LOG(LS_INFO) << "Stream " << stream_id | 316 LOG(LS_INFO) << "Stream " << stream_id |
315 << " has finished receiving data for QUIC data channel " << id_; | 317 << " has finished receiving data for QUIC data channel " << id_; |
316 received_data.AppendData(data, len); | 318 received_data.AppendData(data, len); |
317 DataBuffer final_message(std::move(received_data), false); | 319 DataBuffer final_message(std::move(received_data), false); |
318 invoker_.AsyncInvoke<void>( | 320 invoker_.AsyncInvoke<void>( |
319 signaling_thread_, | 321 RTC_FROM_HERE, signaling_thread_, |
320 rtc::Bind(&QuicDataChannel::OnMessage_s, this, std::move(final_message))); | 322 rtc::Bind(&QuicDataChannel::OnMessage_s, this, std::move(final_message))); |
321 // Once the stream is closed, OnDataReceived will not fire for the stream. | 323 // Once the stream is closed, OnDataReceived will not fire for the stream. |
322 stream->Close(); | 324 stream->Close(); |
323 } | 325 } |
324 | 326 |
325 void QuicDataChannel::OnReadyToSend(cricket::TransportChannel* channel) { | 327 void QuicDataChannel::OnReadyToSend(cricket::TransportChannel* channel) { |
326 RTC_DCHECK(worker_thread_->IsCurrent()); | 328 RTC_DCHECK(worker_thread_->IsCurrent()); |
327 RTC_DCHECK(channel == quic_transport_channel_); | 329 RTC_DCHECK(channel == quic_transport_channel_); |
328 LOG(LS_INFO) << "QuicTransportChannel is ready to send"; | 330 LOG(LS_INFO) << "QuicTransportChannel is ready to send"; |
329 invoker_.AsyncInvoke<void>( | 331 invoker_.AsyncInvoke<void>( |
330 signaling_thread_, rtc::Bind(&QuicDataChannel::SetState_s, this, kOpen)); | 332 RTC_FROM_HERE, signaling_thread_, |
| 333 rtc::Bind(&QuicDataChannel::SetState_s, this, kOpen)); |
331 } | 334 } |
332 | 335 |
333 void QuicDataChannel::OnWriteBlockedStreamClosed(net::QuicStreamId stream_id, | 336 void QuicDataChannel::OnWriteBlockedStreamClosed(net::QuicStreamId stream_id, |
334 int error) { | 337 int error) { |
335 RTC_DCHECK(worker_thread_->IsCurrent()); | 338 RTC_DCHECK(worker_thread_->IsCurrent()); |
336 LOG(LS_VERBOSE) << "Write blocked stream " << stream_id << " is closed."; | 339 LOG(LS_VERBOSE) << "Write blocked stream " << stream_id << " is closed."; |
337 write_blocked_quic_streams_.erase(stream_id); | 340 write_blocked_quic_streams_.erase(stream_id); |
338 } | 341 } |
339 | 342 |
340 void QuicDataChannel::OnIncomingQueuedStreamClosed(net::QuicStreamId stream_id, | 343 void QuicDataChannel::OnIncomingQueuedStreamClosed(net::QuicStreamId stream_id, |
341 int error) { | 344 int error) { |
342 RTC_DCHECK(worker_thread_->IsCurrent()); | 345 RTC_DCHECK(worker_thread_->IsCurrent()); |
343 LOG(LS_VERBOSE) << "Incoming queued stream " << stream_id << " is closed."; | 346 LOG(LS_VERBOSE) << "Incoming queued stream " << stream_id << " is closed."; |
344 incoming_quic_messages_.erase(stream_id); | 347 incoming_quic_messages_.erase(stream_id); |
345 } | 348 } |
346 | 349 |
347 void QuicDataChannel::OnConnectionClosed() { | 350 void QuicDataChannel::OnConnectionClosed() { |
348 RTC_DCHECK(worker_thread_->IsCurrent()); | 351 RTC_DCHECK(worker_thread_->IsCurrent()); |
349 invoker_.AsyncInvoke<void>(signaling_thread_, | 352 invoker_.AsyncInvoke<void>(RTC_FROM_HERE, signaling_thread_, |
350 rtc::Bind(&QuicDataChannel::Close, this)); | 353 rtc::Bind(&QuicDataChannel::Close, this)); |
351 } | 354 } |
352 | 355 |
353 void QuicDataChannel::OnMessage_s(const DataBuffer& received_data) { | 356 void QuicDataChannel::OnMessage_s(const DataBuffer& received_data) { |
354 RTC_DCHECK(signaling_thread_->IsCurrent()); | 357 RTC_DCHECK(signaling_thread_->IsCurrent()); |
355 if (observer_) { | 358 if (observer_) { |
356 observer_->OnMessage(received_data); | 359 observer_->OnMessage(received_data); |
357 } | 360 } |
358 } | 361 } |
359 | 362 |
(...skipping 22 matching lines...) Expand all Loading... |
382 | 385 |
383 size_t QuicDataChannel::GetNumWriteBlockedStreams() const { | 386 size_t QuicDataChannel::GetNumWriteBlockedStreams() const { |
384 return write_blocked_quic_streams_.size(); | 387 return write_blocked_quic_streams_.size(); |
385 } | 388 } |
386 | 389 |
387 size_t QuicDataChannel::GetNumIncomingStreams() const { | 390 size_t QuicDataChannel::GetNumIncomingStreams() const { |
388 return incoming_quic_messages_.size(); | 391 return incoming_quic_messages_.size(); |
389 } | 392 } |
390 | 393 |
391 } // namespace webrtc | 394 } // namespace webrtc |
OLD | NEW |