Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(103)

Side by Side Diff: webrtc/call/rtc_event_log_helper_thread.cc

Issue 1687703002: Refactored CL for moving the output to a separate thread. (Closed) Base URL: https://chromium.googlesource.com/external/webrtc.git@master
Patch Set: Change path for swap_queue.h, anticipating move Created 4 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 /*
2 * Copyright (c) 2016 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "webrtc/call/rtc_event_log_helper_thread.h"
12
13 #include "webrtc/base/checks.h"
14 #include "webrtc/system_wrappers/include/logging.h"
15
16 #ifdef ENABLE_RTC_EVENT_LOG
17
18 namespace webrtc {
19
20 namespace {
21 const int kEventsInHistory = 10000;
22 } // namespace
23
24 // RtcEventLogImpl member functions.
25 RtcEventLogHelperThread::RtcEventLogHelperThread(
26 SwapQueue<ControlMessage>* message_queue,
27 SwapQueue<rtclog::Event>* config_queue,
28 SwapQueue<rtclog::Event>* rtp_queue,
29 SwapQueue<rtclog::Event>* rtcp_queue,
30 SwapQueue<rtclog::Event>* acm_playout_queue,
31 SwapQueue<rtclog::Event>* bwe_loss_queue,
32 rtc::Event* wake_up,
33 rtc::Event* stopped,
34 const Clock* const clock)
35 : message_queue_(message_queue),
36 config_queue_(config_queue),
37 rtp_queue_(rtp_queue),
38 rtcp_queue_(rtcp_queue),
39 acm_playout_queue_(acm_playout_queue),
40 bwe_loss_queue_(bwe_loss_queue),
41 history_(kEventsInHistory),
42 config_history_(),
43 file_(FileWrapper::Create()),
44 thread_(&ThreadOutputFunction, this, "RtcEventLog thread"),
45 max_size_bytes_(std::numeric_limits<int64_t>::max()),
46 written_bytes_(0),
47 start_time_(0),
48 stop_time_(std::numeric_limits<int64_t>::max()),
49 config_event_(),
50 rtp_event_(),
51 rtcp_event_(),
52 playout_event_(),
53 loss_event_(),
54 valid_config_event_(false),
55 valid_rtp_event_(false),
56 valid_rtcp_event_(false),
57 valid_playout_event_(false),
58 valid_loss_event_(false),
59 output_string_(),
60 wake_up_(wake_up),
61 stopped_(stopped),
62 clock_(clock) {
63 RTC_DCHECK(message_queue_);
64 RTC_DCHECK(config_queue_);
65 RTC_DCHECK(rtp_queue_);
66 RTC_DCHECK(rtcp_queue_);
67 RTC_DCHECK(acm_playout_queue_);
68 RTC_DCHECK(bwe_loss_queue_);
69 RTC_DCHECK(wake_up_);
70 RTC_DCHECK(stopped_);
71 RTC_DCHECK(clock_);
72 thread_.Start();
73 }
74
75 RtcEventLogHelperThread::~RtcEventLogHelperThread() {
76 ControlMessage message;
77 message.message_type = ControlMessage::TERMINATE_THREAD;
78 message.stop_time = clock_->TimeInMicroseconds();
79 while (!message_queue_->Insert(&message)) {
80 // We can't destroy the event log until we have stopped the thread,
81 // so clear the message queue and try again. Note that if we clear
82 // any STOP_FILE events, then the threads calling StopLogging would likely
83 // wait indefinitely. However, there should not be any such calls as we
84 // are executing the destructor.
85 LOG(LS_WARNING) << "Clearing message queue to terminate thread.";
86 message_queue_->Clear();
87 }
88 wake_up_->Set(); // Wake up the output thread.
89 thread_.Stop(); // Wait for the thread to terminate.
90 }
91
92 bool RtcEventLogHelperThread::AppendEventToString(rtclog::Event* event) {
93 rtclog::EventStream event_stream;
94 event_stream.add_stream();
95 event_stream.mutable_stream(0)->Swap(event);
96 // We create a new event stream per event but because of the way protobufs
97 // are encoded, events can be merged by concatenating them. Therefore,
98 // it will look like a single stream when we read it back from file.
99 bool stop = true;
100 if (written_bytes_ + static_cast<int64_t>(output_string_.size()) +
101 event_stream.ByteSize() <=
102 max_size_bytes_) {
stefan-webrtc 2016/04/12 15:48:50 Shouldn't this be moved to the line above?
terelius 2016/04/16 23:29:24 It is automatically formatted. I agree it isn't ve
103 event_stream.AppendToString(&output_string_);
104 stop = false;
105 }
106 // Swap the event back so that we don't mix event types in the queues.
107 event_stream.mutable_stream(0)->Swap(event);
108 return stop;
109 }
110
111 void RtcEventLogHelperThread::AppendEventToHistory(const rtclog::Event& event) {
112 history_.push_back(event);
113 }
114
115 // Traverses the SwapQueues in timestamp order and copies all events earlier
116 // than |current_time| either to the history or to a string that will be
117 // written to disc.
118 bool RtcEventLogHelperThread::ProcessInOrder(bool memory,
119 int64_t current_time) {
120 bool stop = false;
121 enum EventType {
122 CONFIG_EVENT,
123 RTP_EVENT,
124 RTCP_EVENT,
125 PLAYOUT_EVENT,
126 LOSS_EVENT
127 };
128
129 const size_t kNumberOfQueues = 5;
stefan-webrtc 2016/04/12 15:48:50 You should be able to write sizeof(queue)/sizeof(q
terelius 2016/04/16 23:29:24 I removed this completely.
130 const size_t kConfigQueueIndex = 0;
131 SwapQueue<rtclog::Event>* queue[] = {config_queue_, rtp_queue_, rtcp_queue_,
132 acm_playout_queue_, bwe_loss_queue_};
133 rtclog::Event* head_event[] = {&config_event_, &rtp_event_, &rtcp_event_,
134 &playout_event_, &loss_event_};
135 bool* valid_event[] = {&valid_config_event_, &valid_rtp_event_,
136 &valid_rtcp_event_, &valid_playout_event_,
137 &valid_loss_event_};
138
139 while (!stop) {
140 // Extract the head of each queue.
141 for (size_t i = 0; i < kNumberOfQueues; i++) {
142 if (!*valid_event[i]) {
143 *valid_event[i] = queue[i]->Remove(head_event[i]);
144 }
145 }
146 // Find the earliest event (in timestamp order).
147 size_t index_of_earliest_event = kNumberOfQueues;
148 int64_t first_timestamp = std::numeric_limits<int64_t>::max();
149 for (size_t i = 0; i < kNumberOfQueues; i++) {
150 if (*valid_event[i] && head_event[i]->timestamp_us() < first_timestamp) {
151 first_timestamp = head_event[i]->timestamp_us();
152 index_of_earliest_event = i;
153 }
154 }
155
156 if (first_timestamp > current_time) {
157 // We have handled all events earlier than current_time.
158 break;
159 }
160
161 if (index_of_earliest_event == kConfigQueueIndex) {
162 // Stream configurations need special treatment because old configs,
163 // unlike normal events stored in the history, should never be removed.
164 // We need the configurations to parse e.g. RTP headers, so we have to
165 // store them in a special config_history for future logs, regardless of
166 // whether we are logging right now.
167 if (!memory) {
168 stop = AppendEventToString(head_event[index_of_earliest_event]);
169 }
170 config_history_.push_back(*head_event[index_of_earliest_event]);
171 *valid_event[index_of_earliest_event] =
172 queue[index_of_earliest_event]->Remove(
173 head_event[index_of_earliest_event]);
174 } else {
175 // Normal events are either stored in the history or serialized to a
176 // string (which is later written to disc).
177 if (memory) {
178 AppendEventToHistory(*head_event[index_of_earliest_event]);
179 } else {
180 stop = AppendEventToString(head_event[index_of_earliest_event]);
181 }
182 if (!stop) {
183 *valid_event[index_of_earliest_event] =
184 queue[index_of_earliest_event]->Remove(
185 head_event[index_of_earliest_event]);
186 }
187 }
188 }
189 // We want to stop logging either if we have struck the file size limit
190 // or if we have logged all events older than |stop_time_|.
191 return stop || (current_time > stop_time_);
192 }
193
194 void RtcEventLogHelperThread::LogToMemory() {
195 RTC_DCHECK(!file_->Open());
196
197 // Process each event in order and append it to the appropriate history_.
198 int64_t current_time = clock_->TimeInMicroseconds();
199 ProcessInOrder(true, current_time);
200 }
201
202 void RtcEventLogHelperThread::StartLogFile() {
203 RTC_DCHECK(file_->Open());
204 bool stop = false;
205 output_string_.clear();
206
207 // Create and serialize the LOG_START event.
208 rtclog::Event start_event;
209 start_event.set_timestamp_us(start_time_);
210 start_event.set_type(rtclog::Event::LOG_START);
211 AppendEventToString(&start_event);
212
213 // Serialize the config information for all old streams.
214 for (rtclog::Event& event : config_history_) {
215 AppendEventToString(&event);
216 }
217
218 // Serialize the events in the event queue.
219 while (!history_.empty() && !stop) {
220 stop = AppendEventToString(&history_.front());
221 if (!stop) {
222 history_.pop_front();
223 }
224 }
225
226 // Write to file.
227 file_->Write(output_string_.data(), output_string_.size());
228 written_bytes_ += output_string_.size();
229
230 // Free the allocated memory since we probably won't need this amount of
231 // space again.
232 output_string_.clear();
233 output_string_.shrink_to_fit();
234
235 if (stop) {
236 RTC_DCHECK(file_->Open());
237 StopLogFile();
238 }
239 }
240
241 void RtcEventLogHelperThread::LogToFile() {
242 RTC_DCHECK(file_->Open());
243 output_string_.clear();
244
245 // Process each event in order and append it to the output_string_.
246 int64_t current_time = clock_->TimeInMicroseconds();
247 bool stop = ProcessInOrder(false, current_time);
248
249 // Write string to file.
250 file_->Write(output_string_.data(), output_string_.size());
251 written_bytes_ += output_string_.size();
252
253 if (stop || stop_time_ <= current_time) {
254 RTC_DCHECK(file_->Open());
255 StopLogFile();
256 }
257 }
258
259 void RtcEventLogHelperThread::StopLogFile() {
260 RTC_DCHECK(file_->Open());
261 output_string_.clear();
262
263 rtclog::Event end_event;
264 end_event.set_timestamp_us(stop_time_);
265 end_event.set_type(rtclog::Event::LOG_END);
266 AppendEventToString(&end_event);
267
268 if (written_bytes_ + static_cast<int64_t>(output_string_.size()) <=
269 max_size_bytes_) {
270 file_->Write(output_string_.data(), output_string_.size());
271 written_bytes_ += output_string_.size();
272 }
273
274 max_size_bytes_ = std::numeric_limits<int64_t>::max();
275 written_bytes_ = 0;
276 start_time_ = 0;
277 stop_time_ = std::numeric_limits<int64_t>::max();
278 output_string_.clear();
279 file_->CloseFile();
the sun 2016/03/24 13:49:43 file_.reset(nullptr);
280 RTC_DCHECK(!file_->Open());
281 }
282
283 void RtcEventLogHelperThread::WriteLog() {
284 ControlMessage message;
285
286 while (true) {
287 // Process control messages.
288 while (message_queue_->Remove(&message)) {
289 switch (message.message_type) {
290 case ControlMessage::START_FILE:
291 if (!file_->Open()) {
the sun 2016/03/24 13:49:43 Checking file_ != nullptr should be enough. file_-
292 max_size_bytes_ = message.max_size_bytes;
293 start_time_ = message.start_time;
294 stop_time_ = message.stop_time;
295 file_.swap(message.file);
296 StartLogFile();
297 } else {
298 // Already started. Ignore message and close file handle.
299 message.file->CloseFile();
the sun 2016/03/24 13:49:43 I understand closing the old file handle, but why
300 }
301 break;
302 case ControlMessage::STOP_FILE:
303 if (file_->Open()) {
304 stop_time_ = message.stop_time;
305 LogToFile(); // Log remaining events from message queues.
306 }
307 // LogToFile might stop on it's own so we need to recheck the state.
308 if (file_->Open()) {
309 StopLogFile();
310 }
311 stopped_->Set();
312 break;
313 case ControlMessage::TERMINATE_THREAD:
314 if (file_->Open()) {
315 StopLogFile();
316 }
317 return;
318 }
319 }
320
321 // Write events to file or memory
322 if (file_->Open()) {
323 LogToFile();
324 } else {
325 LogToMemory();
326 }
327
328 // Accumulate a new batch of events instead of processing them one at a
329 // time.
330 wake_up_->Wait(50);
331 }
332 }
333
334 bool RtcEventLogHelperThread::ThreadOutputFunction(void* obj) {
335 RtcEventLogHelperThread* helper = static_cast<RtcEventLogHelperThread*>(obj);
336 helper->WriteLog();
337 return false;
338 }
339
340 } // namespace webrtc
341
342 #endif // ENABLE_RTC_EVENT_LOG
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698