Index: webrtc/call/rtc_event_log_helper_thread.cc |
diff --git a/webrtc/call/rtc_event_log_helper_thread.cc b/webrtc/call/rtc_event_log_helper_thread.cc |
new file mode 100644 |
index 0000000000000000000000000000000000000000..26d6044282d423cc69f740f89e918663e064145c |
--- /dev/null |
+++ b/webrtc/call/rtc_event_log_helper_thread.cc |
@@ -0,0 +1,337 @@ |
+/* |
+ * Copyright (c) 2016 The WebRTC project authors. All Rights Reserved. |
+ * |
+ * Use of this source code is governed by a BSD-style license |
+ * that can be found in the LICENSE file in the root of the source |
+ * tree. An additional intellectual property rights grant can be found |
+ * in the file PATENTS. All contributing project authors may |
+ * be found in the AUTHORS file in the root of the source tree. |
+ */ |
+ |
+#include "webrtc/call/rtc_event_log_helper_thread.h" |
+ |
+#include "webrtc/base/checks.h" |
+#include "webrtc/system_wrappers/include/logging.h" |
+ |
+#ifdef ENABLE_RTC_EVENT_LOG |
+ |
+namespace webrtc { |
+ |
+namespace { |
+const int kEventsInHistory = 10000; |
the sun
2016/03/11 13:21:57
I thought we wanted e.g. a 10s back buffer. Does 1
terelius
2016/03/18 14:03:25
Imo it is much better to set the history limit bas
the sun
2016/03/24 13:49:43
But that's exactly one of the problems with the cu
terelius
2016/03/29 09:38:32
But there is no very large event type. Even if all
the sun
2016/03/30 09:12:38
So what happens then if a large event type is *add
terelius
2016/03/30 10:36:59
This code will break the memory limit if a new eve
|
+} // namespace |
+ |
+// RtcEventLogImpl member functions. |
+RtcEventLogHelperThread::RtcEventLogHelperThread( |
+ SwapQueue<ControlMessage>* message_queue, |
+ SwapQueue<rtclog::Event>* config_queue, |
+ SwapQueue<rtclog::Event>* rtp_queue, |
+ SwapQueue<rtclog::Event>* rtcp_queue, |
+ SwapQueue<rtclog::Event>* acm_playout_queue, |
+ SwapQueue<rtclog::Event>* bwe_loss_queue, |
+ rtc::Event* wake_up, |
+ rtc::Event* stopped, |
+ const Clock* const clock) |
+ : message_queue_(message_queue), |
+ config_queue_(config_queue), |
+ rtp_queue_(rtp_queue), |
+ rtcp_queue_(rtcp_queue), |
+ acm_playout_queue_(acm_playout_queue), |
+ bwe_loss_queue_(bwe_loss_queue), |
+ history_(kEventsInHistory), |
+ config_history_(), |
+ file_(FileWrapper::Create()), |
the sun
2016/03/11 13:21:57
Why not just let it be null?
terelius
2016/03/18 14:03:25
I could do that, but I don't think there is much d
|
+ thread_(&ThreadOutputFunction, this, "RtcEventLog thread"), |
+ max_size_bytes_(std::numeric_limits<int64_t>::max()), |
+ written_bytes_(0), |
+ start_time_(0), |
+ stop_time_(std::numeric_limits<int64_t>::max()), |
+ config_event_(), |
+ rtp_event_(), |
+ rtcp_event_(), |
+ playout_event_(), |
+ loss_event_(), |
+ valid_config_event_(false), |
+ valid_rtp_event_(false), |
+ valid_rtcp_event_(false), |
+ valid_playout_event_(false), |
+ valid_loss_event_(false), |
+ output_string_(), |
+ wake_up_(wake_up), |
+ stopped_(stopped), |
+ clock_(clock) { |
+ RTC_DCHECK(message_queue_); |
+ RTC_DCHECK(config_queue_); |
+ RTC_DCHECK(rtp_queue_); |
+ RTC_DCHECK(rtcp_queue_); |
+ RTC_DCHECK(acm_playout_queue_); |
+ RTC_DCHECK(bwe_loss_queue_); |
+ RTC_DCHECK(wake_up_); |
+ RTC_DCHECK(stopped_); |
+ RTC_DCHECK(clock_); |
+ thread_.Start(); |
+} |
+ |
+RtcEventLogHelperThread::~RtcEventLogHelperThread() { |
+ ControlMessage message; |
+ message.message_type = ControlMessage::TERMINATE_THREAD; |
+ message.stop_time = clock_->TimeInMicroseconds(); |
+ while (!message_queue_->Insert(&message)) { |
+ // We can't destroy the event log until we have stopped the thread, |
+ // so clear the message queue and try again. Note that if we clear |
+ // any STOP_FILE events, then the threads calling StopLogging would likely |
+ // wait indefinitely. However, there should not be any such calls as we |
+ // are executing the destructor. |
+ LOG(LS_WARNING) << "Clearing message queue to terminate thread."; |
+ message_queue_->Clear(); |
+ } |
+ wake_up_->Set(); // Wake up the output thread. |
+ thread_.Stop(); // Wait for the thread to terminate. |
+} |
+ |
+bool RtcEventLogHelperThread::AppendEventToString(rtclog::Event* event) { |
+ rtclog::EventStream event_stream; |
+ event_stream.add_stream(); |
+ event_stream.mutable_stream(0)->Swap(event); |
+ // TODO(terelius): We create a new event strem per event, but it will look |
+ // like a single stream when we read it back from file. |
+ // Is this guaranteed to work e.g. in future versions of protobuf? |
the sun
2016/03/11 13:21:57
https://developers.google.com/protocol-buffers/doc
terelius
2016/03/18 14:03:25
Acknowledged.
|
+ bool stop = true; |
+ if (written_bytes_ + static_cast<int64_t>(output_string_.size()) + |
+ event_stream.ByteSize() <= |
+ max_size_bytes_) { |
+ event_stream.AppendToString(&output_string_); |
+ stop = false; |
+ } |
+ // Swap the event back so that we don't mix event types in the queues. |
+ event_stream.mutable_stream(0)->Swap(event); |
+ return stop; |
+} |
+ |
+void RtcEventLogHelperThread::AppendEventToHistory(const rtclog::Event& event) { |
+ history_.push_back(event); |
+} |
+ |
+// Traverses the SwapQueues in timestamp order and copies all events earlier |
+// than |current_time| either to the history or to a string that will be |
+// written to disc. |
+bool RtcEventLogHelperThread::ProcessInOrder(bool memory, |
+ int64_t current_time) { |
+ bool stop = false; |
+ enum EventType { |
+ CONFIG_EVENT, |
+ RTP_EVENT, |
+ RTCP_EVENT, |
+ PLAYOUT_EVENT, |
+ LOSS_EVENT |
+ }; |
+ |
+ const size_t kNumberOfQueues = 5; |
+ const size_t kConfigQueueIndex = 0; |
+ SwapQueue<rtclog::Event>* queue[] = {config_queue_, rtp_queue_, rtcp_queue_, |
+ acm_playout_queue_, bwe_loss_queue_}; |
+ rtclog::Event* head_event[] = {&config_event_, &rtp_event_, &rtcp_event_, |
+ &playout_event_, &loss_event_}; |
+ bool* valid_event[] = {&valid_config_event_, &valid_rtp_event_, |
+ &valid_rtcp_event_, &valid_playout_event_, |
+ &valid_loss_event_}; |
+ |
+ while (!stop) { |
+ // Extract the head of each queue. |
+ for (size_t i = 0; i < kNumberOfQueues; i++) { |
+ if (!*valid_event[i]) { |
+ *valid_event[i] = queue[i]->Remove(head_event[i]); |
+ } |
+ } |
+ // Find the earliest event (in timestamp order). |
+ size_t index_of_earliest_event = kNumberOfQueues; |
+ int64_t first_timestamp = std::numeric_limits<int64_t>::max(); |
+ for (size_t i = 0; i < kNumberOfQueues; i++) { |
+ if (*valid_event[i] && head_event[i]->timestamp_us() < first_timestamp) { |
+ first_timestamp = head_event[i]->timestamp_us(); |
+ index_of_earliest_event = i; |
+ } |
+ } |
+ |
+ if (first_timestamp > current_time) { |
+ // We have handled all events earlier than current_time. |
+ break; |
+ } |
+ |
+ if (index_of_earliest_event == kConfigQueueIndex) { |
+ // Stream configurations need special treatment because old configs, |
+ // unlike normal events stored in the history, should never be removed. |
+ // We need the configurations to parse e.g. RTP headers, so we have to |
+ // store them in a special config_history for future logs, regardless of |
+ // whether we are logging right now. |
+ if (!memory) { |
+ stop = AppendEventToString(head_event[index_of_earliest_event]); |
+ } |
+ config_history_.push_back(*head_event[index_of_earliest_event]); |
+ *valid_event[index_of_earliest_event] = |
+ queue[index_of_earliest_event]->Remove( |
+ head_event[index_of_earliest_event]); |
+ } else { |
+ // Normal events are either stored in the history or serialized to a |
+ // string (which is later written to disc). |
+ if (memory) { |
+ AppendEventToHistory(*head_event[index_of_earliest_event]); |
+ } else { |
+ stop = AppendEventToString(head_event[index_of_earliest_event]); |
+ } |
+ if (!stop) { |
+ *valid_event[index_of_earliest_event] = |
+ queue[index_of_earliest_event]->Remove( |
+ head_event[index_of_earliest_event]); |
+ } |
+ } |
+ } |
+ // We want to stop logging either if we have struck the file size limit |
+ // or if we have logged all events older than |stop_time_|. |
+ return stop || (current_time > stop_time_); |
+} |
+ |
+void RtcEventLogHelperThread::LogToMemory() { |
+ RTC_DCHECK(!file_->Open()); |
the sun
2016/03/11 13:21:57
Why not RTC_DCHECK(!file_); ?
terelius
2016/03/18 14:03:25
That would work too, but see discussion above.
|
+ |
+ // Process each event in order and append it to the appropriate history_. |
+ int64_t current_time = clock_->TimeInMicroseconds(); |
+ ProcessInOrder(true, current_time); |
+} |
+ |
+void RtcEventLogHelperThread::StartLogFile() { |
+ RTC_DCHECK(file_->Open()); |
+ bool stop = false; |
+ output_string_.clear(); |
+ |
+ // Create and serialize the LOG_START event. |
+ rtclog::Event start_event; |
+ start_event.set_timestamp_us(start_time_); |
+ start_event.set_type(rtclog::Event::LOG_START); |
+ AppendEventToString(&start_event); |
+ |
+ // Serialize the config information for all old streams. |
+ for (rtclog::Event& event : config_history_) { |
+ AppendEventToString(&event); |
+ } |
+ |
+ // Serialize the events in the event queue. |
+ for (int i = 0; !history_.empty() && !stop; i++) { |
the sun
2016/03/11 13:21:57
What is "i" for?
terelius
2016/03/18 14:03:25
Done, thanks. Was used for debugging.
|
+ stop = AppendEventToString(&history_.front()); |
the sun
2016/03/11 13:21:57
Does this mean output_string will now have grown t
terelius
2016/03/18 14:03:25
Yes it does, but the string version uses significa
|
+ if (!stop) { |
+ history_.pop_front(); |
+ } |
+ } |
+ |
+ // Write to file. |
+ file_->Write(output_string_.data(), output_string_.size()); |
+ written_bytes_ += output_string_.size(); |
+ |
+ if (stop) { |
+ RTC_DCHECK(file_->Open()); |
+ StopLogFile(); |
+ } |
+} |
+ |
+void RtcEventLogHelperThread::LogToFile() { |
+ RTC_DCHECK(file_->Open()); |
+ output_string_.clear(); |
+ |
+ // Process each event in order and append it to the output_string_. |
+ int64_t current_time = clock_->TimeInMicroseconds(); |
+ bool stop = ProcessInOrder(false, current_time); |
+ |
+ // Write string to file. |
+ file_->Write(output_string_.data(), output_string_.size()); |
+ written_bytes_ += output_string_.size(); |
+ |
+ if (stop || stop_time_ <= current_time) { |
+ RTC_DCHECK(file_->Open()); |
+ StopLogFile(); |
+ } |
+} |
+ |
+void RtcEventLogHelperThread::StopLogFile() { |
+ RTC_DCHECK(file_->Open()); |
+ output_string_.clear(); |
+ |
+ rtclog::Event end_event; |
+ end_event.set_timestamp_us(stop_time_); |
+ end_event.set_type(rtclog::Event::LOG_END); |
+ AppendEventToString(&end_event); |
+ |
+ if (written_bytes_ + static_cast<int64_t>(output_string_.size()) <= |
+ max_size_bytes_) { |
+ file_->Write(output_string_.data(), output_string_.size()); |
+ written_bytes_ += output_string_.size(); |
+ } |
+ |
+ max_size_bytes_ = std::numeric_limits<int64_t>::max(); |
+ written_bytes_ = 0; |
+ start_time_ = 0; |
+ stop_time_ = std::numeric_limits<int64_t>::max(); |
+ output_string_.clear(); |
+ file_->CloseFile(); |
+ RTC_DCHECK(!file_->Open()); |
+} |
+ |
+void RtcEventLogHelperThread::WriteLog() { |
+ ControlMessage message; |
+ |
+ while (true) { |
+ // Process control messages. |
+ while (message_queue_->Remove(&message)) { |
+ switch (message.message_type) { |
+ case ControlMessage::START_FILE: |
+ if (!file_->Open()) { |
+ max_size_bytes_ = message.max_size_bytes; |
+ start_time_ = message.start_time; |
+ stop_time_ = message.stop_time; |
+ file_.swap(message.file); |
+ StartLogFile(); |
+ } else { |
+ // Already started. Ignore message and close file handle. |
+ message.file->CloseFile(); |
+ } |
+ break; |
+ case ControlMessage::STOP_FILE: |
+ if (file_->Open()) { |
+ stop_time_ = message.stop_time; |
+ LogToFile(); // Log remaining events from message queues. |
+ } |
+ // LogToFile might stop on it's own so we need to recheck the state. |
+ if (file_->Open()) { |
+ StopLogFile(); |
+ } |
+ stopped_->Set(); |
+ break; |
+ case ControlMessage::TERMINATE_THREAD: |
+ if (file_->Open()) { |
+ StopLogFile(); |
+ } |
+ return; |
+ } |
+ } |
+ |
+ // Write events to file or memory |
+ if (file_->Open()) { |
+ LogToFile(); |
+ } else { |
+ LogToMemory(); |
+ } |
+ |
+ // Accumulate a new batch of events instead of processing them one at a |
+ // time. |
+ wake_up_->Wait(50); |
+ } |
+} |
+ |
+bool RtcEventLogHelperThread::ThreadOutputFunction(void* obj) { |
+ RtcEventLogHelperThread* helper = static_cast<RtcEventLogHelperThread*>(obj); |
+ helper->WriteLog(); |
+ return false; |
+} |
+ |
+} // namespace webrtc |
+ |
+#endif // ENABLE_RTC_EVENT_LOG |