lts/src/tracing/node_trace_writer.cc
#include "tracing/node_trace_writer.h"
#include "util-inl.h"
#include <fcntl.h>
#include <cstring>
namespace node {
namespace tracing {
NodeTraceWriter::NodeTraceWriter(const std::string& log_file_pattern)
: log_file_pattern_(log_file_pattern) {}
void NodeTraceWriter::InitializeOnThread(uv_loop_t* loop) {
CHECK_NULL(tracing_loop_);
tracing_loop_ = loop;
flush_signal_.data = this;
int err = uv_async_init(tracing_loop_, &flush_signal_,
[](uv_async_t* signal) {
NodeTraceWriter* trace_writer =
ContainerOf(&NodeTraceWriter::flush_signal_, signal);
trace_writer->FlushPrivate();
});
CHECK_EQ(err, 0);
exit_signal_.data = this;
err = uv_async_init(tracing_loop_, &exit_signal_, ExitSignalCb);
CHECK_EQ(err, 0);
}
void NodeTraceWriter::WriteSuffix() {
// If our final log file has traces, then end the file appropriately.
// This means that if no trace events are recorded, then no trace file is
// produced.
bool should_flush = false;
{
Mutex::ScopedLock scoped_lock(stream_mutex_);
if (total_traces_ > 0) {
total_traces_ = kTracesPerFile; // Act as if we reached the file limit.
should_flush = true;
}
}
if (should_flush) {
Flush(true);
}
}
NodeTraceWriter::~NodeTraceWriter() {
WriteSuffix();
uv_fs_t req;
if (fd_ != -1) {
CHECK_EQ(0, uv_fs_close(nullptr, &req, fd_, nullptr));
uv_fs_req_cleanup(&req);
}
uv_async_send(&exit_signal_);
Mutex::ScopedLock scoped_lock(request_mutex_);
while (!exited_) {
exit_cond_.Wait(scoped_lock);
}
}
void replace_substring(std::string* target,
const std::string& search,
const std::string& insert) {
size_t pos = target->find(search);
for (; pos != std::string::npos; pos = target->find(search, pos)) {
target->replace(pos, search.size(), insert);
pos += insert.size();
}
}
void NodeTraceWriter::OpenNewFileForStreaming() {
++file_num_;
uv_fs_t req;
// Evaluate a JS-style template string, it accepts the values ${pid} and
// ${rotation}
std::string filepath(log_file_pattern_);
replace_substring(&filepath, "${pid}", std::to_string(uv_os_getpid()));
replace_substring(&filepath, "${rotation}", std::to_string(file_num_));
if (fd_ != -1) {
CHECK_EQ(uv_fs_close(nullptr, &req, fd_, nullptr), 0);
uv_fs_req_cleanup(&req);
}
fd_ = uv_fs_open(nullptr, &req, filepath.c_str(),
O_CREAT | O_WRONLY | O_TRUNC, 0644, nullptr);
uv_fs_req_cleanup(&req);
if (fd_ < 0) {
fprintf(stderr, "Could not open trace file %s: %s\n",
filepath.c_str(),
uv_strerror(fd_));
fd_ = -1;
}
}
void NodeTraceWriter::AppendTraceEvent(TraceObject* trace_event) {
Mutex::ScopedLock scoped_lock(stream_mutex_);
// If this is the first trace event, open a new file for streaming.
if (total_traces_ == 0) {
OpenNewFileForStreaming();
// Constructing a new JSONTraceWriter object appends "{\"traceEvents\":["
// to stream_.
// In other words, the constructor initializes the serialization stream
// to a state where we can start writing trace events to it.
// Repeatedly constructing and destroying json_trace_writer_ allows
// us to use V8's JSON writer instead of implementing our own.
json_trace_writer_.reset(TraceWriter::CreateJSONTraceWriter(stream_));
}
++total_traces_;
json_trace_writer_->AppendTraceEvent(trace_event);
}
void NodeTraceWriter::FlushPrivate() {
std::string str;
int highest_request_id;
{
Mutex::ScopedLock stream_scoped_lock(stream_mutex_);
if (total_traces_ >= kTracesPerFile) {
total_traces_ = 0;
// Destroying the member JSONTraceWriter object appends "]}" to
// stream_ - in other words, ending a JSON file.
json_trace_writer_.reset();
}
// str() makes a copy of the contents of the stream.
str = stream_.str();
stream_.str("");
stream_.clear();
}
{
Mutex::ScopedLock request_scoped_lock(request_mutex_);
highest_request_id = num_write_requests_;
}
WriteToFile(std::move(str), highest_request_id);
}
void NodeTraceWriter::Flush(bool blocking) {
Mutex::ScopedLock scoped_lock(request_mutex_);
{
// We need to lock the mutexes here in a nested fashion; stream_mutex_
// protects json_trace_writer_, and without request_mutex_ there might be
// a time window in which the stream state changes?
Mutex::ScopedLock stream_mutex_lock(stream_mutex_);
if (!json_trace_writer_)
return;
}
int request_id = ++num_write_requests_;
int err = uv_async_send(&flush_signal_);
CHECK_EQ(err, 0);
if (blocking) {
// Wait until data associated with this request id has been written to disk.
// This guarantees that data from all earlier requests have also been
// written.
while (request_id > highest_request_id_completed_) {
request_cond_.Wait(scoped_lock);
}
}
}
void NodeTraceWriter::WriteToFile(std::string&& str, int highest_request_id) {
if (fd_ == -1) return;
uv_buf_t buf = uv_buf_init(nullptr, 0);
{
Mutex::ScopedLock lock(request_mutex_);
write_req_queue_.emplace(WriteRequest {
std::move(str), highest_request_id
});
if (write_req_queue_.size() == 1) {
buf = uv_buf_init(
const_cast<char*>(write_req_queue_.front().str.c_str()),
write_req_queue_.front().str.length());
}
}
// Only one write request for the same file descriptor should be active at
// a time.
if (buf.base != nullptr && fd_ != -1) {
StartWrite(buf);
}
}
void NodeTraceWriter::StartWrite(uv_buf_t buf) {
int err = uv_fs_write(
tracing_loop_, &write_req_, fd_, &buf, 1, -1,
[](uv_fs_t* req) {
NodeTraceWriter* writer =
ContainerOf(&NodeTraceWriter::write_req_, req);
writer->AfterWrite();
});
CHECK_EQ(err, 0);
}
void NodeTraceWriter::AfterWrite() {
CHECK_GE(write_req_.result, 0);
uv_fs_req_cleanup(&write_req_);
uv_buf_t buf = uv_buf_init(nullptr, 0);
{
Mutex::ScopedLock scoped_lock(request_mutex_);
int highest_request_id = write_req_queue_.front().highest_request_id;
write_req_queue_.pop();
highest_request_id_completed_ = highest_request_id;
request_cond_.Broadcast(scoped_lock);
if (!write_req_queue_.empty()) {
buf = uv_buf_init(
const_cast<char*>(write_req_queue_.front().str.c_str()),
write_req_queue_.front().str.length());
}
}
if (buf.base != nullptr && fd_ != -1) {
StartWrite(buf);
}
}
// static
void NodeTraceWriter::ExitSignalCb(uv_async_t* signal) {
NodeTraceWriter* trace_writer =
ContainerOf(&NodeTraceWriter::exit_signal_, signal);
// Close both flush_signal_ and exit_signal_.
uv_close(reinterpret_cast<uv_handle_t*>(&trace_writer->flush_signal_),
[](uv_handle_t* signal) {
NodeTraceWriter* trace_writer =
ContainerOf(&NodeTraceWriter::flush_signal_,
reinterpret_cast<uv_async_t*>(signal));
uv_close(
reinterpret_cast<uv_handle_t*>(&trace_writer->exit_signal_),
[](uv_handle_t* signal) {
NodeTraceWriter* trace_writer =
ContainerOf(&NodeTraceWriter::exit_signal_,
reinterpret_cast<uv_async_t*>(signal));
Mutex::ScopedLock scoped_lock(trace_writer->request_mutex_);
trace_writer->exited_ = true;
trace_writer->exit_cond_.Signal(scoped_lock);
});
});
}
} // namespace tracing
} // namespace node