Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(307)

Unified Diff: webrtc/call/rtc_event_log_helper_thread.cc

Issue 1687703002: Refactored CL for moving the output to a separate thread. (Closed) Base URL: https://chromium.googlesource.com/external/webrtc.git@master
Patch Set: Fix compile errors Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: webrtc/call/rtc_event_log_helper_thread.cc
diff --git a/webrtc/call/rtc_event_log_helper_thread.cc b/webrtc/call/rtc_event_log_helper_thread.cc
new file mode 100644
index 0000000000000000000000000000000000000000..f5fd7a989eab41aabfca6a3d853bedbed42688dd
--- /dev/null
+++ b/webrtc/call/rtc_event_log_helper_thread.cc
@@ -0,0 +1,376 @@
+/*
+ * Copyright (c) 2015 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "webrtc/call/rtc_event_log_helper_thread.h"
+
+#include "webrtc/base/checks.h"
+#include "webrtc/system_wrappers/include/logging.h"
+
+#ifdef ENABLE_RTC_EVENT_LOG
+
+namespace webrtc {
+
+namespace {
+const int kEventsInHistory = 10000;
+} // namespace
+
+// RtcEventLogImpl member functions.
+RtcEventLogHelperThread::RtcEventLogHelperThread(
+ SwapQueue<EventLogMessage>* message_queue,
+ SwapQueue<rtclog::Event>* config_queue,
+ SwapQueue<rtclog::Event>* rtp_queue,
+ SwapQueue<rtclog::Event>* rtcp_queue,
+ SwapQueue<rtclog::Event>* acm_playout_queue,
+ SwapQueue<rtclog::Event>* bwe_loss_queue,
+ rtc::Event* wake_up,
+ rtc::Event* stopped,
+ const Clock* const clock)
+ : message_queue_(message_queue),
+ config_queue_(config_queue),
+ rtp_queue_(rtp_queue),
+ rtcp_queue_(rtcp_queue),
+ acm_playout_queue_(acm_playout_queue),
+ bwe_loss_queue_(bwe_loss_queue),
+ history_(kEventsInHistory),
+ config_history_(),
+ log_to_memory_(true),
the sun 2016/02/25 15:23:19 log_to_memory and file_ represent the same informa
terelius 2016/03/09 19:49:39 Done.
+ file_(FileWrapper::Create()),
+ thread_(&ThreadOutputFunction, this, "RtcEventLog thread"),
+ max_size_bytes_(std::numeric_limits<int64_t>::max()),
+ written_bytes_(0),
+ start_time_(0),
+ stop_time_(std::numeric_limits<int64_t>::max()),
+ config_event_(),
+ rtp_event_(),
+ rtcp_event_(),
+ playout_event_(),
+ loss_event_(),
+ valid_config_event_(false),
+ valid_rtp_event_(false),
+ valid_rtcp_event_(false),
+ valid_playout_event_(false),
+ valid_loss_event_(false),
+ output_string_(),
+ wake_up_(wake_up),
+ stopped_(stopped),
+ clock_(clock) {
the sun 2016/02/25 15:23:19 Add RTC_DCHECK(message_queue), and the other point
terelius 2016/03/09 19:49:39 Done.
+ thread_.Start();
+}
+
+RtcEventLogHelperThread::~RtcEventLogHelperThread() {
+ EventLogMessage message;
+ message.message_type = EventLogMessage::TERMINATE_THREAD;
+ message.stop_time = clock_->TimeInMicroseconds();
+ while (!message_queue_->Insert(&message)) {
+ // We can't destroy the event log until we have stopped the thread,
+ // so clear the message queue and try again. Note that if we clear
+ // any STOP_FILE events, then the threads calling StopLogging would likely
+ // wait indefinitely. However, there should not be any such calls as we
+ // are executing the destructor.
+ LOG(LS_WARNING) << "Clearing message queue to terminate thread.";
+ message_queue_->Clear();
+ }
+ wake_up_->Set(); // Wake up the output thread.
+ thread_.Stop(); // Wait for the thread to terminate.
+}
+
+bool RtcEventLogHelperThread::AppendEventToString(rtclog::Event* event) {
+ rtclog::EventStream event_stream;
+ event_stream.add_stream();
+ event_stream.mutable_stream(0)->Swap(event);
+ // TODO(terelius): We create a new event strem per event, but it will look
+ // like a single stream when we read it back from file.
+ // Is this guaranteed to work e.g. in future versions of protobuf?
+ bool stop = true;
+ if (written_bytes_ + static_cast<int64_t>(output_string_.size()) +
+ event_stream.ByteSize() <=
+ max_size_bytes_) {
+ event_stream.AppendToString(&output_string_);
+ stop = false;
+ }
+ // Swap the event back so that we don't mix event types in the queues.
+ event_stream.mutable_stream(0)->Swap(event);
+ return stop;
+}
+
+void RtcEventLogHelperThread::AppendEventToHistory(const rtclog::Event& event) {
+ history_.push_back(event);
+}
+
+// Traverses the SwapQueues in timestamp order and copies all events earlier
+// than |current_time| either to the history or to a string that will be
+// written to disc.
+bool RtcEventLogHelperThread::ProcessInOrder(bool memory,
+ int64_t current_time) {
+ bool stop = false;
+ enum EventType {
+ CONFIG_EVENT,
+ RTP_EVENT,
+ RTCP_EVENT,
+ PLAYOUT_EVENT,
+ LOSS_EVENT
+ };
+
+ // Extract the head of each queue.
the sun 2016/03/03 14:31:42 Could you organize the code something like this:
terelius 2016/03/09 19:49:39 Done something similar.
+ if (!valid_config_event_) {
+ valid_config_event_ = config_queue_->Remove(&config_event_);
+ }
+
+ if (!valid_rtp_event_) {
+ valid_rtp_event_ = rtp_queue_->Remove(&rtp_event_);
+ }
+
+ if (!valid_rtcp_event_) {
+ valid_rtcp_event_ = rtcp_queue_->Remove(&rtcp_event_);
+ }
+
+ if (!valid_playout_event_) {
+ valid_playout_event_ = acm_playout_queue_->Remove(&playout_event_);
+ }
+
+ if (!valid_loss_event_) {
+ valid_loss_event_ = bwe_loss_queue_->Remove(&loss_event_);
+ }
+
+ while ((valid_config_event_ || valid_rtp_event_ || valid_rtcp_event_ ||
the sun 2016/02/25 15:23:19 This handling doesn't look right to me. Have you u
stefan-webrtc 2016/03/01 09:44:16 I'm not sure I follow this. Do you mean that we sh
terelius 2016/03/09 19:49:39 I agree with Stefan; we can't wait for all types o
+ valid_playout_event_ || valid_loss_event_) &&
+ !stop) {
+ // Find the earliest event (in timestamp order).
+ EventType type = CONFIG_EVENT;
+ int64_t first_timestamp =
+ (valid_config_event_ ? config_event_.timestamp_us()
+ : std::numeric_limits<int64_t>::max());
+ if (valid_rtp_event_ && (rtp_event_.timestamp_us() < first_timestamp)) {
+ first_timestamp = rtp_event_.timestamp_us();
+ type = RTP_EVENT;
+ }
+ if (valid_rtcp_event_ && (rtcp_event_.timestamp_us() < first_timestamp)) {
+ first_timestamp = rtcp_event_.timestamp_us();
+ type = RTCP_EVENT;
+ }
+ if (valid_playout_event_ &&
+ playout_event_.timestamp_us() < first_timestamp) {
+ first_timestamp = playout_event_.timestamp_us();
+ type = PLAYOUT_EVENT;
+ }
+ if (valid_loss_event_ && loss_event_.timestamp_us() < first_timestamp) {
+ first_timestamp = loss_event_.timestamp_us();
+ type = LOSS_EVENT;
+ }
+
+ if (first_timestamp > current_time) {
+ // We have handled all events earlier than current_time.
+ break;
+ }
+ // Serialize the event and fetch the next event of that type.
+ switch (type) {
+ case CONFIG_EVENT:
+ if (!memory) {
+ stop = AppendEventToString(&config_event_);
+ }
+ config_history_.push_back(config_event_);
+ if (!stop) {
+ valid_config_event_ = config_queue_->Remove(&config_event_);
+ }
+ break;
+ case RTP_EVENT:
+ if (memory) {
+ AppendEventToHistory(rtp_event_);
+ } else {
+ stop = AppendEventToString(&rtp_event_);
+ }
stefan-webrtc 2016/03/01 09:44:16 Lines 184-188 may be possible to break out to a he
terelius 2016/03/09 19:49:39 Refactored in a different way.
+ if (!stop) {
+ valid_rtp_event_ = rtp_queue_->Remove(&rtp_event_);
+ }
+ break;
+ case RTCP_EVENT:
+ if (memory) {
+ AppendEventToHistory(rtcp_event_);
+ } else {
+ stop = AppendEventToString(&rtcp_event_);
+ }
+ if (!stop) {
+ valid_rtcp_event_ = rtcp_queue_->Remove(&rtcp_event_);
+ }
+ break;
+ case PLAYOUT_EVENT:
+ if (memory) {
+ AppendEventToHistory(playout_event_);
+ } else {
+ stop = AppendEventToString(&playout_event_);
+ }
+ if (!stop) {
+ valid_playout_event_ = acm_playout_queue_->Remove(&playout_event_);
+ }
+ break;
+ case LOSS_EVENT:
+ if (memory) {
+ AppendEventToHistory(loss_event_);
+ } else {
+ stop = AppendEventToString(&loss_event_);
+ }
+ if (!stop) {
+ valid_loss_event_ = bwe_loss_queue_->Remove(&loss_event_);
+ }
+ break;
+ }
+ }
+ // We want to stop logging either if we have struck the file size limit
+ // or if we have logged all events older than |stop_time_|.
+ return stop || (current_time > stop_time_);
+}
+
+void RtcEventLogHelperThread::LogToMemory() {
+ RTC_DCHECK(log_to_memory_);
+
+ // Process each event in order and append it to the appropriate history_.
+ int64_t current_time = clock_->TimeInMicroseconds();
+ ProcessInOrder(true, current_time);
+}
+
+void RtcEventLogHelperThread::StartLogFile() {
+ RTC_DCHECK(log_to_memory_);
+ bool stop = false;
+ output_string_.clear();
+
+ // Create and serialize the LOG_START event.
+ rtclog::Event start_event;
+ start_event.set_timestamp_us(start_time_);
+ start_event.set_type(rtclog::Event::LOG_START);
+ AppendEventToString(&start_event);
+
+ // Serialize the config information for all old streams.
+ for (rtclog::Event& event : config_history_) {
+ AppendEventToString(&event);
+ }
+
+ // Serialize the events in the event queue.
+ for (int i = 0; !history_.empty() && !stop; i++) {
+ stop = AppendEventToString(&history_.front());
+ if (!stop) {
+ history_.pop_front();
+ }
+ }
+
+ // Write to file.
+ file_->Write(output_string_.data(), output_string_.size());
+ written_bytes_ += output_string_.size();
+
+ log_to_memory_ = false;
+ if (stop) {
+ StopLogFile();
+ }
+}
+
+void RtcEventLogHelperThread::LogToFile() {
+ RTC_DCHECK(!log_to_memory_);
+ output_string_.clear();
+
+ // Process each event in order and append it to the output_string_.
+ int64_t current_time = clock_->TimeInMicroseconds();
+ bool stop = ProcessInOrder(false, current_time);
+
+ // Write string to file.
+ file_->Write(output_string_.data(), output_string_.size());
+ written_bytes_ += output_string_.size();
+
+ if (stop || stop_time_ <= current_time) {
+ StopLogFile();
+ }
+}
+
+void RtcEventLogHelperThread::StopLogFile() {
+ RTC_DCHECK(!log_to_memory_);
+ output_string_.clear();
+
+ rtclog::Event end_event;
+ end_event.set_timestamp_us(stop_time_);
+ end_event.set_type(rtclog::Event::LOG_END);
+ AppendEventToString(&end_event);
+
+ if (written_bytes_ + static_cast<int64_t>(output_string_.size()) <=
+ max_size_bytes_) {
+ file_->Write(output_string_.data(), output_string_.size());
+ written_bytes_ += output_string_.size();
+ }
+
+ log_to_memory_ = true;
+ max_size_bytes_ = std::numeric_limits<int64_t>::max();
+ written_bytes_ = 0;
+ start_time_ = 0;
+ stop_time_ = std::numeric_limits<int64_t>::max();
+ output_string_.clear();
+ file_->CloseFile();
+}
+
+void RtcEventLogHelperThread::WriteLog() {
+ bool valid_message = false;
the sun 2016/02/25 15:23:19 not needed
terelius 2016/03/09 19:49:39 Done.
+ EventLogMessage message;
+
+ while (true) {
+ // Process messages.
+ if (!valid_message) {
+ valid_message = message_queue_->Remove(&message);
+ }
+ while (valid_message) {
the sun 2016/02/25 15:23:19 while (message_queue_->Remove(&message)) {
terelius 2016/03/09 19:49:39 Done.
+ switch (message.message_type) {
+ case EventLogMessage::START_FILE:
+ if (log_to_memory_) {
+ max_size_bytes_ = message.max_size_bytes;
+ start_time_ = message.start_time;
+ stop_time_ = message.stop_time;
+ file_.reset(message.file.release());
the sun 2016/02/25 15:23:19 file_.swap(message.file);
terelius 2016/03/09 19:49:39 Done.
+ StartLogFile();
+ } else {
+ // Already stopped. Close file handle.
+ message.file->CloseFile();
+ }
+ break;
+ case EventLogMessage::STOP_FILE:
+ if (!log_to_memory_) {
+ stop_time_ = message.stop_time;
+ LogToFile(); // Log remaining events from message queues.
+ }
+ // LogToFile might stop on it's own so we need to recheck the state.
+ if (!log_to_memory_) {
+ StopLogFile();
+ }
+ stopped_->Set();
+ break;
+ case EventLogMessage::TERMINATE_THREAD:
+ if (!log_to_memory_) {
+ StopLogFile();
+ }
+ return;
+ }
+ valid_message = message_queue_->Remove(&message);
the sun 2016/02/25 15:23:19 remove
terelius 2016/03/09 19:49:39 Done.
+ }
+
+ if (log_to_memory_) {
+ LogToMemory();
+ } else {
+ LogToFile();
+ }
+
+ // Accumulate a new batch of events instead of processing them one at a
+ // time.
+ wake_up_->Wait(50);
+ }
+}
+
+bool RtcEventLogHelperThread::ThreadOutputFunction(void* obj) {
+ RtcEventLogHelperThread* helper = static_cast<RtcEventLogHelperThread*>(obj);
+ helper->WriteLog();
+ return false;
+}
+
+} // namespace webrtc
+
+#endif // ENABLE_RTC_EVENT_LOG

Powered by Google App Engine
This is Rietveld 408576698