OLD | NEW |
| (Empty) |
1 /* | |
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. | |
3 * | |
4 * Use of this source code is governed by a BSD-style license | |
5 * that can be found in the LICENSE file in the root of the source | |
6 * tree. An additional intellectual property rights grant can be found | |
7 * in the file PATENTS. All contributing project authors may | |
8 * be found in the AUTHORS file in the root of the source tree. | |
9 */ | |
10 | |
11 #if defined(WEBRTC_POSIX) | |
12 #include <sys/file.h> | |
13 #endif // WEBRTC_POSIX | |
14 #include <sys/types.h> | |
15 #include <sys/stat.h> | |
16 #include <errno.h> | |
17 | |
18 #include <algorithm> | |
19 #include <string> | |
20 | |
21 #include "webrtc/base/basictypes.h" | |
22 #include "webrtc/base/checks.h" | |
23 #include "webrtc/base/logging.h" | |
24 #include "webrtc/base/messagequeue.h" | |
25 #include "webrtc/base/stream.h" | |
26 #include "webrtc/base/stringencode.h" | |
27 #include "webrtc/base/stringutils.h" | |
28 #include "webrtc/base/thread.h" | |
29 #include "webrtc/base/timeutils.h" | |
30 | |
31 #if defined(WEBRTC_WIN) | |
32 #include "webrtc/base/win32.h" | |
33 #define fileno _fileno | |
34 #endif | |
35 | |
36 namespace rtc { | |
37 | |
38 /////////////////////////////////////////////////////////////////////////////// | |
39 // StreamInterface | |
40 /////////////////////////////////////////////////////////////////////////////// | |
41 StreamInterface::~StreamInterface() { | |
42 } | |
43 | |
44 StreamResult StreamInterface::WriteAll(const void* data, size_t data_len, | |
45 size_t* written, int* error) { | |
46 StreamResult result = SR_SUCCESS; | |
47 size_t total_written = 0, current_written; | |
48 while (total_written < data_len) { | |
49 result = Write(static_cast<const char*>(data) + total_written, | |
50 data_len - total_written, ¤t_written, error); | |
51 if (result != SR_SUCCESS) | |
52 break; | |
53 total_written += current_written; | |
54 } | |
55 if (written) | |
56 *written = total_written; | |
57 return result; | |
58 } | |
59 | |
60 StreamResult StreamInterface::ReadAll(void* buffer, size_t buffer_len, | |
61 size_t* read, int* error) { | |
62 StreamResult result = SR_SUCCESS; | |
63 size_t total_read = 0, current_read; | |
64 while (total_read < buffer_len) { | |
65 result = Read(static_cast<char*>(buffer) + total_read, | |
66 buffer_len - total_read, ¤t_read, error); | |
67 if (result != SR_SUCCESS) | |
68 break; | |
69 total_read += current_read; | |
70 } | |
71 if (read) | |
72 *read = total_read; | |
73 return result; | |
74 } | |
75 | |
76 StreamResult StreamInterface::ReadLine(std::string* line) { | |
77 line->clear(); | |
78 StreamResult result = SR_SUCCESS; | |
79 while (true) { | |
80 char ch; | |
81 result = Read(&ch, sizeof(ch), nullptr, nullptr); | |
82 if (result != SR_SUCCESS) { | |
83 break; | |
84 } | |
85 if (ch == '\n') { | |
86 break; | |
87 } | |
88 line->push_back(ch); | |
89 } | |
90 if (!line->empty()) { // give back the line we've collected so far with | |
91 result = SR_SUCCESS; // a success code. Otherwise return the last code | |
92 } | |
93 return result; | |
94 } | |
95 | |
96 void StreamInterface::PostEvent(Thread* t, int events, int err) { | |
97 t->Post(RTC_FROM_HERE, this, MSG_POST_EVENT, | |
98 new StreamEventData(events, err)); | |
99 } | |
100 | |
101 void StreamInterface::PostEvent(int events, int err) { | |
102 PostEvent(Thread::Current(), events, err); | |
103 } | |
104 | |
105 const void* StreamInterface::GetReadData(size_t* data_len) { | |
106 return nullptr; | |
107 } | |
108 | |
109 void* StreamInterface::GetWriteBuffer(size_t* buf_len) { | |
110 return nullptr; | |
111 } | |
112 | |
113 bool StreamInterface::SetPosition(size_t position) { | |
114 return false; | |
115 } | |
116 | |
117 bool StreamInterface::GetPosition(size_t* position) const { | |
118 return false; | |
119 } | |
120 | |
121 bool StreamInterface::GetSize(size_t* size) const { | |
122 return false; | |
123 } | |
124 | |
125 bool StreamInterface::GetAvailable(size_t* size) const { | |
126 return false; | |
127 } | |
128 | |
129 bool StreamInterface::GetWriteRemaining(size_t* size) const { | |
130 return false; | |
131 } | |
132 | |
133 bool StreamInterface::Flush() { | |
134 return false; | |
135 } | |
136 | |
137 bool StreamInterface::ReserveSize(size_t size) { | |
138 return true; | |
139 } | |
140 | |
141 StreamInterface::StreamInterface() { | |
142 } | |
143 | |
144 void StreamInterface::OnMessage(Message* msg) { | |
145 if (MSG_POST_EVENT == msg->message_id) { | |
146 StreamEventData* pe = static_cast<StreamEventData*>(msg->pdata); | |
147 SignalEvent(this, pe->events, pe->error); | |
148 delete msg->pdata; | |
149 } | |
150 } | |
151 | |
152 /////////////////////////////////////////////////////////////////////////////// | |
153 // StreamAdapterInterface | |
154 /////////////////////////////////////////////////////////////////////////////// | |
155 | |
156 StreamAdapterInterface::StreamAdapterInterface(StreamInterface* stream, | |
157 bool owned) | |
158 : stream_(stream), owned_(owned) { | |
159 if (nullptr != stream_) | |
160 stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent); | |
161 } | |
162 | |
163 StreamState StreamAdapterInterface::GetState() const { | |
164 return stream_->GetState(); | |
165 } | |
166 StreamResult StreamAdapterInterface::Read(void* buffer, | |
167 size_t buffer_len, | |
168 size_t* read, | |
169 int* error) { | |
170 return stream_->Read(buffer, buffer_len, read, error); | |
171 } | |
172 StreamResult StreamAdapterInterface::Write(const void* data, | |
173 size_t data_len, | |
174 size_t* written, | |
175 int* error) { | |
176 return stream_->Write(data, data_len, written, error); | |
177 } | |
178 void StreamAdapterInterface::Close() { | |
179 stream_->Close(); | |
180 } | |
181 | |
182 bool StreamAdapterInterface::SetPosition(size_t position) { | |
183 return stream_->SetPosition(position); | |
184 } | |
185 | |
186 bool StreamAdapterInterface::GetPosition(size_t* position) const { | |
187 return stream_->GetPosition(position); | |
188 } | |
189 | |
190 bool StreamAdapterInterface::GetSize(size_t* size) const { | |
191 return stream_->GetSize(size); | |
192 } | |
193 | |
194 bool StreamAdapterInterface::GetAvailable(size_t* size) const { | |
195 return stream_->GetAvailable(size); | |
196 } | |
197 | |
198 bool StreamAdapterInterface::GetWriteRemaining(size_t* size) const { | |
199 return stream_->GetWriteRemaining(size); | |
200 } | |
201 | |
202 bool StreamAdapterInterface::ReserveSize(size_t size) { | |
203 return stream_->ReserveSize(size); | |
204 } | |
205 | |
206 bool StreamAdapterInterface::Flush() { | |
207 return stream_->Flush(); | |
208 } | |
209 | |
210 void StreamAdapterInterface::Attach(StreamInterface* stream, bool owned) { | |
211 if (nullptr != stream_) | |
212 stream_->SignalEvent.disconnect(this); | |
213 if (owned_) | |
214 delete stream_; | |
215 stream_ = stream; | |
216 owned_ = owned; | |
217 if (nullptr != stream_) | |
218 stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent); | |
219 } | |
220 | |
221 StreamInterface* StreamAdapterInterface::Detach() { | |
222 if (nullptr != stream_) | |
223 stream_->SignalEvent.disconnect(this); | |
224 StreamInterface* stream = stream_; | |
225 stream_ = nullptr; | |
226 return stream; | |
227 } | |
228 | |
229 StreamAdapterInterface::~StreamAdapterInterface() { | |
230 if (owned_) | |
231 delete stream_; | |
232 } | |
233 | |
234 void StreamAdapterInterface::OnEvent(StreamInterface* stream, | |
235 int events, | |
236 int err) { | |
237 SignalEvent(this, events, err); | |
238 } | |
239 | |
240 /////////////////////////////////////////////////////////////////////////////// | |
241 // StreamTap | |
242 /////////////////////////////////////////////////////////////////////////////// | |
243 | |
244 StreamTap::StreamTap(StreamInterface* stream, StreamInterface* tap) | |
245 : StreamAdapterInterface(stream), tap_(), tap_result_(SR_SUCCESS), | |
246 tap_error_(0) { | |
247 AttachTap(tap); | |
248 } | |
249 | |
250 StreamTap::~StreamTap() = default; | |
251 | |
252 void StreamTap::AttachTap(StreamInterface* tap) { | |
253 tap_.reset(tap); | |
254 } | |
255 | |
256 StreamInterface* StreamTap::DetachTap() { | |
257 return tap_.release(); | |
258 } | |
259 | |
260 StreamResult StreamTap::GetTapResult(int* error) { | |
261 if (error) { | |
262 *error = tap_error_; | |
263 } | |
264 return tap_result_; | |
265 } | |
266 | |
267 StreamResult StreamTap::Read(void* buffer, size_t buffer_len, | |
268 size_t* read, int* error) { | |
269 size_t backup_read; | |
270 if (!read) { | |
271 read = &backup_read; | |
272 } | |
273 StreamResult res = StreamAdapterInterface::Read(buffer, buffer_len, | |
274 read, error); | |
275 if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) { | |
276 tap_result_ = tap_->WriteAll(buffer, *read, nullptr, &tap_error_); | |
277 } | |
278 return res; | |
279 } | |
280 | |
281 StreamResult StreamTap::Write(const void* data, size_t data_len, | |
282 size_t* written, int* error) { | |
283 size_t backup_written; | |
284 if (!written) { | |
285 written = &backup_written; | |
286 } | |
287 StreamResult res = StreamAdapterInterface::Write(data, data_len, | |
288 written, error); | |
289 if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) { | |
290 tap_result_ = tap_->WriteAll(data, *written, nullptr, &tap_error_); | |
291 } | |
292 return res; | |
293 } | |
294 | |
295 /////////////////////////////////////////////////////////////////////////////// | |
296 // NullStream | |
297 /////////////////////////////////////////////////////////////////////////////// | |
298 | |
299 NullStream::NullStream() { | |
300 } | |
301 | |
302 NullStream::~NullStream() { | |
303 } | |
304 | |
305 StreamState NullStream::GetState() const { | |
306 return SS_OPEN; | |
307 } | |
308 | |
309 StreamResult NullStream::Read(void* buffer, size_t buffer_len, | |
310 size_t* read, int* error) { | |
311 if (error) *error = -1; | |
312 return SR_ERROR; | |
313 } | |
314 | |
315 StreamResult NullStream::Write(const void* data, size_t data_len, | |
316 size_t* written, int* error) { | |
317 if (written) *written = data_len; | |
318 return SR_SUCCESS; | |
319 } | |
320 | |
321 void NullStream::Close() { | |
322 } | |
323 | |
324 /////////////////////////////////////////////////////////////////////////////// | |
325 // FileStream | |
326 /////////////////////////////////////////////////////////////////////////////// | |
327 | |
328 FileStream::FileStream() : file_(nullptr) {} | |
329 | |
330 FileStream::~FileStream() { | |
331 FileStream::Close(); | |
332 } | |
333 | |
334 bool FileStream::Open(const std::string& filename, const char* mode, | |
335 int* error) { | |
336 Close(); | |
337 #if defined(WEBRTC_WIN) | |
338 std::wstring wfilename; | |
339 if (Utf8ToWindowsFilename(filename, &wfilename)) { | |
340 file_ = _wfopen(wfilename.c_str(), ToUtf16(mode).c_str()); | |
341 } else { | |
342 if (error) { | |
343 *error = -1; | |
344 return false; | |
345 } | |
346 } | |
347 #else | |
348 file_ = fopen(filename.c_str(), mode); | |
349 #endif | |
350 if (!file_ && error) { | |
351 *error = errno; | |
352 } | |
353 return (file_ != nullptr); | |
354 } | |
355 | |
356 bool FileStream::OpenShare(const std::string& filename, const char* mode, | |
357 int shflag, int* error) { | |
358 Close(); | |
359 #if defined(WEBRTC_WIN) | |
360 std::wstring wfilename; | |
361 if (Utf8ToWindowsFilename(filename, &wfilename)) { | |
362 file_ = _wfsopen(wfilename.c_str(), ToUtf16(mode).c_str(), shflag); | |
363 if (!file_ && error) { | |
364 *error = errno; | |
365 return false; | |
366 } | |
367 return file_ != nullptr; | |
368 } else { | |
369 if (error) { | |
370 *error = -1; | |
371 } | |
372 return false; | |
373 } | |
374 #else | |
375 return Open(filename, mode, error); | |
376 #endif | |
377 } | |
378 | |
379 bool FileStream::DisableBuffering() { | |
380 if (!file_) | |
381 return false; | |
382 return (setvbuf(file_, nullptr, _IONBF, 0) == 0); | |
383 } | |
384 | |
385 StreamState FileStream::GetState() const { | |
386 return (file_ == nullptr) ? SS_CLOSED : SS_OPEN; | |
387 } | |
388 | |
389 StreamResult FileStream::Read(void* buffer, size_t buffer_len, | |
390 size_t* read, int* error) { | |
391 if (!file_) | |
392 return SR_EOS; | |
393 size_t result = fread(buffer, 1, buffer_len, file_); | |
394 if ((result == 0) && (buffer_len > 0)) { | |
395 if (feof(file_)) | |
396 return SR_EOS; | |
397 if (error) | |
398 *error = errno; | |
399 return SR_ERROR; | |
400 } | |
401 if (read) | |
402 *read = result; | |
403 return SR_SUCCESS; | |
404 } | |
405 | |
406 StreamResult FileStream::Write(const void* data, size_t data_len, | |
407 size_t* written, int* error) { | |
408 if (!file_) | |
409 return SR_EOS; | |
410 size_t result = fwrite(data, 1, data_len, file_); | |
411 if ((result == 0) && (data_len > 0)) { | |
412 if (error) | |
413 *error = errno; | |
414 return SR_ERROR; | |
415 } | |
416 if (written) | |
417 *written = result; | |
418 return SR_SUCCESS; | |
419 } | |
420 | |
421 void FileStream::Close() { | |
422 if (file_) { | |
423 DoClose(); | |
424 file_ = nullptr; | |
425 } | |
426 } | |
427 | |
428 bool FileStream::SetPosition(size_t position) { | |
429 if (!file_) | |
430 return false; | |
431 return (fseek(file_, static_cast<int>(position), SEEK_SET) == 0); | |
432 } | |
433 | |
434 bool FileStream::GetPosition(size_t* position) const { | |
435 RTC_DCHECK(nullptr != position); | |
436 if (!file_) | |
437 return false; | |
438 long result = ftell(file_); | |
439 if (result < 0) | |
440 return false; | |
441 if (position) | |
442 *position = result; | |
443 return true; | |
444 } | |
445 | |
446 bool FileStream::GetSize(size_t* size) const { | |
447 RTC_DCHECK(nullptr != size); | |
448 if (!file_) | |
449 return false; | |
450 struct stat file_stats; | |
451 if (fstat(fileno(file_), &file_stats) != 0) | |
452 return false; | |
453 if (size) | |
454 *size = file_stats.st_size; | |
455 return true; | |
456 } | |
457 | |
458 bool FileStream::GetAvailable(size_t* size) const { | |
459 RTC_DCHECK(nullptr != size); | |
460 if (!GetSize(size)) | |
461 return false; | |
462 long result = ftell(file_); | |
463 if (result < 0) | |
464 return false; | |
465 if (size) | |
466 *size -= result; | |
467 return true; | |
468 } | |
469 | |
470 bool FileStream::ReserveSize(size_t size) { | |
471 // TODO: extend the file to the proper length | |
472 return true; | |
473 } | |
474 | |
475 bool FileStream::GetSize(const std::string& filename, size_t* size) { | |
476 struct stat file_stats; | |
477 if (stat(filename.c_str(), &file_stats) != 0) | |
478 return false; | |
479 *size = file_stats.st_size; | |
480 return true; | |
481 } | |
482 | |
483 bool FileStream::Flush() { | |
484 if (file_) { | |
485 return (0 == fflush(file_)); | |
486 } | |
487 // try to flush empty file? | |
488 RTC_NOTREACHED(); | |
489 return false; | |
490 } | |
491 | |
492 #if defined(WEBRTC_POSIX) && !defined(__native_client__) | |
493 | |
494 bool FileStream::TryLock() { | |
495 if (file_ == nullptr) { | |
496 // Stream not open. | |
497 RTC_NOTREACHED(); | |
498 return false; | |
499 } | |
500 | |
501 return flock(fileno(file_), LOCK_EX|LOCK_NB) == 0; | |
502 } | |
503 | |
504 bool FileStream::Unlock() { | |
505 if (file_ == nullptr) { | |
506 // Stream not open. | |
507 RTC_NOTREACHED(); | |
508 return false; | |
509 } | |
510 | |
511 return flock(fileno(file_), LOCK_UN) == 0; | |
512 } | |
513 | |
514 #endif | |
515 | |
516 void FileStream::DoClose() { | |
517 fclose(file_); | |
518 } | |
519 | |
520 /////////////////////////////////////////////////////////////////////////////// | |
521 // MemoryStream | |
522 /////////////////////////////////////////////////////////////////////////////// | |
523 | |
524 MemoryStreamBase::MemoryStreamBase() | |
525 : buffer_(nullptr), buffer_length_(0), data_length_(0), seek_position_(0) {} | |
526 | |
527 StreamState MemoryStreamBase::GetState() const { | |
528 return SS_OPEN; | |
529 } | |
530 | |
531 StreamResult MemoryStreamBase::Read(void* buffer, size_t bytes, | |
532 size_t* bytes_read, int* error) { | |
533 if (seek_position_ >= data_length_) { | |
534 return SR_EOS; | |
535 } | |
536 size_t available = data_length_ - seek_position_; | |
537 if (bytes > available) { | |
538 // Read partial buffer | |
539 bytes = available; | |
540 } | |
541 memcpy(buffer, &buffer_[seek_position_], bytes); | |
542 seek_position_ += bytes; | |
543 if (bytes_read) { | |
544 *bytes_read = bytes; | |
545 } | |
546 return SR_SUCCESS; | |
547 } | |
548 | |
549 StreamResult MemoryStreamBase::Write(const void* buffer, size_t bytes, | |
550 size_t* bytes_written, int* error) { | |
551 size_t available = buffer_length_ - seek_position_; | |
552 if (0 == available) { | |
553 // Increase buffer size to the larger of: | |
554 // a) new position rounded up to next 256 bytes | |
555 // b) double the previous length | |
556 size_t new_buffer_length = | |
557 std::max(((seek_position_ + bytes) | 0xFF) + 1, buffer_length_ * 2); | |
558 StreamResult result = DoReserve(new_buffer_length, error); | |
559 if (SR_SUCCESS != result) { | |
560 return result; | |
561 } | |
562 RTC_DCHECK(buffer_length_ >= new_buffer_length); | |
563 available = buffer_length_ - seek_position_; | |
564 } | |
565 | |
566 if (bytes > available) { | |
567 bytes = available; | |
568 } | |
569 memcpy(&buffer_[seek_position_], buffer, bytes); | |
570 seek_position_ += bytes; | |
571 if (data_length_ < seek_position_) { | |
572 data_length_ = seek_position_; | |
573 } | |
574 if (bytes_written) { | |
575 *bytes_written = bytes; | |
576 } | |
577 return SR_SUCCESS; | |
578 } | |
579 | |
580 void MemoryStreamBase::Close() { | |
581 // nothing to do | |
582 } | |
583 | |
584 bool MemoryStreamBase::SetPosition(size_t position) { | |
585 if (position > data_length_) | |
586 return false; | |
587 seek_position_ = position; | |
588 return true; | |
589 } | |
590 | |
591 bool MemoryStreamBase::GetPosition(size_t* position) const { | |
592 if (position) | |
593 *position = seek_position_; | |
594 return true; | |
595 } | |
596 | |
597 bool MemoryStreamBase::GetSize(size_t* size) const { | |
598 if (size) | |
599 *size = data_length_; | |
600 return true; | |
601 } | |
602 | |
603 bool MemoryStreamBase::GetAvailable(size_t* size) const { | |
604 if (size) | |
605 *size = data_length_ - seek_position_; | |
606 return true; | |
607 } | |
608 | |
609 bool MemoryStreamBase::ReserveSize(size_t size) { | |
610 return (SR_SUCCESS == DoReserve(size, nullptr)); | |
611 } | |
612 | |
613 StreamResult MemoryStreamBase::DoReserve(size_t size, int* error) { | |
614 return (buffer_length_ >= size) ? SR_SUCCESS : SR_EOS; | |
615 } | |
616 | |
617 /////////////////////////////////////////////////////////////////////////////// | |
618 | |
619 MemoryStream::MemoryStream() : buffer_alloc_(nullptr) {} | |
620 | |
621 MemoryStream::MemoryStream(const char* data) : buffer_alloc_(nullptr) { | |
622 SetData(data, strlen(data)); | |
623 } | |
624 | |
625 MemoryStream::MemoryStream(const void* data, size_t length) | |
626 : buffer_alloc_(nullptr) { | |
627 SetData(data, length); | |
628 } | |
629 | |
630 MemoryStream::~MemoryStream() { | |
631 delete [] buffer_alloc_; | |
632 } | |
633 | |
634 void MemoryStream::SetData(const void* data, size_t length) { | |
635 data_length_ = buffer_length_ = length; | |
636 delete [] buffer_alloc_; | |
637 buffer_alloc_ = new char[buffer_length_ + kAlignment]; | |
638 buffer_ = reinterpret_cast<char*>(ALIGNP(buffer_alloc_, kAlignment)); | |
639 memcpy(buffer_, data, data_length_); | |
640 seek_position_ = 0; | |
641 } | |
642 | |
643 StreamResult MemoryStream::DoReserve(size_t size, int* error) { | |
644 if (buffer_length_ >= size) | |
645 return SR_SUCCESS; | |
646 | |
647 if (char* new_buffer_alloc = new char[size + kAlignment]) { | |
648 char* new_buffer = reinterpret_cast<char*>( | |
649 ALIGNP(new_buffer_alloc, kAlignment)); | |
650 memcpy(new_buffer, buffer_, data_length_); | |
651 delete [] buffer_alloc_; | |
652 buffer_alloc_ = new_buffer_alloc; | |
653 buffer_ = new_buffer; | |
654 buffer_length_ = size; | |
655 return SR_SUCCESS; | |
656 } | |
657 | |
658 if (error) { | |
659 *error = ENOMEM; | |
660 } | |
661 return SR_ERROR; | |
662 } | |
663 | |
664 /////////////////////////////////////////////////////////////////////////////// | |
665 | |
666 ExternalMemoryStream::ExternalMemoryStream() { | |
667 } | |
668 | |
669 ExternalMemoryStream::ExternalMemoryStream(void* data, size_t length) { | |
670 SetData(data, length); | |
671 } | |
672 | |
673 ExternalMemoryStream::~ExternalMemoryStream() { | |
674 } | |
675 | |
676 void ExternalMemoryStream::SetData(void* data, size_t length) { | |
677 data_length_ = buffer_length_ = length; | |
678 buffer_ = static_cast<char*>(data); | |
679 seek_position_ = 0; | |
680 } | |
681 | |
682 /////////////////////////////////////////////////////////////////////////////// | |
683 // FifoBuffer | |
684 /////////////////////////////////////////////////////////////////////////////// | |
685 | |
686 FifoBuffer::FifoBuffer(size_t size) | |
687 : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size), | |
688 data_length_(0), read_position_(0), owner_(Thread::Current()) { | |
689 // all events are done on the owner_ thread | |
690 } | |
691 | |
692 FifoBuffer::FifoBuffer(size_t size, Thread* owner) | |
693 : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size), | |
694 data_length_(0), read_position_(0), owner_(owner) { | |
695 // all events are done on the owner_ thread | |
696 } | |
697 | |
698 FifoBuffer::~FifoBuffer() { | |
699 } | |
700 | |
701 bool FifoBuffer::GetBuffered(size_t* size) const { | |
702 CritScope cs(&crit_); | |
703 *size = data_length_; | |
704 return true; | |
705 } | |
706 | |
707 bool FifoBuffer::SetCapacity(size_t size) { | |
708 CritScope cs(&crit_); | |
709 if (data_length_ > size) { | |
710 return false; | |
711 } | |
712 | |
713 if (size != buffer_length_) { | |
714 char* buffer = new char[size]; | |
715 const size_t copy = data_length_; | |
716 const size_t tail_copy = std::min(copy, buffer_length_ - read_position_); | |
717 memcpy(buffer, &buffer_[read_position_], tail_copy); | |
718 memcpy(buffer + tail_copy, &buffer_[0], copy - tail_copy); | |
719 buffer_.reset(buffer); | |
720 read_position_ = 0; | |
721 buffer_length_ = size; | |
722 } | |
723 return true; | |
724 } | |
725 | |
726 StreamResult FifoBuffer::ReadOffset(void* buffer, size_t bytes, | |
727 size_t offset, size_t* bytes_read) { | |
728 CritScope cs(&crit_); | |
729 return ReadOffsetLocked(buffer, bytes, offset, bytes_read); | |
730 } | |
731 | |
732 StreamResult FifoBuffer::WriteOffset(const void* buffer, size_t bytes, | |
733 size_t offset, size_t* bytes_written) { | |
734 CritScope cs(&crit_); | |
735 return WriteOffsetLocked(buffer, bytes, offset, bytes_written); | |
736 } | |
737 | |
738 StreamState FifoBuffer::GetState() const { | |
739 CritScope cs(&crit_); | |
740 return state_; | |
741 } | |
742 | |
743 StreamResult FifoBuffer::Read(void* buffer, size_t bytes, | |
744 size_t* bytes_read, int* error) { | |
745 CritScope cs(&crit_); | |
746 const bool was_writable = data_length_ < buffer_length_; | |
747 size_t copy = 0; | |
748 StreamResult result = ReadOffsetLocked(buffer, bytes, 0, ©); | |
749 | |
750 if (result == SR_SUCCESS) { | |
751 // If read was successful then adjust the read position and number of | |
752 // bytes buffered. | |
753 read_position_ = (read_position_ + copy) % buffer_length_; | |
754 data_length_ -= copy; | |
755 if (bytes_read) { | |
756 *bytes_read = copy; | |
757 } | |
758 | |
759 // if we were full before, and now we're not, post an event | |
760 if (!was_writable && copy > 0) { | |
761 PostEvent(owner_, SE_WRITE, 0); | |
762 } | |
763 } | |
764 return result; | |
765 } | |
766 | |
767 StreamResult FifoBuffer::Write(const void* buffer, size_t bytes, | |
768 size_t* bytes_written, int* error) { | |
769 CritScope cs(&crit_); | |
770 | |
771 const bool was_readable = (data_length_ > 0); | |
772 size_t copy = 0; | |
773 StreamResult result = WriteOffsetLocked(buffer, bytes, 0, ©); | |
774 | |
775 if (result == SR_SUCCESS) { | |
776 // If write was successful then adjust the number of readable bytes. | |
777 data_length_ += copy; | |
778 if (bytes_written) { | |
779 *bytes_written = copy; | |
780 } | |
781 | |
782 // if we didn't have any data to read before, and now we do, post an event | |
783 if (!was_readable && copy > 0) { | |
784 PostEvent(owner_, SE_READ, 0); | |
785 } | |
786 } | |
787 return result; | |
788 } | |
789 | |
790 void FifoBuffer::Close() { | |
791 CritScope cs(&crit_); | |
792 state_ = SS_CLOSED; | |
793 } | |
794 | |
795 const void* FifoBuffer::GetReadData(size_t* size) { | |
796 CritScope cs(&crit_); | |
797 *size = (read_position_ + data_length_ <= buffer_length_) ? | |
798 data_length_ : buffer_length_ - read_position_; | |
799 return &buffer_[read_position_]; | |
800 } | |
801 | |
802 void FifoBuffer::ConsumeReadData(size_t size) { | |
803 CritScope cs(&crit_); | |
804 RTC_DCHECK(size <= data_length_); | |
805 const bool was_writable = data_length_ < buffer_length_; | |
806 read_position_ = (read_position_ + size) % buffer_length_; | |
807 data_length_ -= size; | |
808 if (!was_writable && size > 0) { | |
809 PostEvent(owner_, SE_WRITE, 0); | |
810 } | |
811 } | |
812 | |
813 void* FifoBuffer::GetWriteBuffer(size_t* size) { | |
814 CritScope cs(&crit_); | |
815 if (state_ == SS_CLOSED) { | |
816 return nullptr; | |
817 } | |
818 | |
819 // if empty, reset the write position to the beginning, so we can get | |
820 // the biggest possible block | |
821 if (data_length_ == 0) { | |
822 read_position_ = 0; | |
823 } | |
824 | |
825 const size_t write_position = (read_position_ + data_length_) | |
826 % buffer_length_; | |
827 *size = (write_position > read_position_ || data_length_ == 0) ? | |
828 buffer_length_ - write_position : read_position_ - write_position; | |
829 return &buffer_[write_position]; | |
830 } | |
831 | |
832 void FifoBuffer::ConsumeWriteBuffer(size_t size) { | |
833 CritScope cs(&crit_); | |
834 RTC_DCHECK(size <= buffer_length_ - data_length_); | |
835 const bool was_readable = (data_length_ > 0); | |
836 data_length_ += size; | |
837 if (!was_readable && size > 0) { | |
838 PostEvent(owner_, SE_READ, 0); | |
839 } | |
840 } | |
841 | |
842 bool FifoBuffer::GetWriteRemaining(size_t* size) const { | |
843 CritScope cs(&crit_); | |
844 *size = buffer_length_ - data_length_; | |
845 return true; | |
846 } | |
847 | |
848 StreamResult FifoBuffer::ReadOffsetLocked(void* buffer, | |
849 size_t bytes, | |
850 size_t offset, | |
851 size_t* bytes_read) { | |
852 if (offset >= data_length_) { | |
853 return (state_ != SS_CLOSED) ? SR_BLOCK : SR_EOS; | |
854 } | |
855 | |
856 const size_t available = data_length_ - offset; | |
857 const size_t read_position = (read_position_ + offset) % buffer_length_; | |
858 const size_t copy = std::min(bytes, available); | |
859 const size_t tail_copy = std::min(copy, buffer_length_ - read_position); | |
860 char* const p = static_cast<char*>(buffer); | |
861 memcpy(p, &buffer_[read_position], tail_copy); | |
862 memcpy(p + tail_copy, &buffer_[0], copy - tail_copy); | |
863 | |
864 if (bytes_read) { | |
865 *bytes_read = copy; | |
866 } | |
867 return SR_SUCCESS; | |
868 } | |
869 | |
870 StreamResult FifoBuffer::WriteOffsetLocked(const void* buffer, | |
871 size_t bytes, | |
872 size_t offset, | |
873 size_t* bytes_written) { | |
874 if (state_ == SS_CLOSED) { | |
875 return SR_EOS; | |
876 } | |
877 | |
878 if (data_length_ + offset >= buffer_length_) { | |
879 return SR_BLOCK; | |
880 } | |
881 | |
882 const size_t available = buffer_length_ - data_length_ - offset; | |
883 const size_t write_position = (read_position_ + data_length_ + offset) | |
884 % buffer_length_; | |
885 const size_t copy = std::min(bytes, available); | |
886 const size_t tail_copy = std::min(copy, buffer_length_ - write_position); | |
887 const char* const p = static_cast<const char*>(buffer); | |
888 memcpy(&buffer_[write_position], p, tail_copy); | |
889 memcpy(&buffer_[0], p + tail_copy, copy - tail_copy); | |
890 | |
891 if (bytes_written) { | |
892 *bytes_written = copy; | |
893 } | |
894 return SR_SUCCESS; | |
895 } | |
896 | |
897 | |
898 | |
899 /////////////////////////////////////////////////////////////////////////////// | |
900 // LoggingAdapter | |
901 /////////////////////////////////////////////////////////////////////////////// | |
902 | |
903 LoggingAdapter::LoggingAdapter(StreamInterface* stream, LoggingSeverity level, | |
904 const std::string& label, bool hex_mode) | |
905 : StreamAdapterInterface(stream), level_(level), hex_mode_(hex_mode) { | |
906 set_label(label); | |
907 } | |
908 | |
909 void LoggingAdapter::set_label(const std::string& label) { | |
910 label_.assign("["); | |
911 label_.append(label); | |
912 label_.append("]"); | |
913 } | |
914 | |
915 StreamResult LoggingAdapter::Read(void* buffer, size_t buffer_len, | |
916 size_t* read, int* error) { | |
917 size_t local_read; if (!read) read = &local_read; | |
918 StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len, read, | |
919 error); | |
920 if (result == SR_SUCCESS) { | |
921 LogMultiline(level_, label_.c_str(), true, buffer, *read, hex_mode_, &lms_); | |
922 } | |
923 return result; | |
924 } | |
925 | |
926 StreamResult LoggingAdapter::Write(const void* data, size_t data_len, | |
927 size_t* written, int* error) { | |
928 size_t local_written; | |
929 if (!written) written = &local_written; | |
930 StreamResult result = StreamAdapterInterface::Write(data, data_len, written, | |
931 error); | |
932 if (result == SR_SUCCESS) { | |
933 LogMultiline(level_, label_.c_str(), false, data, *written, hex_mode_, | |
934 &lms_); | |
935 } | |
936 return result; | |
937 } | |
938 | |
939 void LoggingAdapter::Close() { | |
940 LogMultiline(level_, label_.c_str(), false, nullptr, 0, hex_mode_, &lms_); | |
941 LogMultiline(level_, label_.c_str(), true, nullptr, 0, hex_mode_, &lms_); | |
942 LOG_V(level_) << label_ << " Closed locally"; | |
943 StreamAdapterInterface::Close(); | |
944 } | |
945 | |
946 void LoggingAdapter::OnEvent(StreamInterface* stream, int events, int err) { | |
947 if (events & SE_OPEN) { | |
948 LOG_V(level_) << label_ << " Open"; | |
949 } else if (events & SE_CLOSE) { | |
950 LogMultiline(level_, label_.c_str(), false, nullptr, 0, hex_mode_, &lms_); | |
951 LogMultiline(level_, label_.c_str(), true, nullptr, 0, hex_mode_, &lms_); | |
952 LOG_V(level_) << label_ << " Closed with error: " << err; | |
953 } | |
954 StreamAdapterInterface::OnEvent(stream, events, err); | |
955 } | |
956 | |
957 /////////////////////////////////////////////////////////////////////////////// | |
958 // StringStream - Reads/Writes to an external std::string | |
959 /////////////////////////////////////////////////////////////////////////////// | |
960 | |
961 StringStream::StringStream(std::string* str) | |
962 : str_(*str), read_pos_(0), read_only_(false) { | |
963 } | |
964 | |
965 StringStream::StringStream(const std::string& str) | |
966 : str_(const_cast<std::string&>(str)), read_pos_(0), read_only_(true) { | |
967 } | |
968 | |
969 StreamState StringStream::GetState() const { | |
970 return SS_OPEN; | |
971 } | |
972 | |
973 StreamResult StringStream::Read(void* buffer, size_t buffer_len, | |
974 size_t* read, int* error) { | |
975 size_t available = std::min(buffer_len, str_.size() - read_pos_); | |
976 if (!available) | |
977 return SR_EOS; | |
978 memcpy(buffer, str_.data() + read_pos_, available); | |
979 read_pos_ += available; | |
980 if (read) | |
981 *read = available; | |
982 return SR_SUCCESS; | |
983 } | |
984 | |
985 StreamResult StringStream::Write(const void* data, size_t data_len, | |
986 size_t* written, int* error) { | |
987 if (read_only_) { | |
988 if (error) { | |
989 *error = -1; | |
990 } | |
991 return SR_ERROR; | |
992 } | |
993 str_.append(static_cast<const char*>(data), | |
994 static_cast<const char*>(data) + data_len); | |
995 if (written) | |
996 *written = data_len; | |
997 return SR_SUCCESS; | |
998 } | |
999 | |
1000 void StringStream::Close() { | |
1001 } | |
1002 | |
1003 bool StringStream::SetPosition(size_t position) { | |
1004 if (position > str_.size()) | |
1005 return false; | |
1006 read_pos_ = position; | |
1007 return true; | |
1008 } | |
1009 | |
1010 bool StringStream::GetPosition(size_t* position) const { | |
1011 if (position) | |
1012 *position = read_pos_; | |
1013 return true; | |
1014 } | |
1015 | |
1016 bool StringStream::GetSize(size_t* size) const { | |
1017 if (size) | |
1018 *size = str_.size(); | |
1019 return true; | |
1020 } | |
1021 | |
1022 bool StringStream::GetAvailable(size_t* size) const { | |
1023 if (size) | |
1024 *size = str_.size() - read_pos_; | |
1025 return true; | |
1026 } | |
1027 | |
1028 bool StringStream::ReserveSize(size_t size) { | |
1029 if (read_only_) | |
1030 return false; | |
1031 str_.reserve(size); | |
1032 return true; | |
1033 } | |
1034 | |
1035 /////////////////////////////////////////////////////////////////////////////// | |
1036 // StreamReference | |
1037 /////////////////////////////////////////////////////////////////////////////// | |
1038 | |
1039 StreamReference::StreamReference(StreamInterface* stream) | |
1040 : StreamAdapterInterface(stream, false) { | |
1041 // owner set to false so the destructor does not free the stream. | |
1042 stream_ref_count_ = new StreamRefCount(stream); | |
1043 } | |
1044 | |
1045 StreamInterface* StreamReference::NewReference() { | |
1046 stream_ref_count_->AddReference(); | |
1047 return new StreamReference(stream_ref_count_, stream()); | |
1048 } | |
1049 | |
1050 StreamReference::~StreamReference() { | |
1051 stream_ref_count_->Release(); | |
1052 } | |
1053 | |
1054 StreamReference::StreamReference(StreamRefCount* stream_ref_count, | |
1055 StreamInterface* stream) | |
1056 : StreamAdapterInterface(stream, false), | |
1057 stream_ref_count_(stream_ref_count) { | |
1058 } | |
1059 | |
1060 /////////////////////////////////////////////////////////////////////////////// | |
1061 | |
1062 StreamResult Flow(StreamInterface* source, | |
1063 char* buffer, | |
1064 size_t buffer_len, | |
1065 StreamInterface* sink, | |
1066 size_t* data_len /* = nullptr */) { | |
1067 RTC_DCHECK(buffer_len > 0); | |
1068 | |
1069 StreamResult result; | |
1070 size_t count, read_pos, write_pos; | |
1071 if (data_len) { | |
1072 read_pos = *data_len; | |
1073 } else { | |
1074 read_pos = 0; | |
1075 } | |
1076 | |
1077 bool end_of_stream = false; | |
1078 do { | |
1079 // Read until buffer is full, end of stream, or error | |
1080 while (!end_of_stream && (read_pos < buffer_len)) { | |
1081 result = source->Read(buffer + read_pos, buffer_len - read_pos, &count, | |
1082 nullptr); | |
1083 if (result == SR_EOS) { | |
1084 end_of_stream = true; | |
1085 } else if (result != SR_SUCCESS) { | |
1086 if (data_len) { | |
1087 *data_len = read_pos; | |
1088 } | |
1089 return result; | |
1090 } else { | |
1091 read_pos += count; | |
1092 } | |
1093 } | |
1094 | |
1095 // Write until buffer is empty, or error (including end of stream) | |
1096 write_pos = 0; | |
1097 while (write_pos < read_pos) { | |
1098 result = sink->Write(buffer + write_pos, read_pos - write_pos, &count, | |
1099 nullptr); | |
1100 if (result != SR_SUCCESS) { | |
1101 if (data_len) { | |
1102 *data_len = read_pos - write_pos; | |
1103 if (write_pos > 0) { | |
1104 memmove(buffer, buffer + write_pos, *data_len); | |
1105 } | |
1106 } | |
1107 return result; | |
1108 } | |
1109 write_pos += count; | |
1110 } | |
1111 | |
1112 read_pos = 0; | |
1113 } while (!end_of_stream); | |
1114 | |
1115 if (data_len) { | |
1116 *data_len = 0; | |
1117 } | |
1118 return SR_SUCCESS; | |
1119 } | |
1120 | |
1121 /////////////////////////////////////////////////////////////////////////////// | |
1122 | |
1123 } // namespace rtc | |
OLD | NEW |