Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(329)

Side by Side Diff: webrtc/base/stream.cc

Issue 2877023002: Move webrtc/{base => rtc_base} (Closed)
Patch Set: update presubmit.py and DEPS include rules Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « webrtc/base/stream.h ('k') | webrtc/base/stream_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 /*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #if defined(WEBRTC_POSIX)
12 #include <sys/file.h>
13 #endif // WEBRTC_POSIX
14 #include <sys/types.h>
15 #include <sys/stat.h>
16 #include <errno.h>
17
18 #include <algorithm>
19 #include <string>
20
21 #include "webrtc/base/basictypes.h"
22 #include "webrtc/base/checks.h"
23 #include "webrtc/base/logging.h"
24 #include "webrtc/base/messagequeue.h"
25 #include "webrtc/base/stream.h"
26 #include "webrtc/base/stringencode.h"
27 #include "webrtc/base/stringutils.h"
28 #include "webrtc/base/thread.h"
29 #include "webrtc/base/timeutils.h"
30
31 #if defined(WEBRTC_WIN)
32 #include "webrtc/base/win32.h"
33 #define fileno _fileno
34 #endif
35
36 namespace rtc {
37
38 ///////////////////////////////////////////////////////////////////////////////
39 // StreamInterface
40 ///////////////////////////////////////////////////////////////////////////////
41 StreamInterface::~StreamInterface() {
42 }
43
44 StreamResult StreamInterface::WriteAll(const void* data, size_t data_len,
45 size_t* written, int* error) {
46 StreamResult result = SR_SUCCESS;
47 size_t total_written = 0, current_written;
48 while (total_written < data_len) {
49 result = Write(static_cast<const char*>(data) + total_written,
50 data_len - total_written, &current_written, error);
51 if (result != SR_SUCCESS)
52 break;
53 total_written += current_written;
54 }
55 if (written)
56 *written = total_written;
57 return result;
58 }
59
60 StreamResult StreamInterface::ReadAll(void* buffer, size_t buffer_len,
61 size_t* read, int* error) {
62 StreamResult result = SR_SUCCESS;
63 size_t total_read = 0, current_read;
64 while (total_read < buffer_len) {
65 result = Read(static_cast<char*>(buffer) + total_read,
66 buffer_len - total_read, &current_read, error);
67 if (result != SR_SUCCESS)
68 break;
69 total_read += current_read;
70 }
71 if (read)
72 *read = total_read;
73 return result;
74 }
75
76 StreamResult StreamInterface::ReadLine(std::string* line) {
77 line->clear();
78 StreamResult result = SR_SUCCESS;
79 while (true) {
80 char ch;
81 result = Read(&ch, sizeof(ch), nullptr, nullptr);
82 if (result != SR_SUCCESS) {
83 break;
84 }
85 if (ch == '\n') {
86 break;
87 }
88 line->push_back(ch);
89 }
90 if (!line->empty()) { // give back the line we've collected so far with
91 result = SR_SUCCESS; // a success code. Otherwise return the last code
92 }
93 return result;
94 }
95
96 void StreamInterface::PostEvent(Thread* t, int events, int err) {
97 t->Post(RTC_FROM_HERE, this, MSG_POST_EVENT,
98 new StreamEventData(events, err));
99 }
100
101 void StreamInterface::PostEvent(int events, int err) {
102 PostEvent(Thread::Current(), events, err);
103 }
104
105 const void* StreamInterface::GetReadData(size_t* data_len) {
106 return nullptr;
107 }
108
109 void* StreamInterface::GetWriteBuffer(size_t* buf_len) {
110 return nullptr;
111 }
112
113 bool StreamInterface::SetPosition(size_t position) {
114 return false;
115 }
116
117 bool StreamInterface::GetPosition(size_t* position) const {
118 return false;
119 }
120
121 bool StreamInterface::GetSize(size_t* size) const {
122 return false;
123 }
124
125 bool StreamInterface::GetAvailable(size_t* size) const {
126 return false;
127 }
128
129 bool StreamInterface::GetWriteRemaining(size_t* size) const {
130 return false;
131 }
132
133 bool StreamInterface::Flush() {
134 return false;
135 }
136
137 bool StreamInterface::ReserveSize(size_t size) {
138 return true;
139 }
140
141 StreamInterface::StreamInterface() {
142 }
143
144 void StreamInterface::OnMessage(Message* msg) {
145 if (MSG_POST_EVENT == msg->message_id) {
146 StreamEventData* pe = static_cast<StreamEventData*>(msg->pdata);
147 SignalEvent(this, pe->events, pe->error);
148 delete msg->pdata;
149 }
150 }
151
152 ///////////////////////////////////////////////////////////////////////////////
153 // StreamAdapterInterface
154 ///////////////////////////////////////////////////////////////////////////////
155
156 StreamAdapterInterface::StreamAdapterInterface(StreamInterface* stream,
157 bool owned)
158 : stream_(stream), owned_(owned) {
159 if (nullptr != stream_)
160 stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
161 }
162
163 StreamState StreamAdapterInterface::GetState() const {
164 return stream_->GetState();
165 }
166 StreamResult StreamAdapterInterface::Read(void* buffer,
167 size_t buffer_len,
168 size_t* read,
169 int* error) {
170 return stream_->Read(buffer, buffer_len, read, error);
171 }
172 StreamResult StreamAdapterInterface::Write(const void* data,
173 size_t data_len,
174 size_t* written,
175 int* error) {
176 return stream_->Write(data, data_len, written, error);
177 }
178 void StreamAdapterInterface::Close() {
179 stream_->Close();
180 }
181
182 bool StreamAdapterInterface::SetPosition(size_t position) {
183 return stream_->SetPosition(position);
184 }
185
186 bool StreamAdapterInterface::GetPosition(size_t* position) const {
187 return stream_->GetPosition(position);
188 }
189
190 bool StreamAdapterInterface::GetSize(size_t* size) const {
191 return stream_->GetSize(size);
192 }
193
194 bool StreamAdapterInterface::GetAvailable(size_t* size) const {
195 return stream_->GetAvailable(size);
196 }
197
198 bool StreamAdapterInterface::GetWriteRemaining(size_t* size) const {
199 return stream_->GetWriteRemaining(size);
200 }
201
202 bool StreamAdapterInterface::ReserveSize(size_t size) {
203 return stream_->ReserveSize(size);
204 }
205
206 bool StreamAdapterInterface::Flush() {
207 return stream_->Flush();
208 }
209
210 void StreamAdapterInterface::Attach(StreamInterface* stream, bool owned) {
211 if (nullptr != stream_)
212 stream_->SignalEvent.disconnect(this);
213 if (owned_)
214 delete stream_;
215 stream_ = stream;
216 owned_ = owned;
217 if (nullptr != stream_)
218 stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
219 }
220
221 StreamInterface* StreamAdapterInterface::Detach() {
222 if (nullptr != stream_)
223 stream_->SignalEvent.disconnect(this);
224 StreamInterface* stream = stream_;
225 stream_ = nullptr;
226 return stream;
227 }
228
229 StreamAdapterInterface::~StreamAdapterInterface() {
230 if (owned_)
231 delete stream_;
232 }
233
234 void StreamAdapterInterface::OnEvent(StreamInterface* stream,
235 int events,
236 int err) {
237 SignalEvent(this, events, err);
238 }
239
240 ///////////////////////////////////////////////////////////////////////////////
241 // StreamTap
242 ///////////////////////////////////////////////////////////////////////////////
243
244 StreamTap::StreamTap(StreamInterface* stream, StreamInterface* tap)
245 : StreamAdapterInterface(stream), tap_(), tap_result_(SR_SUCCESS),
246 tap_error_(0) {
247 AttachTap(tap);
248 }
249
250 StreamTap::~StreamTap() = default;
251
252 void StreamTap::AttachTap(StreamInterface* tap) {
253 tap_.reset(tap);
254 }
255
256 StreamInterface* StreamTap::DetachTap() {
257 return tap_.release();
258 }
259
260 StreamResult StreamTap::GetTapResult(int* error) {
261 if (error) {
262 *error = tap_error_;
263 }
264 return tap_result_;
265 }
266
267 StreamResult StreamTap::Read(void* buffer, size_t buffer_len,
268 size_t* read, int* error) {
269 size_t backup_read;
270 if (!read) {
271 read = &backup_read;
272 }
273 StreamResult res = StreamAdapterInterface::Read(buffer, buffer_len,
274 read, error);
275 if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
276 tap_result_ = tap_->WriteAll(buffer, *read, nullptr, &tap_error_);
277 }
278 return res;
279 }
280
281 StreamResult StreamTap::Write(const void* data, size_t data_len,
282 size_t* written, int* error) {
283 size_t backup_written;
284 if (!written) {
285 written = &backup_written;
286 }
287 StreamResult res = StreamAdapterInterface::Write(data, data_len,
288 written, error);
289 if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
290 tap_result_ = tap_->WriteAll(data, *written, nullptr, &tap_error_);
291 }
292 return res;
293 }
294
295 ///////////////////////////////////////////////////////////////////////////////
296 // NullStream
297 ///////////////////////////////////////////////////////////////////////////////
298
299 NullStream::NullStream() {
300 }
301
302 NullStream::~NullStream() {
303 }
304
305 StreamState NullStream::GetState() const {
306 return SS_OPEN;
307 }
308
309 StreamResult NullStream::Read(void* buffer, size_t buffer_len,
310 size_t* read, int* error) {
311 if (error) *error = -1;
312 return SR_ERROR;
313 }
314
315 StreamResult NullStream::Write(const void* data, size_t data_len,
316 size_t* written, int* error) {
317 if (written) *written = data_len;
318 return SR_SUCCESS;
319 }
320
321 void NullStream::Close() {
322 }
323
324 ///////////////////////////////////////////////////////////////////////////////
325 // FileStream
326 ///////////////////////////////////////////////////////////////////////////////
327
328 FileStream::FileStream() : file_(nullptr) {}
329
330 FileStream::~FileStream() {
331 FileStream::Close();
332 }
333
334 bool FileStream::Open(const std::string& filename, const char* mode,
335 int* error) {
336 Close();
337 #if defined(WEBRTC_WIN)
338 std::wstring wfilename;
339 if (Utf8ToWindowsFilename(filename, &wfilename)) {
340 file_ = _wfopen(wfilename.c_str(), ToUtf16(mode).c_str());
341 } else {
342 if (error) {
343 *error = -1;
344 return false;
345 }
346 }
347 #else
348 file_ = fopen(filename.c_str(), mode);
349 #endif
350 if (!file_ && error) {
351 *error = errno;
352 }
353 return (file_ != nullptr);
354 }
355
356 bool FileStream::OpenShare(const std::string& filename, const char* mode,
357 int shflag, int* error) {
358 Close();
359 #if defined(WEBRTC_WIN)
360 std::wstring wfilename;
361 if (Utf8ToWindowsFilename(filename, &wfilename)) {
362 file_ = _wfsopen(wfilename.c_str(), ToUtf16(mode).c_str(), shflag);
363 if (!file_ && error) {
364 *error = errno;
365 return false;
366 }
367 return file_ != nullptr;
368 } else {
369 if (error) {
370 *error = -1;
371 }
372 return false;
373 }
374 #else
375 return Open(filename, mode, error);
376 #endif
377 }
378
379 bool FileStream::DisableBuffering() {
380 if (!file_)
381 return false;
382 return (setvbuf(file_, nullptr, _IONBF, 0) == 0);
383 }
384
385 StreamState FileStream::GetState() const {
386 return (file_ == nullptr) ? SS_CLOSED : SS_OPEN;
387 }
388
389 StreamResult FileStream::Read(void* buffer, size_t buffer_len,
390 size_t* read, int* error) {
391 if (!file_)
392 return SR_EOS;
393 size_t result = fread(buffer, 1, buffer_len, file_);
394 if ((result == 0) && (buffer_len > 0)) {
395 if (feof(file_))
396 return SR_EOS;
397 if (error)
398 *error = errno;
399 return SR_ERROR;
400 }
401 if (read)
402 *read = result;
403 return SR_SUCCESS;
404 }
405
406 StreamResult FileStream::Write(const void* data, size_t data_len,
407 size_t* written, int* error) {
408 if (!file_)
409 return SR_EOS;
410 size_t result = fwrite(data, 1, data_len, file_);
411 if ((result == 0) && (data_len > 0)) {
412 if (error)
413 *error = errno;
414 return SR_ERROR;
415 }
416 if (written)
417 *written = result;
418 return SR_SUCCESS;
419 }
420
421 void FileStream::Close() {
422 if (file_) {
423 DoClose();
424 file_ = nullptr;
425 }
426 }
427
428 bool FileStream::SetPosition(size_t position) {
429 if (!file_)
430 return false;
431 return (fseek(file_, static_cast<int>(position), SEEK_SET) == 0);
432 }
433
434 bool FileStream::GetPosition(size_t* position) const {
435 RTC_DCHECK(nullptr != position);
436 if (!file_)
437 return false;
438 long result = ftell(file_);
439 if (result < 0)
440 return false;
441 if (position)
442 *position = result;
443 return true;
444 }
445
446 bool FileStream::GetSize(size_t* size) const {
447 RTC_DCHECK(nullptr != size);
448 if (!file_)
449 return false;
450 struct stat file_stats;
451 if (fstat(fileno(file_), &file_stats) != 0)
452 return false;
453 if (size)
454 *size = file_stats.st_size;
455 return true;
456 }
457
458 bool FileStream::GetAvailable(size_t* size) const {
459 RTC_DCHECK(nullptr != size);
460 if (!GetSize(size))
461 return false;
462 long result = ftell(file_);
463 if (result < 0)
464 return false;
465 if (size)
466 *size -= result;
467 return true;
468 }
469
470 bool FileStream::ReserveSize(size_t size) {
471 // TODO: extend the file to the proper length
472 return true;
473 }
474
475 bool FileStream::GetSize(const std::string& filename, size_t* size) {
476 struct stat file_stats;
477 if (stat(filename.c_str(), &file_stats) != 0)
478 return false;
479 *size = file_stats.st_size;
480 return true;
481 }
482
483 bool FileStream::Flush() {
484 if (file_) {
485 return (0 == fflush(file_));
486 }
487 // try to flush empty file?
488 RTC_NOTREACHED();
489 return false;
490 }
491
492 #if defined(WEBRTC_POSIX) && !defined(__native_client__)
493
494 bool FileStream::TryLock() {
495 if (file_ == nullptr) {
496 // Stream not open.
497 RTC_NOTREACHED();
498 return false;
499 }
500
501 return flock(fileno(file_), LOCK_EX|LOCK_NB) == 0;
502 }
503
504 bool FileStream::Unlock() {
505 if (file_ == nullptr) {
506 // Stream not open.
507 RTC_NOTREACHED();
508 return false;
509 }
510
511 return flock(fileno(file_), LOCK_UN) == 0;
512 }
513
514 #endif
515
516 void FileStream::DoClose() {
517 fclose(file_);
518 }
519
520 ///////////////////////////////////////////////////////////////////////////////
521 // MemoryStream
522 ///////////////////////////////////////////////////////////////////////////////
523
524 MemoryStreamBase::MemoryStreamBase()
525 : buffer_(nullptr), buffer_length_(0), data_length_(0), seek_position_(0) {}
526
527 StreamState MemoryStreamBase::GetState() const {
528 return SS_OPEN;
529 }
530
531 StreamResult MemoryStreamBase::Read(void* buffer, size_t bytes,
532 size_t* bytes_read, int* error) {
533 if (seek_position_ >= data_length_) {
534 return SR_EOS;
535 }
536 size_t available = data_length_ - seek_position_;
537 if (bytes > available) {
538 // Read partial buffer
539 bytes = available;
540 }
541 memcpy(buffer, &buffer_[seek_position_], bytes);
542 seek_position_ += bytes;
543 if (bytes_read) {
544 *bytes_read = bytes;
545 }
546 return SR_SUCCESS;
547 }
548
549 StreamResult MemoryStreamBase::Write(const void* buffer, size_t bytes,
550 size_t* bytes_written, int* error) {
551 size_t available = buffer_length_ - seek_position_;
552 if (0 == available) {
553 // Increase buffer size to the larger of:
554 // a) new position rounded up to next 256 bytes
555 // b) double the previous length
556 size_t new_buffer_length =
557 std::max(((seek_position_ + bytes) | 0xFF) + 1, buffer_length_ * 2);
558 StreamResult result = DoReserve(new_buffer_length, error);
559 if (SR_SUCCESS != result) {
560 return result;
561 }
562 RTC_DCHECK(buffer_length_ >= new_buffer_length);
563 available = buffer_length_ - seek_position_;
564 }
565
566 if (bytes > available) {
567 bytes = available;
568 }
569 memcpy(&buffer_[seek_position_], buffer, bytes);
570 seek_position_ += bytes;
571 if (data_length_ < seek_position_) {
572 data_length_ = seek_position_;
573 }
574 if (bytes_written) {
575 *bytes_written = bytes;
576 }
577 return SR_SUCCESS;
578 }
579
580 void MemoryStreamBase::Close() {
581 // nothing to do
582 }
583
584 bool MemoryStreamBase::SetPosition(size_t position) {
585 if (position > data_length_)
586 return false;
587 seek_position_ = position;
588 return true;
589 }
590
591 bool MemoryStreamBase::GetPosition(size_t* position) const {
592 if (position)
593 *position = seek_position_;
594 return true;
595 }
596
597 bool MemoryStreamBase::GetSize(size_t* size) const {
598 if (size)
599 *size = data_length_;
600 return true;
601 }
602
603 bool MemoryStreamBase::GetAvailable(size_t* size) const {
604 if (size)
605 *size = data_length_ - seek_position_;
606 return true;
607 }
608
609 bool MemoryStreamBase::ReserveSize(size_t size) {
610 return (SR_SUCCESS == DoReserve(size, nullptr));
611 }
612
613 StreamResult MemoryStreamBase::DoReserve(size_t size, int* error) {
614 return (buffer_length_ >= size) ? SR_SUCCESS : SR_EOS;
615 }
616
617 ///////////////////////////////////////////////////////////////////////////////
618
619 MemoryStream::MemoryStream() : buffer_alloc_(nullptr) {}
620
621 MemoryStream::MemoryStream(const char* data) : buffer_alloc_(nullptr) {
622 SetData(data, strlen(data));
623 }
624
625 MemoryStream::MemoryStream(const void* data, size_t length)
626 : buffer_alloc_(nullptr) {
627 SetData(data, length);
628 }
629
630 MemoryStream::~MemoryStream() {
631 delete [] buffer_alloc_;
632 }
633
634 void MemoryStream::SetData(const void* data, size_t length) {
635 data_length_ = buffer_length_ = length;
636 delete [] buffer_alloc_;
637 buffer_alloc_ = new char[buffer_length_ + kAlignment];
638 buffer_ = reinterpret_cast<char*>(ALIGNP(buffer_alloc_, kAlignment));
639 memcpy(buffer_, data, data_length_);
640 seek_position_ = 0;
641 }
642
643 StreamResult MemoryStream::DoReserve(size_t size, int* error) {
644 if (buffer_length_ >= size)
645 return SR_SUCCESS;
646
647 if (char* new_buffer_alloc = new char[size + kAlignment]) {
648 char* new_buffer = reinterpret_cast<char*>(
649 ALIGNP(new_buffer_alloc, kAlignment));
650 memcpy(new_buffer, buffer_, data_length_);
651 delete [] buffer_alloc_;
652 buffer_alloc_ = new_buffer_alloc;
653 buffer_ = new_buffer;
654 buffer_length_ = size;
655 return SR_SUCCESS;
656 }
657
658 if (error) {
659 *error = ENOMEM;
660 }
661 return SR_ERROR;
662 }
663
664 ///////////////////////////////////////////////////////////////////////////////
665
666 ExternalMemoryStream::ExternalMemoryStream() {
667 }
668
669 ExternalMemoryStream::ExternalMemoryStream(void* data, size_t length) {
670 SetData(data, length);
671 }
672
673 ExternalMemoryStream::~ExternalMemoryStream() {
674 }
675
676 void ExternalMemoryStream::SetData(void* data, size_t length) {
677 data_length_ = buffer_length_ = length;
678 buffer_ = static_cast<char*>(data);
679 seek_position_ = 0;
680 }
681
682 ///////////////////////////////////////////////////////////////////////////////
683 // FifoBuffer
684 ///////////////////////////////////////////////////////////////////////////////
685
686 FifoBuffer::FifoBuffer(size_t size)
687 : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
688 data_length_(0), read_position_(0), owner_(Thread::Current()) {
689 // all events are done on the owner_ thread
690 }
691
692 FifoBuffer::FifoBuffer(size_t size, Thread* owner)
693 : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
694 data_length_(0), read_position_(0), owner_(owner) {
695 // all events are done on the owner_ thread
696 }
697
698 FifoBuffer::~FifoBuffer() {
699 }
700
701 bool FifoBuffer::GetBuffered(size_t* size) const {
702 CritScope cs(&crit_);
703 *size = data_length_;
704 return true;
705 }
706
707 bool FifoBuffer::SetCapacity(size_t size) {
708 CritScope cs(&crit_);
709 if (data_length_ > size) {
710 return false;
711 }
712
713 if (size != buffer_length_) {
714 char* buffer = new char[size];
715 const size_t copy = data_length_;
716 const size_t tail_copy = std::min(copy, buffer_length_ - read_position_);
717 memcpy(buffer, &buffer_[read_position_], tail_copy);
718 memcpy(buffer + tail_copy, &buffer_[0], copy - tail_copy);
719 buffer_.reset(buffer);
720 read_position_ = 0;
721 buffer_length_ = size;
722 }
723 return true;
724 }
725
726 StreamResult FifoBuffer::ReadOffset(void* buffer, size_t bytes,
727 size_t offset, size_t* bytes_read) {
728 CritScope cs(&crit_);
729 return ReadOffsetLocked(buffer, bytes, offset, bytes_read);
730 }
731
732 StreamResult FifoBuffer::WriteOffset(const void* buffer, size_t bytes,
733 size_t offset, size_t* bytes_written) {
734 CritScope cs(&crit_);
735 return WriteOffsetLocked(buffer, bytes, offset, bytes_written);
736 }
737
738 StreamState FifoBuffer::GetState() const {
739 CritScope cs(&crit_);
740 return state_;
741 }
742
743 StreamResult FifoBuffer::Read(void* buffer, size_t bytes,
744 size_t* bytes_read, int* error) {
745 CritScope cs(&crit_);
746 const bool was_writable = data_length_ < buffer_length_;
747 size_t copy = 0;
748 StreamResult result = ReadOffsetLocked(buffer, bytes, 0, &copy);
749
750 if (result == SR_SUCCESS) {
751 // If read was successful then adjust the read position and number of
752 // bytes buffered.
753 read_position_ = (read_position_ + copy) % buffer_length_;
754 data_length_ -= copy;
755 if (bytes_read) {
756 *bytes_read = copy;
757 }
758
759 // if we were full before, and now we're not, post an event
760 if (!was_writable && copy > 0) {
761 PostEvent(owner_, SE_WRITE, 0);
762 }
763 }
764 return result;
765 }
766
767 StreamResult FifoBuffer::Write(const void* buffer, size_t bytes,
768 size_t* bytes_written, int* error) {
769 CritScope cs(&crit_);
770
771 const bool was_readable = (data_length_ > 0);
772 size_t copy = 0;
773 StreamResult result = WriteOffsetLocked(buffer, bytes, 0, &copy);
774
775 if (result == SR_SUCCESS) {
776 // If write was successful then adjust the number of readable bytes.
777 data_length_ += copy;
778 if (bytes_written) {
779 *bytes_written = copy;
780 }
781
782 // if we didn't have any data to read before, and now we do, post an event
783 if (!was_readable && copy > 0) {
784 PostEvent(owner_, SE_READ, 0);
785 }
786 }
787 return result;
788 }
789
790 void FifoBuffer::Close() {
791 CritScope cs(&crit_);
792 state_ = SS_CLOSED;
793 }
794
795 const void* FifoBuffer::GetReadData(size_t* size) {
796 CritScope cs(&crit_);
797 *size = (read_position_ + data_length_ <= buffer_length_) ?
798 data_length_ : buffer_length_ - read_position_;
799 return &buffer_[read_position_];
800 }
801
802 void FifoBuffer::ConsumeReadData(size_t size) {
803 CritScope cs(&crit_);
804 RTC_DCHECK(size <= data_length_);
805 const bool was_writable = data_length_ < buffer_length_;
806 read_position_ = (read_position_ + size) % buffer_length_;
807 data_length_ -= size;
808 if (!was_writable && size > 0) {
809 PostEvent(owner_, SE_WRITE, 0);
810 }
811 }
812
813 void* FifoBuffer::GetWriteBuffer(size_t* size) {
814 CritScope cs(&crit_);
815 if (state_ == SS_CLOSED) {
816 return nullptr;
817 }
818
819 // if empty, reset the write position to the beginning, so we can get
820 // the biggest possible block
821 if (data_length_ == 0) {
822 read_position_ = 0;
823 }
824
825 const size_t write_position = (read_position_ + data_length_)
826 % buffer_length_;
827 *size = (write_position > read_position_ || data_length_ == 0) ?
828 buffer_length_ - write_position : read_position_ - write_position;
829 return &buffer_[write_position];
830 }
831
832 void FifoBuffer::ConsumeWriteBuffer(size_t size) {
833 CritScope cs(&crit_);
834 RTC_DCHECK(size <= buffer_length_ - data_length_);
835 const bool was_readable = (data_length_ > 0);
836 data_length_ += size;
837 if (!was_readable && size > 0) {
838 PostEvent(owner_, SE_READ, 0);
839 }
840 }
841
842 bool FifoBuffer::GetWriteRemaining(size_t* size) const {
843 CritScope cs(&crit_);
844 *size = buffer_length_ - data_length_;
845 return true;
846 }
847
848 StreamResult FifoBuffer::ReadOffsetLocked(void* buffer,
849 size_t bytes,
850 size_t offset,
851 size_t* bytes_read) {
852 if (offset >= data_length_) {
853 return (state_ != SS_CLOSED) ? SR_BLOCK : SR_EOS;
854 }
855
856 const size_t available = data_length_ - offset;
857 const size_t read_position = (read_position_ + offset) % buffer_length_;
858 const size_t copy = std::min(bytes, available);
859 const size_t tail_copy = std::min(copy, buffer_length_ - read_position);
860 char* const p = static_cast<char*>(buffer);
861 memcpy(p, &buffer_[read_position], tail_copy);
862 memcpy(p + tail_copy, &buffer_[0], copy - tail_copy);
863
864 if (bytes_read) {
865 *bytes_read = copy;
866 }
867 return SR_SUCCESS;
868 }
869
870 StreamResult FifoBuffer::WriteOffsetLocked(const void* buffer,
871 size_t bytes,
872 size_t offset,
873 size_t* bytes_written) {
874 if (state_ == SS_CLOSED) {
875 return SR_EOS;
876 }
877
878 if (data_length_ + offset >= buffer_length_) {
879 return SR_BLOCK;
880 }
881
882 const size_t available = buffer_length_ - data_length_ - offset;
883 const size_t write_position = (read_position_ + data_length_ + offset)
884 % buffer_length_;
885 const size_t copy = std::min(bytes, available);
886 const size_t tail_copy = std::min(copy, buffer_length_ - write_position);
887 const char* const p = static_cast<const char*>(buffer);
888 memcpy(&buffer_[write_position], p, tail_copy);
889 memcpy(&buffer_[0], p + tail_copy, copy - tail_copy);
890
891 if (bytes_written) {
892 *bytes_written = copy;
893 }
894 return SR_SUCCESS;
895 }
896
897
898
899 ///////////////////////////////////////////////////////////////////////////////
900 // LoggingAdapter
901 ///////////////////////////////////////////////////////////////////////////////
902
903 LoggingAdapter::LoggingAdapter(StreamInterface* stream, LoggingSeverity level,
904 const std::string& label, bool hex_mode)
905 : StreamAdapterInterface(stream), level_(level), hex_mode_(hex_mode) {
906 set_label(label);
907 }
908
909 void LoggingAdapter::set_label(const std::string& label) {
910 label_.assign("[");
911 label_.append(label);
912 label_.append("]");
913 }
914
915 StreamResult LoggingAdapter::Read(void* buffer, size_t buffer_len,
916 size_t* read, int* error) {
917 size_t local_read; if (!read) read = &local_read;
918 StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len, read,
919 error);
920 if (result == SR_SUCCESS) {
921 LogMultiline(level_, label_.c_str(), true, buffer, *read, hex_mode_, &lms_);
922 }
923 return result;
924 }
925
926 StreamResult LoggingAdapter::Write(const void* data, size_t data_len,
927 size_t* written, int* error) {
928 size_t local_written;
929 if (!written) written = &local_written;
930 StreamResult result = StreamAdapterInterface::Write(data, data_len, written,
931 error);
932 if (result == SR_SUCCESS) {
933 LogMultiline(level_, label_.c_str(), false, data, *written, hex_mode_,
934 &lms_);
935 }
936 return result;
937 }
938
939 void LoggingAdapter::Close() {
940 LogMultiline(level_, label_.c_str(), false, nullptr, 0, hex_mode_, &lms_);
941 LogMultiline(level_, label_.c_str(), true, nullptr, 0, hex_mode_, &lms_);
942 LOG_V(level_) << label_ << " Closed locally";
943 StreamAdapterInterface::Close();
944 }
945
946 void LoggingAdapter::OnEvent(StreamInterface* stream, int events, int err) {
947 if (events & SE_OPEN) {
948 LOG_V(level_) << label_ << " Open";
949 } else if (events & SE_CLOSE) {
950 LogMultiline(level_, label_.c_str(), false, nullptr, 0, hex_mode_, &lms_);
951 LogMultiline(level_, label_.c_str(), true, nullptr, 0, hex_mode_, &lms_);
952 LOG_V(level_) << label_ << " Closed with error: " << err;
953 }
954 StreamAdapterInterface::OnEvent(stream, events, err);
955 }
956
957 ///////////////////////////////////////////////////////////////////////////////
958 // StringStream - Reads/Writes to an external std::string
959 ///////////////////////////////////////////////////////////////////////////////
960
961 StringStream::StringStream(std::string* str)
962 : str_(*str), read_pos_(0), read_only_(false) {
963 }
964
965 StringStream::StringStream(const std::string& str)
966 : str_(const_cast<std::string&>(str)), read_pos_(0), read_only_(true) {
967 }
968
969 StreamState StringStream::GetState() const {
970 return SS_OPEN;
971 }
972
973 StreamResult StringStream::Read(void* buffer, size_t buffer_len,
974 size_t* read, int* error) {
975 size_t available = std::min(buffer_len, str_.size() - read_pos_);
976 if (!available)
977 return SR_EOS;
978 memcpy(buffer, str_.data() + read_pos_, available);
979 read_pos_ += available;
980 if (read)
981 *read = available;
982 return SR_SUCCESS;
983 }
984
985 StreamResult StringStream::Write(const void* data, size_t data_len,
986 size_t* written, int* error) {
987 if (read_only_) {
988 if (error) {
989 *error = -1;
990 }
991 return SR_ERROR;
992 }
993 str_.append(static_cast<const char*>(data),
994 static_cast<const char*>(data) + data_len);
995 if (written)
996 *written = data_len;
997 return SR_SUCCESS;
998 }
999
1000 void StringStream::Close() {
1001 }
1002
1003 bool StringStream::SetPosition(size_t position) {
1004 if (position > str_.size())
1005 return false;
1006 read_pos_ = position;
1007 return true;
1008 }
1009
1010 bool StringStream::GetPosition(size_t* position) const {
1011 if (position)
1012 *position = read_pos_;
1013 return true;
1014 }
1015
1016 bool StringStream::GetSize(size_t* size) const {
1017 if (size)
1018 *size = str_.size();
1019 return true;
1020 }
1021
1022 bool StringStream::GetAvailable(size_t* size) const {
1023 if (size)
1024 *size = str_.size() - read_pos_;
1025 return true;
1026 }
1027
1028 bool StringStream::ReserveSize(size_t size) {
1029 if (read_only_)
1030 return false;
1031 str_.reserve(size);
1032 return true;
1033 }
1034
1035 ///////////////////////////////////////////////////////////////////////////////
1036 // StreamReference
1037 ///////////////////////////////////////////////////////////////////////////////
1038
1039 StreamReference::StreamReference(StreamInterface* stream)
1040 : StreamAdapterInterface(stream, false) {
1041 // owner set to false so the destructor does not free the stream.
1042 stream_ref_count_ = new StreamRefCount(stream);
1043 }
1044
1045 StreamInterface* StreamReference::NewReference() {
1046 stream_ref_count_->AddReference();
1047 return new StreamReference(stream_ref_count_, stream());
1048 }
1049
1050 StreamReference::~StreamReference() {
1051 stream_ref_count_->Release();
1052 }
1053
1054 StreamReference::StreamReference(StreamRefCount* stream_ref_count,
1055 StreamInterface* stream)
1056 : StreamAdapterInterface(stream, false),
1057 stream_ref_count_(stream_ref_count) {
1058 }
1059
1060 ///////////////////////////////////////////////////////////////////////////////
1061
1062 StreamResult Flow(StreamInterface* source,
1063 char* buffer,
1064 size_t buffer_len,
1065 StreamInterface* sink,
1066 size_t* data_len /* = nullptr */) {
1067 RTC_DCHECK(buffer_len > 0);
1068
1069 StreamResult result;
1070 size_t count, read_pos, write_pos;
1071 if (data_len) {
1072 read_pos = *data_len;
1073 } else {
1074 read_pos = 0;
1075 }
1076
1077 bool end_of_stream = false;
1078 do {
1079 // Read until buffer is full, end of stream, or error
1080 while (!end_of_stream && (read_pos < buffer_len)) {
1081 result = source->Read(buffer + read_pos, buffer_len - read_pos, &count,
1082 nullptr);
1083 if (result == SR_EOS) {
1084 end_of_stream = true;
1085 } else if (result != SR_SUCCESS) {
1086 if (data_len) {
1087 *data_len = read_pos;
1088 }
1089 return result;
1090 } else {
1091 read_pos += count;
1092 }
1093 }
1094
1095 // Write until buffer is empty, or error (including end of stream)
1096 write_pos = 0;
1097 while (write_pos < read_pos) {
1098 result = sink->Write(buffer + write_pos, read_pos - write_pos, &count,
1099 nullptr);
1100 if (result != SR_SUCCESS) {
1101 if (data_len) {
1102 *data_len = read_pos - write_pos;
1103 if (write_pos > 0) {
1104 memmove(buffer, buffer + write_pos, *data_len);
1105 }
1106 }
1107 return result;
1108 }
1109 write_pos += count;
1110 }
1111
1112 read_pos = 0;
1113 } while (!end_of_stream);
1114
1115 if (data_len) {
1116 *data_len = 0;
1117 }
1118 return SR_SUCCESS;
1119 }
1120
1121 ///////////////////////////////////////////////////////////////////////////////
1122
1123 } // namespace rtc
OLDNEW
« no previous file with comments | « webrtc/base/stream.h ('k') | webrtc/base/stream_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698