Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(234)

Side by Side Diff: webrtc/call/rtc_event_log_helper_thread.cc

Issue 1687703002: Refactored CL for moving the output to a separate thread. (Closed) Base URL: https://chromium.googlesource.com/external/webrtc.git@master
Patch Set: Fix compile errors Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 /*
2 * Copyright (c) 2015 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "webrtc/call/rtc_event_log_helper_thread.h"
12
13 #include "webrtc/base/checks.h"
14 #include "webrtc/system_wrappers/include/logging.h"
15
16 #ifdef ENABLE_RTC_EVENT_LOG
17
18 namespace webrtc {
19
20 namespace {
21 const int kEventsInHistory = 10000;
22 } // namespace
23
24 // RtcEventLogImpl member functions.
25 RtcEventLogHelperThread::RtcEventLogHelperThread(
26 SwapQueue<EventLogMessage>* message_queue,
27 SwapQueue<rtclog::Event>* config_queue,
28 SwapQueue<rtclog::Event>* rtp_queue,
29 SwapQueue<rtclog::Event>* rtcp_queue,
30 SwapQueue<rtclog::Event>* acm_playout_queue,
31 SwapQueue<rtclog::Event>* bwe_loss_queue,
32 rtc::Event* wake_up,
33 rtc::Event* stopped,
34 const Clock* const clock)
35 : message_queue_(message_queue),
36 config_queue_(config_queue),
37 rtp_queue_(rtp_queue),
38 rtcp_queue_(rtcp_queue),
39 acm_playout_queue_(acm_playout_queue),
40 bwe_loss_queue_(bwe_loss_queue),
41 history_(kEventsInHistory),
42 config_history_(),
43 log_to_memory_(true),
the sun 2016/02/25 15:23:19 log_to_memory and file_ represent the same informa
terelius 2016/03/09 19:49:39 Done.
44 file_(FileWrapper::Create()),
45 thread_(&ThreadOutputFunction, this, "RtcEventLog thread"),
46 max_size_bytes_(std::numeric_limits<int64_t>::max()),
47 written_bytes_(0),
48 start_time_(0),
49 stop_time_(std::numeric_limits<int64_t>::max()),
50 config_event_(),
51 rtp_event_(),
52 rtcp_event_(),
53 playout_event_(),
54 loss_event_(),
55 valid_config_event_(false),
56 valid_rtp_event_(false),
57 valid_rtcp_event_(false),
58 valid_playout_event_(false),
59 valid_loss_event_(false),
60 output_string_(),
61 wake_up_(wake_up),
62 stopped_(stopped),
63 clock_(clock) {
the sun 2016/02/25 15:23:19 Add RTC_DCHECK(message_queue), and the other point
terelius 2016/03/09 19:49:39 Done.
64 thread_.Start();
65 }
66
67 RtcEventLogHelperThread::~RtcEventLogHelperThread() {
68 EventLogMessage message;
69 message.message_type = EventLogMessage::TERMINATE_THREAD;
70 message.stop_time = clock_->TimeInMicroseconds();
71 while (!message_queue_->Insert(&message)) {
72 // We can't destroy the event log until we have stopped the thread,
73 // so clear the message queue and try again. Note that if we clear
74 // any STOP_FILE events, then the threads calling StopLogging would likely
75 // wait indefinitely. However, there should not be any such calls as we
76 // are executing the destructor.
77 LOG(LS_WARNING) << "Clearing message queue to terminate thread.";
78 message_queue_->Clear();
79 }
80 wake_up_->Set(); // Wake up the output thread.
81 thread_.Stop(); // Wait for the thread to terminate.
82 }
83
84 bool RtcEventLogHelperThread::AppendEventToString(rtclog::Event* event) {
85 rtclog::EventStream event_stream;
86 event_stream.add_stream();
87 event_stream.mutable_stream(0)->Swap(event);
88 // TODO(terelius): We create a new event strem per event, but it will look
89 // like a single stream when we read it back from file.
90 // Is this guaranteed to work e.g. in future versions of protobuf?
91 bool stop = true;
92 if (written_bytes_ + static_cast<int64_t>(output_string_.size()) +
93 event_stream.ByteSize() <=
94 max_size_bytes_) {
95 event_stream.AppendToString(&output_string_);
96 stop = false;
97 }
98 // Swap the event back so that we don't mix event types in the queues.
99 event_stream.mutable_stream(0)->Swap(event);
100 return stop;
101 }
102
103 void RtcEventLogHelperThread::AppendEventToHistory(const rtclog::Event& event) {
104 history_.push_back(event);
105 }
106
107 // Traverses the SwapQueues in timestamp order and copies all events earlier
108 // than |current_time| either to the history or to a string that will be
109 // written to disc.
110 bool RtcEventLogHelperThread::ProcessInOrder(bool memory,
111 int64_t current_time) {
112 bool stop = false;
113 enum EventType {
114 CONFIG_EVENT,
115 RTP_EVENT,
116 RTCP_EVENT,
117 PLAYOUT_EVENT,
118 LOSS_EVENT
119 };
120
121 // Extract the head of each queue.
the sun 2016/03/03 14:31:42 Could you organize the code something like this:
terelius 2016/03/09 19:49:39 Done something similar.
122 if (!valid_config_event_) {
123 valid_config_event_ = config_queue_->Remove(&config_event_);
124 }
125
126 if (!valid_rtp_event_) {
127 valid_rtp_event_ = rtp_queue_->Remove(&rtp_event_);
128 }
129
130 if (!valid_rtcp_event_) {
131 valid_rtcp_event_ = rtcp_queue_->Remove(&rtcp_event_);
132 }
133
134 if (!valid_playout_event_) {
135 valid_playout_event_ = acm_playout_queue_->Remove(&playout_event_);
136 }
137
138 if (!valid_loss_event_) {
139 valid_loss_event_ = bwe_loss_queue_->Remove(&loss_event_);
140 }
141
142 while ((valid_config_event_ || valid_rtp_event_ || valid_rtcp_event_ ||
the sun 2016/02/25 15:23:19 This handling doesn't look right to me. Have you u
stefan-webrtc 2016/03/01 09:44:16 I'm not sure I follow this. Do you mean that we sh
terelius 2016/03/09 19:49:39 I agree with Stefan; we can't wait for all types o
143 valid_playout_event_ || valid_loss_event_) &&
144 !stop) {
145 // Find the earliest event (in timestamp order).
146 EventType type = CONFIG_EVENT;
147 int64_t first_timestamp =
148 (valid_config_event_ ? config_event_.timestamp_us()
149 : std::numeric_limits<int64_t>::max());
150 if (valid_rtp_event_ && (rtp_event_.timestamp_us() < first_timestamp)) {
151 first_timestamp = rtp_event_.timestamp_us();
152 type = RTP_EVENT;
153 }
154 if (valid_rtcp_event_ && (rtcp_event_.timestamp_us() < first_timestamp)) {
155 first_timestamp = rtcp_event_.timestamp_us();
156 type = RTCP_EVENT;
157 }
158 if (valid_playout_event_ &&
159 playout_event_.timestamp_us() < first_timestamp) {
160 first_timestamp = playout_event_.timestamp_us();
161 type = PLAYOUT_EVENT;
162 }
163 if (valid_loss_event_ && loss_event_.timestamp_us() < first_timestamp) {
164 first_timestamp = loss_event_.timestamp_us();
165 type = LOSS_EVENT;
166 }
167
168 if (first_timestamp > current_time) {
169 // We have handled all events earlier than current_time.
170 break;
171 }
172 // Serialize the event and fetch the next event of that type.
173 switch (type) {
174 case CONFIG_EVENT:
175 if (!memory) {
176 stop = AppendEventToString(&config_event_);
177 }
178 config_history_.push_back(config_event_);
179 if (!stop) {
180 valid_config_event_ = config_queue_->Remove(&config_event_);
181 }
182 break;
183 case RTP_EVENT:
184 if (memory) {
185 AppendEventToHistory(rtp_event_);
186 } else {
187 stop = AppendEventToString(&rtp_event_);
188 }
stefan-webrtc 2016/03/01 09:44:16 Lines 184-188 may be possible to break out to a he
terelius 2016/03/09 19:49:39 Refactored in a different way.
189 if (!stop) {
190 valid_rtp_event_ = rtp_queue_->Remove(&rtp_event_);
191 }
192 break;
193 case RTCP_EVENT:
194 if (memory) {
195 AppendEventToHistory(rtcp_event_);
196 } else {
197 stop = AppendEventToString(&rtcp_event_);
198 }
199 if (!stop) {
200 valid_rtcp_event_ = rtcp_queue_->Remove(&rtcp_event_);
201 }
202 break;
203 case PLAYOUT_EVENT:
204 if (memory) {
205 AppendEventToHistory(playout_event_);
206 } else {
207 stop = AppendEventToString(&playout_event_);
208 }
209 if (!stop) {
210 valid_playout_event_ = acm_playout_queue_->Remove(&playout_event_);
211 }
212 break;
213 case LOSS_EVENT:
214 if (memory) {
215 AppendEventToHistory(loss_event_);
216 } else {
217 stop = AppendEventToString(&loss_event_);
218 }
219 if (!stop) {
220 valid_loss_event_ = bwe_loss_queue_->Remove(&loss_event_);
221 }
222 break;
223 }
224 }
225 // We want to stop logging either if we have struck the file size limit
226 // or if we have logged all events older than |stop_time_|.
227 return stop || (current_time > stop_time_);
228 }
229
230 void RtcEventLogHelperThread::LogToMemory() {
231 RTC_DCHECK(log_to_memory_);
232
233 // Process each event in order and append it to the appropriate history_.
234 int64_t current_time = clock_->TimeInMicroseconds();
235 ProcessInOrder(true, current_time);
236 }
237
238 void RtcEventLogHelperThread::StartLogFile() {
239 RTC_DCHECK(log_to_memory_);
240 bool stop = false;
241 output_string_.clear();
242
243 // Create and serialize the LOG_START event.
244 rtclog::Event start_event;
245 start_event.set_timestamp_us(start_time_);
246 start_event.set_type(rtclog::Event::LOG_START);
247 AppendEventToString(&start_event);
248
249 // Serialize the config information for all old streams.
250 for (rtclog::Event& event : config_history_) {
251 AppendEventToString(&event);
252 }
253
254 // Serialize the events in the event queue.
255 for (int i = 0; !history_.empty() && !stop; i++) {
256 stop = AppendEventToString(&history_.front());
257 if (!stop) {
258 history_.pop_front();
259 }
260 }
261
262 // Write to file.
263 file_->Write(output_string_.data(), output_string_.size());
264 written_bytes_ += output_string_.size();
265
266 log_to_memory_ = false;
267 if (stop) {
268 StopLogFile();
269 }
270 }
271
272 void RtcEventLogHelperThread::LogToFile() {
273 RTC_DCHECK(!log_to_memory_);
274 output_string_.clear();
275
276 // Process each event in order and append it to the output_string_.
277 int64_t current_time = clock_->TimeInMicroseconds();
278 bool stop = ProcessInOrder(false, current_time);
279
280 // Write string to file.
281 file_->Write(output_string_.data(), output_string_.size());
282 written_bytes_ += output_string_.size();
283
284 if (stop || stop_time_ <= current_time) {
285 StopLogFile();
286 }
287 }
288
289 void RtcEventLogHelperThread::StopLogFile() {
290 RTC_DCHECK(!log_to_memory_);
291 output_string_.clear();
292
293 rtclog::Event end_event;
294 end_event.set_timestamp_us(stop_time_);
295 end_event.set_type(rtclog::Event::LOG_END);
296 AppendEventToString(&end_event);
297
298 if (written_bytes_ + static_cast<int64_t>(output_string_.size()) <=
299 max_size_bytes_) {
300 file_->Write(output_string_.data(), output_string_.size());
301 written_bytes_ += output_string_.size();
302 }
303
304 log_to_memory_ = true;
305 max_size_bytes_ = std::numeric_limits<int64_t>::max();
306 written_bytes_ = 0;
307 start_time_ = 0;
308 stop_time_ = std::numeric_limits<int64_t>::max();
309 output_string_.clear();
310 file_->CloseFile();
311 }
312
313 void RtcEventLogHelperThread::WriteLog() {
314 bool valid_message = false;
the sun 2016/02/25 15:23:19 not needed
terelius 2016/03/09 19:49:39 Done.
315 EventLogMessage message;
316
317 while (true) {
318 // Process messages.
319 if (!valid_message) {
320 valid_message = message_queue_->Remove(&message);
321 }
322 while (valid_message) {
the sun 2016/02/25 15:23:19 while (message_queue_->Remove(&message)) {
terelius 2016/03/09 19:49:39 Done.
323 switch (message.message_type) {
324 case EventLogMessage::START_FILE:
325 if (log_to_memory_) {
326 max_size_bytes_ = message.max_size_bytes;
327 start_time_ = message.start_time;
328 stop_time_ = message.stop_time;
329 file_.reset(message.file.release());
the sun 2016/02/25 15:23:19 file_.swap(message.file);
terelius 2016/03/09 19:49:39 Done.
330 StartLogFile();
331 } else {
332 // Already stopped. Close file handle.
333 message.file->CloseFile();
334 }
335 break;
336 case EventLogMessage::STOP_FILE:
337 if (!log_to_memory_) {
338 stop_time_ = message.stop_time;
339 LogToFile(); // Log remaining events from message queues.
340 }
341 // LogToFile might stop on it's own so we need to recheck the state.
342 if (!log_to_memory_) {
343 StopLogFile();
344 }
345 stopped_->Set();
346 break;
347 case EventLogMessage::TERMINATE_THREAD:
348 if (!log_to_memory_) {
349 StopLogFile();
350 }
351 return;
352 }
353 valid_message = message_queue_->Remove(&message);
the sun 2016/02/25 15:23:19 remove
terelius 2016/03/09 19:49:39 Done.
354 }
355
356 if (log_to_memory_) {
357 LogToMemory();
358 } else {
359 LogToFile();
360 }
361
362 // Accumulate a new batch of events instead of processing them one at a
363 // time.
364 wake_up_->Wait(50);
365 }
366 }
367
368 bool RtcEventLogHelperThread::ThreadOutputFunction(void* obj) {
369 RtcEventLogHelperThread* helper = static_cast<RtcEventLogHelperThread*>(obj);
370 helper->WriteLog();
371 return false;
372 }
373
374 } // namespace webrtc
375
376 #endif // ENABLE_RTC_EVENT_LOG
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698