Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(117)

Side by Side Diff: webrtc/call/rtc_event_log_helper_thread.cc

Issue 1687703002: Refactored CL for moving the output to a separate thread. (Closed) Base URL: https://chromium.googlesource.com/external/webrtc.git@master
Patch Set: Refactoring and other comments Created 4 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 /*
2 * Copyright (c) 2016 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "webrtc/call/rtc_event_log_helper_thread.h"
12
13 #include "webrtc/base/checks.h"
14 #include "webrtc/system_wrappers/include/logging.h"
15
16 #ifdef ENABLE_RTC_EVENT_LOG
17
18 namespace webrtc {
19
20 namespace {
21 const int kEventsInHistory = 10000;
22 } // namespace
23
24 // RtcEventLogImpl member functions.
25 RtcEventLogHelperThread::RtcEventLogHelperThread(
26 SwapQueue<ControlMessage>* message_queue,
27 SwapQueue<rtclog::Event>* config_queue,
28 SwapQueue<rtclog::Event>* rtp_queue,
29 SwapQueue<rtclog::Event>* rtcp_queue,
30 SwapQueue<rtclog::Event>* acm_playout_queue,
31 SwapQueue<rtclog::Event>* bwe_loss_queue,
32 rtc::Event* wake_up,
33 rtc::Event* stopped,
34 const Clock* const clock)
35 : message_queue_(message_queue),
36 config_queue_(config_queue),
37 rtp_queue_(rtp_queue),
38 rtcp_queue_(rtcp_queue),
39 acm_playout_queue_(acm_playout_queue),
40 bwe_loss_queue_(bwe_loss_queue),
41 history_(kEventsInHistory),
42 config_history_(),
43 file_(FileWrapper::Create()),
44 thread_(&ThreadOutputFunction, this, "RtcEventLog thread"),
45 max_size_bytes_(std::numeric_limits<int64_t>::max()),
46 written_bytes_(0),
47 start_time_(0),
48 stop_time_(std::numeric_limits<int64_t>::max()),
49 config_event_(),
50 rtp_event_(),
51 rtcp_event_(),
52 playout_event_(),
53 loss_event_(),
54 valid_config_event_(false),
55 valid_rtp_event_(false),
56 valid_rtcp_event_(false),
57 valid_playout_event_(false),
58 valid_loss_event_(false),
59 output_string_(),
60 wake_up_(wake_up),
61 stopped_(stopped),
62 clock_(clock) {
63 RTC_DCHECK(message_queue_);
64 RTC_DCHECK(config_queue_);
65 RTC_DCHECK(rtp_queue_);
66 RTC_DCHECK(rtcp_queue_);
67 RTC_DCHECK(acm_playout_queue_);
68 RTC_DCHECK(bwe_loss_queue_);
69 RTC_DCHECK(wake_up_);
70 RTC_DCHECK(stopped_);
71 RTC_DCHECK(clock_);
72 thread_.Start();
73 }
74
75 RtcEventLogHelperThread::~RtcEventLogHelperThread() {
76 ControlMessage message;
77 message.message_type = ControlMessage::TERMINATE_THREAD;
78 message.stop_time = clock_->TimeInMicroseconds();
79 while (!message_queue_->Insert(&message)) {
80 // We can't destroy the event log until we have stopped the thread,
81 // so clear the message queue and try again. Note that if we clear
82 // any STOP_FILE events, then the threads calling StopLogging would likely
83 // wait indefinitely. However, there should not be any such calls as we
84 // are executing the destructor.
85 LOG(LS_WARNING) << "Clearing message queue to terminate thread.";
86 message_queue_->Clear();
87 }
88 wake_up_->Set(); // Wake up the output thread.
89 thread_.Stop(); // Wait for the thread to terminate.
90 }
91
92 bool RtcEventLogHelperThread::AppendEventToString(rtclog::Event* event) {
93 rtclog::EventStream event_stream;
94 event_stream.add_stream();
95 event_stream.mutable_stream(0)->Swap(event);
96 // TODO(terelius): We create a new event strem per event, but it will look
ivoc 2016/03/11 13:29:53 strem -> stream
terelius 2016/03/18 14:03:24 Done.
97 // like a single stream when we read it back from file.
98 // Is this guaranteed to work e.g. in future versions of protobuf?
99 bool stop = true;
100 if (written_bytes_ + static_cast<int64_t>(output_string_.size()) +
101 event_stream.ByteSize() <=
102 max_size_bytes_) {
ivoc 2016/03/11 13:29:53 Can this be moved to the previous line? The format
terelius 2016/03/18 14:03:24 Done.
103 event_stream.AppendToString(&output_string_);
104 stop = false;
105 }
106 // Swap the event back so that we don't mix event types in the queues.
107 event_stream.mutable_stream(0)->Swap(event);
108 return stop;
109 }
110
111 void RtcEventLogHelperThread::AppendEventToHistory(const rtclog::Event& event) {
112 history_.push_back(event);
113 }
114
115 // Traverses the SwapQueues in timestamp order and copies all events earlier
116 // than |current_time| either to the history or to a string that will be
117 // written to disc.
118 bool RtcEventLogHelperThread::ProcessInOrder(bool memory,
119 int64_t current_time) {
120 bool stop = false;
121 enum EventType {
122 CONFIG_EVENT,
123 RTP_EVENT,
124 RTCP_EVENT,
125 PLAYOUT_EVENT,
126 LOSS_EVENT
127 };
128
129 const size_t kNumberOfQueues = 5;
130 const size_t kConfigQueueIndex = 0;
131 SwapQueue<rtclog::Event>* queue[] = {config_queue_, rtp_queue_, rtcp_queue_,
132 acm_playout_queue_, bwe_loss_queue_};
133 rtclog::Event* head_event[] = {&config_event_, &rtp_event_, &rtcp_event_,
134 &playout_event_, &loss_event_};
135 bool* valid_event[] = {&valid_config_event_, &valid_rtp_event_,
136 &valid_rtcp_event_, &valid_playout_event_,
137 &valid_loss_event_};
138
139 while (!stop) {
140 // Extract the head of each queue.
141 for (size_t i = 0; i < kNumberOfQueues; i++) {
142 if (!*valid_event[i]) {
143 *valid_event[i] = queue[i]->Remove(head_event[i]);
144 }
145 }
146 // Find the earliest event (in timestamp order).
147 size_t index_of_earliest_event = kNumberOfQueues;
148 int64_t first_timestamp = std::numeric_limits<int64_t>::max();
149 for (size_t i = 0; i < kNumberOfQueues; i++) {
150 if (*valid_event[i] && head_event[i]->timestamp_us() < first_timestamp) {
151 first_timestamp = head_event[i]->timestamp_us();
152 index_of_earliest_event = i;
153 }
154 }
155
156 if (first_timestamp > current_time) {
157 // We have handled all events earlier than current_time.
158 break;
159 }
160
161 if (index_of_earliest_event == kConfigQueueIndex) {
162 // Configuration nees special treatment since we want to store them in
stefan-webrtc 2016/03/10 13:03:15 needs Also maybe mention why we want to store the
terelius 2016/03/10 13:47:41 Done.
163 // the config history.
164 if (!memory) {
165 stop = AppendEventToString(head_event[index_of_earliest_event]);
166 }
167 config_history_.push_back(*head_event[index_of_earliest_event]);
168 *valid_event[index_of_earliest_event] =
169 queue[index_of_earliest_event]->Remove(
170 head_event[index_of_earliest_event]);
171 } else {
172 // Normal events are either stored in the history or serialized to a
173 // string (which is later written to disc).
174 if (memory) {
175 AppendEventToHistory(*head_event[index_of_earliest_event]);
176 } else {
177 stop = AppendEventToString(head_event[index_of_earliest_event]);
178 }
179 if (!stop) {
180 *valid_event[index_of_earliest_event] =
181 queue[index_of_earliest_event]->Remove(
182 head_event[index_of_earliest_event]);
183 }
184 }
185 }
186 // We want to stop logging either if we have struck the file size limit
187 // or if we have logged all events older than |stop_time_|.
188 return stop || (current_time > stop_time_);
189 }
190
191 void RtcEventLogHelperThread::LogToMemory() {
192 RTC_DCHECK(!file_->Open());
193
194 // Process each event in order and append it to the appropriate history_.
195 int64_t current_time = clock_->TimeInMicroseconds();
196 ProcessInOrder(true, current_time);
197 }
198
199 void RtcEventLogHelperThread::StartLogFile() {
200 RTC_DCHECK(file_->Open());
201 bool stop = false;
202 output_string_.clear();
203
204 // Create and serialize the LOG_START event.
205 rtclog::Event start_event;
206 start_event.set_timestamp_us(start_time_);
207 start_event.set_type(rtclog::Event::LOG_START);
208 AppendEventToString(&start_event);
209
210 // Serialize the config information for all old streams.
211 for (rtclog::Event& event : config_history_) {
212 AppendEventToString(&event);
213 }
214
215 // Serialize the events in the event queue.
216 for (int i = 0; !history_.empty() && !stop; i++) {
217 stop = AppendEventToString(&history_.front());
218 if (!stop) {
219 history_.pop_front();
220 }
221 }
222
223 // Write to file.
224 file_->Write(output_string_.data(), output_string_.size());
225 written_bytes_ += output_string_.size();
226
227 if (stop) {
228 StopLogFile();
229 }
230 }
231
232 void RtcEventLogHelperThread::LogToFile() {
233 RTC_DCHECK(file_->Open());
234 output_string_.clear();
235
236 // Process each event in order and append it to the output_string_.
237 int64_t current_time = clock_->TimeInMicroseconds();
238 bool stop = ProcessInOrder(false, current_time);
239
240 // Write string to file.
241 file_->Write(output_string_.data(), output_string_.size());
242 written_bytes_ += output_string_.size();
243
244 if (stop || stop_time_ <= current_time) {
245 StopLogFile();
246 }
247 }
248
249 void RtcEventLogHelperThread::StopLogFile() {
250 RTC_DCHECK(file_->Open());
251 output_string_.clear();
252
253 rtclog::Event end_event;
254 end_event.set_timestamp_us(stop_time_);
255 end_event.set_type(rtclog::Event::LOG_END);
256 AppendEventToString(&end_event);
257
258 if (written_bytes_ + static_cast<int64_t>(output_string_.size()) <=
259 max_size_bytes_) {
260 file_->Write(output_string_.data(), output_string_.size());
261 written_bytes_ += output_string_.size();
262 }
263
264 max_size_bytes_ = std::numeric_limits<int64_t>::max();
265 written_bytes_ = 0;
266 start_time_ = 0;
267 stop_time_ = std::numeric_limits<int64_t>::max();
268 output_string_.clear();
269 file_->CloseFile();
270 RTC_DCHECK(!file_->Open());
271 }
272
273 void RtcEventLogHelperThread::WriteLog() {
274 ControlMessage message;
275
276 while (true) {
277 // Process control messages.
278 while (message_queue_->Remove(&message)) {
279 switch (message.message_type) {
280 case ControlMessage::START_FILE:
281 if (!file_->Open()) {
282 max_size_bytes_ = message.max_size_bytes;
283 start_time_ = message.start_time;
284 stop_time_ = message.stop_time;
285 file_.swap(message.file);
286 StartLogFile();
287 } else {
288 // Already started. Ignore message and close file handle.
289 message.file->CloseFile();
290 }
291 break;
292 case ControlMessage::STOP_FILE:
293 if (file_->Open()) {
294 stop_time_ = message.stop_time;
295 LogToFile(); // Log remaining events from message queues.
296 }
297 // LogToFile might stop on it's own so we need to recheck the state.
298 if (file_->Open()) {
299 StopLogFile();
300 }
301 stopped_->Set();
302 break;
303 case ControlMessage::TERMINATE_THREAD:
304 if (file_->Open()) {
305 StopLogFile();
306 }
307 return;
308 }
309 }
310
311 // Write events to file or memory
312 if (file_->Open()) {
313 LogToFile();
314 } else {
315 LogToMemory();
316 }
317
318 // Accumulate a new batch of events instead of processing them one at a
319 // time.
320 wake_up_->Wait(50);
321 }
322 }
323
324 bool RtcEventLogHelperThread::ThreadOutputFunction(void* obj) {
325 RtcEventLogHelperThread* helper = static_cast<RtcEventLogHelperThread*>(obj);
326 helper->WriteLog();
327 return false;
328 }
329
330 } // namespace webrtc
331
332 #endif // ENABLE_RTC_EVENT_LOG
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698