Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(187)

Side by Side Diff: webrtc/call/rtc_event_log_helper_thread.cc

Issue 1687703002: Refactored CL for moving the output to a separate thread. (Closed) Base URL: https://chromium.googlesource.com/external/webrtc.git@master
Patch Set: Minor update Created 4 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 /*
2 * Copyright (c) 2016 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "webrtc/call/rtc_event_log_helper_thread.h"
12
13 #include "webrtc/base/checks.h"
14 #include "webrtc/system_wrappers/include/logging.h"
15
16 #ifdef ENABLE_RTC_EVENT_LOG
17
18 namespace webrtc {
19
20 namespace {
21 const int kEventsInHistory = 10000;
the sun 2016/03/11 13:21:57 I thought we wanted e.g. a 10s back buffer. Does 1
terelius 2016/03/18 14:03:25 Imo it is much better to set the history limit bas
the sun 2016/03/24 13:49:43 But that's exactly one of the problems with the cu
terelius 2016/03/29 09:38:32 But there is no very large event type. Even if all
the sun 2016/03/30 09:12:38 So what happens then if a large event type is *add
terelius 2016/03/30 10:36:59 This code will break the memory limit if a new eve
22 } // namespace
23
24 // RtcEventLogImpl member functions.
25 RtcEventLogHelperThread::RtcEventLogHelperThread(
26 SwapQueue<ControlMessage>* message_queue,
27 SwapQueue<rtclog::Event>* config_queue,
28 SwapQueue<rtclog::Event>* rtp_queue,
29 SwapQueue<rtclog::Event>* rtcp_queue,
30 SwapQueue<rtclog::Event>* acm_playout_queue,
31 SwapQueue<rtclog::Event>* bwe_loss_queue,
32 rtc::Event* wake_up,
33 rtc::Event* stopped,
34 const Clock* const clock)
35 : message_queue_(message_queue),
36 config_queue_(config_queue),
37 rtp_queue_(rtp_queue),
38 rtcp_queue_(rtcp_queue),
39 acm_playout_queue_(acm_playout_queue),
40 bwe_loss_queue_(bwe_loss_queue),
41 history_(kEventsInHistory),
42 config_history_(),
43 file_(FileWrapper::Create()),
the sun 2016/03/11 13:21:57 Why not just let it be null?
terelius 2016/03/18 14:03:25 I could do that, but I don't think there is much d
44 thread_(&ThreadOutputFunction, this, "RtcEventLog thread"),
45 max_size_bytes_(std::numeric_limits<int64_t>::max()),
46 written_bytes_(0),
47 start_time_(0),
48 stop_time_(std::numeric_limits<int64_t>::max()),
49 config_event_(),
50 rtp_event_(),
51 rtcp_event_(),
52 playout_event_(),
53 loss_event_(),
54 valid_config_event_(false),
55 valid_rtp_event_(false),
56 valid_rtcp_event_(false),
57 valid_playout_event_(false),
58 valid_loss_event_(false),
59 output_string_(),
60 wake_up_(wake_up),
61 stopped_(stopped),
62 clock_(clock) {
63 RTC_DCHECK(message_queue_);
64 RTC_DCHECK(config_queue_);
65 RTC_DCHECK(rtp_queue_);
66 RTC_DCHECK(rtcp_queue_);
67 RTC_DCHECK(acm_playout_queue_);
68 RTC_DCHECK(bwe_loss_queue_);
69 RTC_DCHECK(wake_up_);
70 RTC_DCHECK(stopped_);
71 RTC_DCHECK(clock_);
72 thread_.Start();
73 }
74
75 RtcEventLogHelperThread::~RtcEventLogHelperThread() {
76 ControlMessage message;
77 message.message_type = ControlMessage::TERMINATE_THREAD;
78 message.stop_time = clock_->TimeInMicroseconds();
79 while (!message_queue_->Insert(&message)) {
80 // We can't destroy the event log until we have stopped the thread,
81 // so clear the message queue and try again. Note that if we clear
82 // any STOP_FILE events, then the threads calling StopLogging would likely
83 // wait indefinitely. However, there should not be any such calls as we
84 // are executing the destructor.
85 LOG(LS_WARNING) << "Clearing message queue to terminate thread.";
86 message_queue_->Clear();
87 }
88 wake_up_->Set(); // Wake up the output thread.
89 thread_.Stop(); // Wait for the thread to terminate.
90 }
91
92 bool RtcEventLogHelperThread::AppendEventToString(rtclog::Event* event) {
93 rtclog::EventStream event_stream;
94 event_stream.add_stream();
95 event_stream.mutable_stream(0)->Swap(event);
96 // TODO(terelius): We create a new event strem per event, but it will look
97 // like a single stream when we read it back from file.
98 // Is this guaranteed to work e.g. in future versions of protobuf?
the sun 2016/03/11 13:21:57 https://developers.google.com/protocol-buffers/doc
terelius 2016/03/18 14:03:25 Acknowledged.
99 bool stop = true;
100 if (written_bytes_ + static_cast<int64_t>(output_string_.size()) +
101 event_stream.ByteSize() <=
102 max_size_bytes_) {
103 event_stream.AppendToString(&output_string_);
104 stop = false;
105 }
106 // Swap the event back so that we don't mix event types in the queues.
107 event_stream.mutable_stream(0)->Swap(event);
108 return stop;
109 }
110
111 void RtcEventLogHelperThread::AppendEventToHistory(const rtclog::Event& event) {
112 history_.push_back(event);
113 }
114
115 // Traverses the SwapQueues in timestamp order and copies all events earlier
116 // than |current_time| either to the history or to a string that will be
117 // written to disc.
118 bool RtcEventLogHelperThread::ProcessInOrder(bool memory,
119 int64_t current_time) {
120 bool stop = false;
121 enum EventType {
122 CONFIG_EVENT,
123 RTP_EVENT,
124 RTCP_EVENT,
125 PLAYOUT_EVENT,
126 LOSS_EVENT
127 };
128
129 const size_t kNumberOfQueues = 5;
130 const size_t kConfigQueueIndex = 0;
131 SwapQueue<rtclog::Event>* queue[] = {config_queue_, rtp_queue_, rtcp_queue_,
132 acm_playout_queue_, bwe_loss_queue_};
133 rtclog::Event* head_event[] = {&config_event_, &rtp_event_, &rtcp_event_,
134 &playout_event_, &loss_event_};
135 bool* valid_event[] = {&valid_config_event_, &valid_rtp_event_,
136 &valid_rtcp_event_, &valid_playout_event_,
137 &valid_loss_event_};
138
139 while (!stop) {
140 // Extract the head of each queue.
141 for (size_t i = 0; i < kNumberOfQueues; i++) {
142 if (!*valid_event[i]) {
143 *valid_event[i] = queue[i]->Remove(head_event[i]);
144 }
145 }
146 // Find the earliest event (in timestamp order).
147 size_t index_of_earliest_event = kNumberOfQueues;
148 int64_t first_timestamp = std::numeric_limits<int64_t>::max();
149 for (size_t i = 0; i < kNumberOfQueues; i++) {
150 if (*valid_event[i] && head_event[i]->timestamp_us() < first_timestamp) {
151 first_timestamp = head_event[i]->timestamp_us();
152 index_of_earliest_event = i;
153 }
154 }
155
156 if (first_timestamp > current_time) {
157 // We have handled all events earlier than current_time.
158 break;
159 }
160
161 if (index_of_earliest_event == kConfigQueueIndex) {
162 // Stream configurations need special treatment because old configs,
163 // unlike normal events stored in the history, should never be removed.
164 // We need the configurations to parse e.g. RTP headers, so we have to
165 // store them in a special config_history for future logs, regardless of
166 // whether we are logging right now.
167 if (!memory) {
168 stop = AppendEventToString(head_event[index_of_earliest_event]);
169 }
170 config_history_.push_back(*head_event[index_of_earliest_event]);
171 *valid_event[index_of_earliest_event] =
172 queue[index_of_earliest_event]->Remove(
173 head_event[index_of_earliest_event]);
174 } else {
175 // Normal events are either stored in the history or serialized to a
176 // string (which is later written to disc).
177 if (memory) {
178 AppendEventToHistory(*head_event[index_of_earliest_event]);
179 } else {
180 stop = AppendEventToString(head_event[index_of_earliest_event]);
181 }
182 if (!stop) {
183 *valid_event[index_of_earliest_event] =
184 queue[index_of_earliest_event]->Remove(
185 head_event[index_of_earliest_event]);
186 }
187 }
188 }
189 // We want to stop logging either if we have struck the file size limit
190 // or if we have logged all events older than |stop_time_|.
191 return stop || (current_time > stop_time_);
192 }
193
194 void RtcEventLogHelperThread::LogToMemory() {
195 RTC_DCHECK(!file_->Open());
the sun 2016/03/11 13:21:57 Why not RTC_DCHECK(!file_); ?
terelius 2016/03/18 14:03:25 That would work too, but see discussion above.
196
197 // Process each event in order and append it to the appropriate history_.
198 int64_t current_time = clock_->TimeInMicroseconds();
199 ProcessInOrder(true, current_time);
200 }
201
202 void RtcEventLogHelperThread::StartLogFile() {
203 RTC_DCHECK(file_->Open());
204 bool stop = false;
205 output_string_.clear();
206
207 // Create and serialize the LOG_START event.
208 rtclog::Event start_event;
209 start_event.set_timestamp_us(start_time_);
210 start_event.set_type(rtclog::Event::LOG_START);
211 AppendEventToString(&start_event);
212
213 // Serialize the config information for all old streams.
214 for (rtclog::Event& event : config_history_) {
215 AppendEventToString(&event);
216 }
217
218 // Serialize the events in the event queue.
219 for (int i = 0; !history_.empty() && !stop; i++) {
the sun 2016/03/11 13:21:57 What is "i" for?
terelius 2016/03/18 14:03:25 Done, thanks. Was used for debugging.
220 stop = AppendEventToString(&history_.front());
the sun 2016/03/11 13:21:57 Does this mean output_string will now have grown t
terelius 2016/03/18 14:03:25 Yes it does, but the string version uses significa
221 if (!stop) {
222 history_.pop_front();
223 }
224 }
225
226 // Write to file.
227 file_->Write(output_string_.data(), output_string_.size());
228 written_bytes_ += output_string_.size();
229
230 if (stop) {
231 RTC_DCHECK(file_->Open());
232 StopLogFile();
233 }
234 }
235
236 void RtcEventLogHelperThread::LogToFile() {
237 RTC_DCHECK(file_->Open());
238 output_string_.clear();
239
240 // Process each event in order and append it to the output_string_.
241 int64_t current_time = clock_->TimeInMicroseconds();
242 bool stop = ProcessInOrder(false, current_time);
243
244 // Write string to file.
245 file_->Write(output_string_.data(), output_string_.size());
246 written_bytes_ += output_string_.size();
247
248 if (stop || stop_time_ <= current_time) {
249 RTC_DCHECK(file_->Open());
250 StopLogFile();
251 }
252 }
253
254 void RtcEventLogHelperThread::StopLogFile() {
255 RTC_DCHECK(file_->Open());
256 output_string_.clear();
257
258 rtclog::Event end_event;
259 end_event.set_timestamp_us(stop_time_);
260 end_event.set_type(rtclog::Event::LOG_END);
261 AppendEventToString(&end_event);
262
263 if (written_bytes_ + static_cast<int64_t>(output_string_.size()) <=
264 max_size_bytes_) {
265 file_->Write(output_string_.data(), output_string_.size());
266 written_bytes_ += output_string_.size();
267 }
268
269 max_size_bytes_ = std::numeric_limits<int64_t>::max();
270 written_bytes_ = 0;
271 start_time_ = 0;
272 stop_time_ = std::numeric_limits<int64_t>::max();
273 output_string_.clear();
274 file_->CloseFile();
275 RTC_DCHECK(!file_->Open());
276 }
277
278 void RtcEventLogHelperThread::WriteLog() {
279 ControlMessage message;
280
281 while (true) {
282 // Process control messages.
283 while (message_queue_->Remove(&message)) {
284 switch (message.message_type) {
285 case ControlMessage::START_FILE:
286 if (!file_->Open()) {
287 max_size_bytes_ = message.max_size_bytes;
288 start_time_ = message.start_time;
289 stop_time_ = message.stop_time;
290 file_.swap(message.file);
291 StartLogFile();
292 } else {
293 // Already started. Ignore message and close file handle.
294 message.file->CloseFile();
295 }
296 break;
297 case ControlMessage::STOP_FILE:
298 if (file_->Open()) {
299 stop_time_ = message.stop_time;
300 LogToFile(); // Log remaining events from message queues.
301 }
302 // LogToFile might stop on it's own so we need to recheck the state.
303 if (file_->Open()) {
304 StopLogFile();
305 }
306 stopped_->Set();
307 break;
308 case ControlMessage::TERMINATE_THREAD:
309 if (file_->Open()) {
310 StopLogFile();
311 }
312 return;
313 }
314 }
315
316 // Write events to file or memory
317 if (file_->Open()) {
318 LogToFile();
319 } else {
320 LogToMemory();
321 }
322
323 // Accumulate a new batch of events instead of processing them one at a
324 // time.
325 wake_up_->Wait(50);
326 }
327 }
328
329 bool RtcEventLogHelperThread::ThreadOutputFunction(void* obj) {
330 RtcEventLogHelperThread* helper = static_cast<RtcEventLogHelperThread*>(obj);
331 helper->WriteLog();
332 return false;
333 }
334
335 } // namespace webrtc
336
337 #endif // ENABLE_RTC_EVENT_LOG
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698