Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(682)

Side by Side Diff: webrtc/logging/rtc_event_log/rtc_event_log_helper_thread.cc

Issue 3007473002: Make RtcEventLogImpl to use a TaskQueue instead of a helper-thread (Closed)
Patch Set: Relax access' threading constraints. Created 3 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « webrtc/logging/rtc_event_log/rtc_event_log_helper_thread.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 /*
2 * Copyright (c) 2016 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "webrtc/logging/rtc_event_log/rtc_event_log_helper_thread.h"
12
13 #include <algorithm>
14
15 #include "webrtc/rtc_base/checks.h"
16 #include "webrtc/rtc_base/logging.h"
17 #include "webrtc/rtc_base/timeutils.h"
18
19 #ifdef ENABLE_RTC_EVENT_LOG
20
21 namespace webrtc {
22
23 namespace {
24 const int kEventsInHistory = 10000;
25
26 bool IsConfigEvent(const rtclog::Event& event) {
27 rtclog::Event_EventType event_type = event.type();
28 return event_type == rtclog::Event::VIDEO_RECEIVER_CONFIG_EVENT ||
29 event_type == rtclog::Event::VIDEO_SENDER_CONFIG_EVENT ||
30 event_type == rtclog::Event::AUDIO_RECEIVER_CONFIG_EVENT ||
31 event_type == rtclog::Event::AUDIO_SENDER_CONFIG_EVENT;
32 }
33 } // namespace
34
35 // RtcEventLogImpl member functions.
36 RtcEventLogHelperThread::RtcEventLogHelperThread(
37 SwapQueue<ControlMessage>* message_queue,
38 SwapQueue<std::unique_ptr<rtclog::Event>>* event_queue)
39 : message_queue_(message_queue),
40 event_queue_(event_queue),
41 file_(FileWrapper::Create()),
42 thread_(&ThreadOutputFunction, this, "RtcEventLog thread"),
43 max_size_bytes_(std::numeric_limits<int64_t>::max()),
44 written_bytes_(0),
45 start_time_(0),
46 stop_time_(std::numeric_limits<int64_t>::max()),
47 has_recent_event_(false),
48 wake_periodically_(false, false),
49 wake_from_hibernation_(false, false),
50 file_finished_(false, false) {
51 RTC_DCHECK(message_queue_);
52 RTC_DCHECK(event_queue_);
53 thread_.Start();
54 }
55
56 RtcEventLogHelperThread::~RtcEventLogHelperThread() {
57 ControlMessage message;
58 message.message_type = ControlMessage::TERMINATE_THREAD;
59 message.stop_time = rtc::TimeMicros();
60 while (!message_queue_->Insert(&message)) {
61 // We can't destroy the event log until we have stopped the thread,
62 // so clear the message queue and try again. Note that if we clear
63 // any STOP_FILE events, then the threads calling StopLogging would likely
64 // wait indefinitely. However, there should not be any such calls as we
65 // are executing the destructor.
66 LOG(LS_WARNING) << "Clearing message queue to terminate thread.";
67 message_queue_->Clear();
68 }
69 wake_from_hibernation_.Set();
70 wake_periodically_.Set(); // Wake up the output thread.
71 thread_.Stop(); // Wait for the thread to terminate.
72 }
73
74 void RtcEventLogHelperThread::WaitForFileFinished() {
75 wake_from_hibernation_.Set();
76 wake_periodically_.Set();
77 file_finished_.Wait(rtc::Event::kForever);
78 }
79
80 void RtcEventLogHelperThread::SignalNewEvent() {
81 wake_from_hibernation_.Set();
82 }
83
84 bool RtcEventLogHelperThread::AppendEventToString(rtclog::Event* event) {
85 rtclog::EventStream event_stream;
86 event_stream.add_stream();
87 event_stream.mutable_stream(0)->Swap(event);
88 // We create a new event stream per event but because of the way protobufs
89 // are encoded, events can be merged by concatenating them. Therefore,
90 // it will look like a single stream when we read it back from file.
91 bool stop = true;
92 if (written_bytes_ + static_cast<int64_t>(output_string_.size()) +
93 event_stream.ByteSize() <=
94 max_size_bytes_) {
95 event_stream.AppendToString(&output_string_);
96 stop = false;
97 }
98 // Swap the event back so that we don't mix event types in the queues.
99 event_stream.mutable_stream(0)->Swap(event);
100 return stop;
101 }
102
103 bool RtcEventLogHelperThread::LogToMemory() {
104 RTC_DCHECK(!file_->is_open());
105 bool message_received = false;
106
107 // Process each event earlier than the current time and append it to the
108 // appropriate history_.
109 int64_t current_time = rtc::TimeMicros();
110 if (!has_recent_event_) {
111 has_recent_event_ = event_queue_->Remove(&most_recent_event_);
112 }
113 while (has_recent_event_ &&
114 most_recent_event_->timestamp_us() <= current_time) {
115 if (IsConfigEvent(*most_recent_event_)) {
116 config_history_.push_back(std::move(most_recent_event_));
117 } else {
118 history_.push_back(std::move(most_recent_event_));
119 if (history_.size() > kEventsInHistory)
120 history_.pop_front();
121 }
122 has_recent_event_ = event_queue_->Remove(&most_recent_event_);
123 message_received = true;
124 }
125 return message_received;
126 }
127
128 void RtcEventLogHelperThread::StartLogFile() {
129 RTC_DCHECK(file_->is_open());
130 bool stop = false;
131 output_string_.clear();
132
133 // Create and serialize the LOG_START event.
134 rtclog::Event start_event;
135 start_event.set_timestamp_us(start_time_);
136 start_event.set_type(rtclog::Event::LOG_START);
137 AppendEventToString(&start_event);
138
139 // Serialize the config information for all old streams.
140 for (auto& event : config_history_) {
141 AppendEventToString(event.get());
142 }
143
144 // Serialize the events in the event queue.
145 while (!history_.empty() && !stop) {
146 stop = AppendEventToString(history_.front().get());
147 if (!stop) {
148 history_.pop_front();
149 }
150 }
151
152 // Write to file.
153 if (!file_->Write(output_string_.data(), output_string_.size())) {
154 LOG(LS_ERROR) << "FileWrapper failed to write WebRtcEventLog file.";
155 // The current FileWrapper implementation closes the file on error.
156 RTC_DCHECK(!file_->is_open());
157 return;
158 }
159 written_bytes_ += output_string_.size();
160
161 // Free the allocated memory since we probably won't need this amount of
162 // space again.
163 output_string_.clear();
164 output_string_.shrink_to_fit();
165
166 if (stop) {
167 RTC_DCHECK(file_->is_open());
168 StopLogFile();
169 }
170 }
171
172 bool RtcEventLogHelperThread::LogToFile() {
173 RTC_DCHECK(file_->is_open());
174 output_string_.clear();
175 bool message_received = false;
176
177 // Append each event older than both the current time and the stop time
178 // to the output_string_.
179 int64_t current_time = rtc::TimeMicros();
180 int64_t time_limit = std::min(current_time, stop_time_);
181 if (!has_recent_event_) {
182 has_recent_event_ = event_queue_->Remove(&most_recent_event_);
183 }
184 bool stop = false;
185 while (!stop && has_recent_event_ &&
186 most_recent_event_->timestamp_us() <= time_limit) {
187 stop = AppendEventToString(most_recent_event_.get());
188 if (!stop) {
189 if (IsConfigEvent(*most_recent_event_)) {
190 config_history_.push_back(std::move(most_recent_event_));
191 }
192 has_recent_event_ = event_queue_->Remove(&most_recent_event_);
193 }
194 message_received = true;
195 }
196
197 // Write string to file.
198 if (!file_->Write(output_string_.data(), output_string_.size())) {
199 LOG(LS_ERROR) << "FileWrapper failed to write WebRtcEventLog file.";
200 // The current FileWrapper implementation closes the file on error.
201 RTC_DCHECK(!file_->is_open());
202 return message_received;
203 }
204 written_bytes_ += output_string_.size();
205
206 // We want to stop logging if we have reached the file size limit. We also
207 // want to stop logging if the remaining events are more recent than the
208 // time limit, or in other words if we have terminated the loop despite
209 // having more events in the queue.
210 if ((has_recent_event_ && most_recent_event_->timestamp_us() > stop_time_) ||
211 stop) {
212 RTC_DCHECK(file_->is_open());
213 StopLogFile();
214 }
215 return message_received;
216 }
217
218 void RtcEventLogHelperThread::StopLogFile() {
219 RTC_DCHECK(file_->is_open());
220 output_string_.clear();
221
222 rtclog::Event end_event;
223 // This function can be called either because we have reached the stop time,
224 // or because we have reached the log file size limit. Therefore, use the
225 // current time if we have not reached the time limit.
226 end_event.set_timestamp_us(
227 std::min(stop_time_, rtc::TimeMicros()));
228 end_event.set_type(rtclog::Event::LOG_END);
229 AppendEventToString(&end_event);
230
231 if (written_bytes_ + static_cast<int64_t>(output_string_.size()) <=
232 max_size_bytes_) {
233 if (!file_->Write(output_string_.data(), output_string_.size())) {
234 LOG(LS_ERROR) << "FileWrapper failed to write WebRtcEventLog file.";
235 // The current FileWrapper implementation closes the file on error.
236 RTC_DCHECK(!file_->is_open());
237 }
238 written_bytes_ += output_string_.size();
239 }
240
241 max_size_bytes_ = std::numeric_limits<int64_t>::max();
242 written_bytes_ = 0;
243 start_time_ = 0;
244 stop_time_ = std::numeric_limits<int64_t>::max();
245 output_string_.clear();
246 file_->CloseFile();
247 RTC_DCHECK(!file_->is_open());
248 }
249
250 void RtcEventLogHelperThread::ProcessEvents() {
251 ControlMessage message;
252
253 while (true) {
254 bool message_received = false;
255 // Process control messages.
256 while (message_queue_->Remove(&message)) {
257 switch (message.message_type) {
258 case ControlMessage::START_FILE:
259 if (!file_->is_open()) {
260 max_size_bytes_ = message.max_size_bytes;
261 start_time_ = message.start_time;
262 stop_time_ = message.stop_time;
263 file_.swap(message.file);
264 StartLogFile();
265 } else {
266 // Already started. Ignore message and close file handle.
267 message.file->CloseFile();
268 }
269 message_received = true;
270 break;
271 case ControlMessage::STOP_FILE:
272 if (file_->is_open()) {
273 stop_time_ = message.stop_time;
274 LogToFile(); // Log remaining events from message queues.
275 }
276 // LogToFile might stop on it's own so we need to recheck the state.
277 if (file_->is_open()) {
278 StopLogFile();
279 }
280 file_finished_.Set();
281 message_received = true;
282 break;
283 case ControlMessage::TERMINATE_THREAD:
284 if (file_->is_open()) {
285 StopLogFile();
286 }
287 return;
288 }
289 }
290
291 // Write events to file or memory.
292 if (file_->is_open()) {
293 message_received |= LogToFile();
294 } else {
295 message_received |= LogToMemory();
296 }
297
298 // Accumulate a new batch of events instead of processing them one at a
299 // time.
300 if (message_received) {
301 wake_periodically_.Wait(100);
302 } else {
303 wake_from_hibernation_.Wait(rtc::Event::kForever);
304 }
305 }
306 }
307
308 void RtcEventLogHelperThread::ThreadOutputFunction(void* obj) {
309 RtcEventLogHelperThread* helper = static_cast<RtcEventLogHelperThread*>(obj);
310 helper->ProcessEvents();
311 }
312
313 } // namespace webrtc
314
315 #endif // ENABLE_RTC_EVENT_LOG
OLDNEW
« no previous file with comments | « webrtc/logging/rtc_event_log/rtc_event_log_helper_thread.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698