Chromium Code Reviews| Index: webrtc/call/rtc_event_log_helper_thread.cc | 
| diff --git a/webrtc/call/rtc_event_log_helper_thread.cc b/webrtc/call/rtc_event_log_helper_thread.cc | 
| new file mode 100644 | 
| index 0000000000000000000000000000000000000000..e276f38aeb1a2c95830290635693289b45229fd6 | 
| --- /dev/null | 
| +++ b/webrtc/call/rtc_event_log_helper_thread.cc | 
| @@ -0,0 +1,332 @@ | 
| +/* | 
| + * Copyright (c) 2016 The WebRTC project authors. All Rights Reserved. | 
| + * | 
| + * Use of this source code is governed by a BSD-style license | 
| + * that can be found in the LICENSE file in the root of the source | 
| + * tree. An additional intellectual property rights grant can be found | 
| + * in the file PATENTS. All contributing project authors may | 
| + * be found in the AUTHORS file in the root of the source tree. | 
| + */ | 
| + | 
| +#include "webrtc/call/rtc_event_log_helper_thread.h" | 
| + | 
| +#include "webrtc/base/checks.h" | 
| +#include "webrtc/system_wrappers/include/logging.h" | 
| + | 
| +#ifdef ENABLE_RTC_EVENT_LOG | 
| + | 
| +namespace webrtc { | 
| + | 
| +namespace { | 
| +const int kEventsInHistory = 10000; | 
| +} // namespace | 
| + | 
| +// RtcEventLogImpl member functions. | 
| +RtcEventLogHelperThread::RtcEventLogHelperThread( | 
| + SwapQueue<ControlMessage>* message_queue, | 
| + SwapQueue<rtclog::Event>* config_queue, | 
| + SwapQueue<rtclog::Event>* rtp_queue, | 
| + SwapQueue<rtclog::Event>* rtcp_queue, | 
| + SwapQueue<rtclog::Event>* acm_playout_queue, | 
| + SwapQueue<rtclog::Event>* bwe_loss_queue, | 
| + rtc::Event* wake_up, | 
| + rtc::Event* stopped, | 
| + const Clock* const clock) | 
| + : message_queue_(message_queue), | 
| + config_queue_(config_queue), | 
| + rtp_queue_(rtp_queue), | 
| + rtcp_queue_(rtcp_queue), | 
| + acm_playout_queue_(acm_playout_queue), | 
| + bwe_loss_queue_(bwe_loss_queue), | 
| + history_(kEventsInHistory), | 
| + config_history_(), | 
| + file_(FileWrapper::Create()), | 
| + thread_(&ThreadOutputFunction, this, "RtcEventLog thread"), | 
| + max_size_bytes_(std::numeric_limits<int64_t>::max()), | 
| + written_bytes_(0), | 
| + start_time_(0), | 
| + stop_time_(std::numeric_limits<int64_t>::max()), | 
| + config_event_(), | 
| + rtp_event_(), | 
| + rtcp_event_(), | 
| + playout_event_(), | 
| + loss_event_(), | 
| + valid_config_event_(false), | 
| + valid_rtp_event_(false), | 
| + valid_rtcp_event_(false), | 
| + valid_playout_event_(false), | 
| + valid_loss_event_(false), | 
| + output_string_(), | 
| + wake_up_(wake_up), | 
| + stopped_(stopped), | 
| + clock_(clock) { | 
| + RTC_DCHECK(message_queue_); | 
| + RTC_DCHECK(config_queue_); | 
| + RTC_DCHECK(rtp_queue_); | 
| + RTC_DCHECK(rtcp_queue_); | 
| + RTC_DCHECK(acm_playout_queue_); | 
| + RTC_DCHECK(bwe_loss_queue_); | 
| + RTC_DCHECK(wake_up_); | 
| + RTC_DCHECK(stopped_); | 
| + RTC_DCHECK(clock_); | 
| + thread_.Start(); | 
| +} | 
| + | 
| +RtcEventLogHelperThread::~RtcEventLogHelperThread() { | 
| + ControlMessage message; | 
| + message.message_type = ControlMessage::TERMINATE_THREAD; | 
| + message.stop_time = clock_->TimeInMicroseconds(); | 
| + while (!message_queue_->Insert(&message)) { | 
| + // We can't destroy the event log until we have stopped the thread, | 
| + // so clear the message queue and try again. Note that if we clear | 
| + // any STOP_FILE events, then the threads calling StopLogging would likely | 
| + // wait indefinitely. However, there should not be any such calls as we | 
| + // are executing the destructor. | 
| + LOG(LS_WARNING) << "Clearing message queue to terminate thread."; | 
| + message_queue_->Clear(); | 
| + } | 
| + wake_up_->Set(); // Wake up the output thread. | 
| + thread_.Stop(); // Wait for the thread to terminate. | 
| +} | 
| + | 
| +bool RtcEventLogHelperThread::AppendEventToString(rtclog::Event* event) { | 
| + rtclog::EventStream event_stream; | 
| + event_stream.add_stream(); | 
| + event_stream.mutable_stream(0)->Swap(event); | 
| + // TODO(terelius): We create a new event strem per event, but it will look | 
| 
 
ivoc
2016/03/11 13:29:53
strem -> stream
 
terelius
2016/03/18 14:03:24
Done.
 
 | 
| + // like a single stream when we read it back from file. | 
| + // Is this guaranteed to work e.g. in future versions of protobuf? | 
| + bool stop = true; | 
| + if (written_bytes_ + static_cast<int64_t>(output_string_.size()) + | 
| + event_stream.ByteSize() <= | 
| + max_size_bytes_) { | 
| 
 
ivoc
2016/03/11 13:29:53
Can this be moved to the previous line? The format
 
terelius
2016/03/18 14:03:24
Done.
 
 | 
| + event_stream.AppendToString(&output_string_); | 
| + stop = false; | 
| + } | 
| + // Swap the event back so that we don't mix event types in the queues. | 
| + event_stream.mutable_stream(0)->Swap(event); | 
| + return stop; | 
| +} | 
| + | 
| +void RtcEventLogHelperThread::AppendEventToHistory(const rtclog::Event& event) { | 
| + history_.push_back(event); | 
| +} | 
| + | 
| +// Traverses the SwapQueues in timestamp order and copies all events earlier | 
| +// than |current_time| either to the history or to a string that will be | 
| +// written to disc. | 
| +bool RtcEventLogHelperThread::ProcessInOrder(bool memory, | 
| + int64_t current_time) { | 
| + bool stop = false; | 
| + enum EventType { | 
| + CONFIG_EVENT, | 
| + RTP_EVENT, | 
| + RTCP_EVENT, | 
| + PLAYOUT_EVENT, | 
| + LOSS_EVENT | 
| + }; | 
| + | 
| + const size_t kNumberOfQueues = 5; | 
| + const size_t kConfigQueueIndex = 0; | 
| + SwapQueue<rtclog::Event>* queue[] = {config_queue_, rtp_queue_, rtcp_queue_, | 
| + acm_playout_queue_, bwe_loss_queue_}; | 
| + rtclog::Event* head_event[] = {&config_event_, &rtp_event_, &rtcp_event_, | 
| + &playout_event_, &loss_event_}; | 
| + bool* valid_event[] = {&valid_config_event_, &valid_rtp_event_, | 
| + &valid_rtcp_event_, &valid_playout_event_, | 
| + &valid_loss_event_}; | 
| + | 
| + while (!stop) { | 
| + // Extract the head of each queue. | 
| + for (size_t i = 0; i < kNumberOfQueues; i++) { | 
| + if (!*valid_event[i]) { | 
| + *valid_event[i] = queue[i]->Remove(head_event[i]); | 
| + } | 
| + } | 
| + // Find the earliest event (in timestamp order). | 
| + size_t index_of_earliest_event = kNumberOfQueues; | 
| + int64_t first_timestamp = std::numeric_limits<int64_t>::max(); | 
| + for (size_t i = 0; i < kNumberOfQueues; i++) { | 
| + if (*valid_event[i] && head_event[i]->timestamp_us() < first_timestamp) { | 
| + first_timestamp = head_event[i]->timestamp_us(); | 
| + index_of_earliest_event = i; | 
| + } | 
| + } | 
| + | 
| + if (first_timestamp > current_time) { | 
| + // We have handled all events earlier than current_time. | 
| + break; | 
| + } | 
| + | 
| + if (index_of_earliest_event == kConfigQueueIndex) { | 
| + // Configuration nees special treatment since we want to store them in | 
| 
 
stefan-webrtc
2016/03/10 13:03:15
needs
Also maybe mention why we want to store the
 
terelius
2016/03/10 13:47:41
Done.
 
 | 
| + // the config history. | 
| + if (!memory) { | 
| + stop = AppendEventToString(head_event[index_of_earliest_event]); | 
| + } | 
| + config_history_.push_back(*head_event[index_of_earliest_event]); | 
| + *valid_event[index_of_earliest_event] = | 
| + queue[index_of_earliest_event]->Remove( | 
| + head_event[index_of_earliest_event]); | 
| + } else { | 
| + // Normal events are either stored in the history or serialized to a | 
| + // string (which is later written to disc). | 
| + if (memory) { | 
| + AppendEventToHistory(*head_event[index_of_earliest_event]); | 
| + } else { | 
| + stop = AppendEventToString(head_event[index_of_earliest_event]); | 
| + } | 
| + if (!stop) { | 
| + *valid_event[index_of_earliest_event] = | 
| + queue[index_of_earliest_event]->Remove( | 
| + head_event[index_of_earliest_event]); | 
| + } | 
| + } | 
| + } | 
| + // We want to stop logging either if we have struck the file size limit | 
| + // or if we have logged all events older than |stop_time_|. | 
| + return stop || (current_time > stop_time_); | 
| +} | 
| + | 
| +void RtcEventLogHelperThread::LogToMemory() { | 
| + RTC_DCHECK(!file_->Open()); | 
| + | 
| + // Process each event in order and append it to the appropriate history_. | 
| + int64_t current_time = clock_->TimeInMicroseconds(); | 
| + ProcessInOrder(true, current_time); | 
| +} | 
| + | 
| +void RtcEventLogHelperThread::StartLogFile() { | 
| + RTC_DCHECK(file_->Open()); | 
| + bool stop = false; | 
| + output_string_.clear(); | 
| + | 
| + // Create and serialize the LOG_START event. | 
| + rtclog::Event start_event; | 
| + start_event.set_timestamp_us(start_time_); | 
| + start_event.set_type(rtclog::Event::LOG_START); | 
| + AppendEventToString(&start_event); | 
| + | 
| + // Serialize the config information for all old streams. | 
| + for (rtclog::Event& event : config_history_) { | 
| + AppendEventToString(&event); | 
| + } | 
| + | 
| + // Serialize the events in the event queue. | 
| + for (int i = 0; !history_.empty() && !stop; i++) { | 
| + stop = AppendEventToString(&history_.front()); | 
| + if (!stop) { | 
| + history_.pop_front(); | 
| + } | 
| + } | 
| + | 
| + // Write to file. | 
| + file_->Write(output_string_.data(), output_string_.size()); | 
| + written_bytes_ += output_string_.size(); | 
| + | 
| + if (stop) { | 
| + StopLogFile(); | 
| + } | 
| +} | 
| + | 
| +void RtcEventLogHelperThread::LogToFile() { | 
| + RTC_DCHECK(file_->Open()); | 
| + output_string_.clear(); | 
| + | 
| + // Process each event in order and append it to the output_string_. | 
| + int64_t current_time = clock_->TimeInMicroseconds(); | 
| + bool stop = ProcessInOrder(false, current_time); | 
| + | 
| + // Write string to file. | 
| + file_->Write(output_string_.data(), output_string_.size()); | 
| + written_bytes_ += output_string_.size(); | 
| + | 
| + if (stop || stop_time_ <= current_time) { | 
| + StopLogFile(); | 
| + } | 
| +} | 
| + | 
| +void RtcEventLogHelperThread::StopLogFile() { | 
| + RTC_DCHECK(file_->Open()); | 
| + output_string_.clear(); | 
| + | 
| + rtclog::Event end_event; | 
| + end_event.set_timestamp_us(stop_time_); | 
| + end_event.set_type(rtclog::Event::LOG_END); | 
| + AppendEventToString(&end_event); | 
| + | 
| + if (written_bytes_ + static_cast<int64_t>(output_string_.size()) <= | 
| + max_size_bytes_) { | 
| + file_->Write(output_string_.data(), output_string_.size()); | 
| + written_bytes_ += output_string_.size(); | 
| + } | 
| + | 
| + max_size_bytes_ = std::numeric_limits<int64_t>::max(); | 
| + written_bytes_ = 0; | 
| + start_time_ = 0; | 
| + stop_time_ = std::numeric_limits<int64_t>::max(); | 
| + output_string_.clear(); | 
| + file_->CloseFile(); | 
| + RTC_DCHECK(!file_->Open()); | 
| +} | 
| + | 
| +void RtcEventLogHelperThread::WriteLog() { | 
| + ControlMessage message; | 
| + | 
| + while (true) { | 
| + // Process control messages. | 
| + while (message_queue_->Remove(&message)) { | 
| + switch (message.message_type) { | 
| + case ControlMessage::START_FILE: | 
| + if (!file_->Open()) { | 
| + max_size_bytes_ = message.max_size_bytes; | 
| + start_time_ = message.start_time; | 
| + stop_time_ = message.stop_time; | 
| + file_.swap(message.file); | 
| + StartLogFile(); | 
| + } else { | 
| + // Already started. Ignore message and close file handle. | 
| + message.file->CloseFile(); | 
| + } | 
| + break; | 
| + case ControlMessage::STOP_FILE: | 
| + if (file_->Open()) { | 
| + stop_time_ = message.stop_time; | 
| + LogToFile(); // Log remaining events from message queues. | 
| + } | 
| + // LogToFile might stop on it's own so we need to recheck the state. | 
| + if (file_->Open()) { | 
| + StopLogFile(); | 
| + } | 
| + stopped_->Set(); | 
| + break; | 
| + case ControlMessage::TERMINATE_THREAD: | 
| + if (file_->Open()) { | 
| + StopLogFile(); | 
| + } | 
| + return; | 
| + } | 
| + } | 
| + | 
| + // Write events to file or memory | 
| + if (file_->Open()) { | 
| + LogToFile(); | 
| + } else { | 
| + LogToMemory(); | 
| + } | 
| + | 
| + // Accumulate a new batch of events instead of processing them one at a | 
| + // time. | 
| + wake_up_->Wait(50); | 
| + } | 
| +} | 
| + | 
| +bool RtcEventLogHelperThread::ThreadOutputFunction(void* obj) { | 
| + RtcEventLogHelperThread* helper = static_cast<RtcEventLogHelperThread*>(obj); | 
| + helper->WriteLog(); | 
| + return false; | 
| +} | 
| + | 
| +} // namespace webrtc | 
| + | 
| +#endif // ENABLE_RTC_EVENT_LOG |