enclose-io/compiler

View on GitHub
lts/src/tracing/node_trace_writer.cc

Summary

Maintainability
Test Coverage
#include "tracing/node_trace_writer.h"

#include "util-inl.h"

#include <fcntl.h>
#include <cstring>

namespace node {
namespace tracing {

NodeTraceWriter::NodeTraceWriter(const std::string& log_file_pattern)
    : log_file_pattern_(log_file_pattern) {}

void NodeTraceWriter::InitializeOnThread(uv_loop_t* loop) {
  CHECK_NULL(tracing_loop_);
  tracing_loop_ = loop;

  flush_signal_.data = this;
  int err = uv_async_init(tracing_loop_, &flush_signal_,
                          [](uv_async_t* signal) {
    NodeTraceWriter* trace_writer =
        ContainerOf(&NodeTraceWriter::flush_signal_, signal);
    trace_writer->FlushPrivate();
  });
  CHECK_EQ(err, 0);

  exit_signal_.data = this;
  err = uv_async_init(tracing_loop_, &exit_signal_, ExitSignalCb);
  CHECK_EQ(err, 0);
}

void NodeTraceWriter::WriteSuffix() {
  // If our final log file has traces, then end the file appropriately.
  // This means that if no trace events are recorded, then no trace file is
  // produced.
  bool should_flush = false;
  {
    Mutex::ScopedLock scoped_lock(stream_mutex_);
    if (total_traces_ > 0) {
      total_traces_ = kTracesPerFile;  // Act as if we reached the file limit.
      should_flush = true;
    }
  }
  if (should_flush) {
    Flush(true);
  }
}

NodeTraceWriter::~NodeTraceWriter() {
  WriteSuffix();
  uv_fs_t req;
  if (fd_ != -1) {
    CHECK_EQ(0, uv_fs_close(nullptr, &req, fd_, nullptr));
    uv_fs_req_cleanup(&req);
  }
  uv_async_send(&exit_signal_);
  Mutex::ScopedLock scoped_lock(request_mutex_);
  while (!exited_) {
    exit_cond_.Wait(scoped_lock);
  }
}

void replace_substring(std::string* target,
                       const std::string& search,
                       const std::string& insert) {
  size_t pos = target->find(search);
  for (; pos != std::string::npos; pos = target->find(search, pos)) {
    target->replace(pos, search.size(), insert);
    pos += insert.size();
  }
}

void NodeTraceWriter::OpenNewFileForStreaming() {
  ++file_num_;
  uv_fs_t req;

  // Evaluate a JS-style template string, it accepts the values ${pid} and
  // ${rotation}
  std::string filepath(log_file_pattern_);
  replace_substring(&filepath, "${pid}", std::to_string(uv_os_getpid()));
  replace_substring(&filepath, "${rotation}", std::to_string(file_num_));

  if (fd_ != -1) {
    CHECK_EQ(uv_fs_close(nullptr, &req, fd_, nullptr), 0);
    uv_fs_req_cleanup(&req);
  }

  fd_ = uv_fs_open(nullptr, &req, filepath.c_str(),
      O_CREAT | O_WRONLY | O_TRUNC, 0644, nullptr);
  uv_fs_req_cleanup(&req);
  if (fd_ < 0) {
    fprintf(stderr, "Could not open trace file %s: %s\n",
                    filepath.c_str(),
                    uv_strerror(fd_));
    fd_ = -1;
  }
}

void NodeTraceWriter::AppendTraceEvent(TraceObject* trace_event) {
  Mutex::ScopedLock scoped_lock(stream_mutex_);
  // If this is the first trace event, open a new file for streaming.
  if (total_traces_ == 0) {
    OpenNewFileForStreaming();
    // Constructing a new JSONTraceWriter object appends "{\"traceEvents\":["
    // to stream_.
    // In other words, the constructor initializes the serialization stream
    // to a state where we can start writing trace events to it.
    // Repeatedly constructing and destroying json_trace_writer_ allows
    // us to use V8's JSON writer instead of implementing our own.
    json_trace_writer_.reset(TraceWriter::CreateJSONTraceWriter(stream_));
  }
  ++total_traces_;
  json_trace_writer_->AppendTraceEvent(trace_event);
}

void NodeTraceWriter::FlushPrivate() {
  std::string str;
  int highest_request_id;
  {
    Mutex::ScopedLock stream_scoped_lock(stream_mutex_);
    if (total_traces_ >= kTracesPerFile) {
      total_traces_ = 0;
      // Destroying the member JSONTraceWriter object appends "]}" to
      // stream_ - in other words, ending a JSON file.
      json_trace_writer_.reset();
    }
    // str() makes a copy of the contents of the stream.
    str = stream_.str();
    stream_.str("");
    stream_.clear();
  }
  {
    Mutex::ScopedLock request_scoped_lock(request_mutex_);
    highest_request_id = num_write_requests_;
  }
  WriteToFile(std::move(str), highest_request_id);
}

void NodeTraceWriter::Flush(bool blocking) {
  Mutex::ScopedLock scoped_lock(request_mutex_);
  {
    // We need to lock the mutexes here in a nested fashion; stream_mutex_
    // protects json_trace_writer_, and without request_mutex_ there might be
    // a time window in which the stream state changes?
    Mutex::ScopedLock stream_mutex_lock(stream_mutex_);
    if (!json_trace_writer_)
      return;
  }
  int request_id = ++num_write_requests_;
  int err = uv_async_send(&flush_signal_);
  CHECK_EQ(err, 0);
  if (blocking) {
    // Wait until data associated with this request id has been written to disk.
    // This guarantees that data from all earlier requests have also been
    // written.
    while (request_id > highest_request_id_completed_) {
      request_cond_.Wait(scoped_lock);
    }
  }
}

void NodeTraceWriter::WriteToFile(std::string&& str, int highest_request_id) {
  if (fd_ == -1) return;

  uv_buf_t buf = uv_buf_init(nullptr, 0);
  {
    Mutex::ScopedLock lock(request_mutex_);
    write_req_queue_.emplace(WriteRequest {
      std::move(str), highest_request_id
    });
    if (write_req_queue_.size() == 1) {
      buf = uv_buf_init(
          const_cast<char*>(write_req_queue_.front().str.c_str()),
          write_req_queue_.front().str.length());
    }
  }
  // Only one write request for the same file descriptor should be active at
  // a time.
  if (buf.base != nullptr && fd_ != -1) {
    StartWrite(buf);
  }
}

void NodeTraceWriter::StartWrite(uv_buf_t buf) {
  int err = uv_fs_write(
      tracing_loop_, &write_req_, fd_, &buf, 1, -1,
      [](uv_fs_t* req) {
        NodeTraceWriter* writer =
            ContainerOf(&NodeTraceWriter::write_req_, req);
        writer->AfterWrite();
      });
  CHECK_EQ(err, 0);
}

void NodeTraceWriter::AfterWrite() {
  CHECK_GE(write_req_.result, 0);
  uv_fs_req_cleanup(&write_req_);

  uv_buf_t buf = uv_buf_init(nullptr, 0);
  {
    Mutex::ScopedLock scoped_lock(request_mutex_);
    int highest_request_id = write_req_queue_.front().highest_request_id;
    write_req_queue_.pop();
    highest_request_id_completed_ = highest_request_id;
    request_cond_.Broadcast(scoped_lock);
    if (!write_req_queue_.empty()) {
      buf = uv_buf_init(
          const_cast<char*>(write_req_queue_.front().str.c_str()),
          write_req_queue_.front().str.length());
    }
  }
  if (buf.base != nullptr && fd_ != -1) {
    StartWrite(buf);
  }
}

// static
void NodeTraceWriter::ExitSignalCb(uv_async_t* signal) {
  NodeTraceWriter* trace_writer =
      ContainerOf(&NodeTraceWriter::exit_signal_, signal);
  // Close both flush_signal_ and exit_signal_.
  uv_close(reinterpret_cast<uv_handle_t*>(&trace_writer->flush_signal_),
           [](uv_handle_t* signal) {
             NodeTraceWriter* trace_writer =
                 ContainerOf(&NodeTraceWriter::flush_signal_,
                             reinterpret_cast<uv_async_t*>(signal));
             uv_close(
                 reinterpret_cast<uv_handle_t*>(&trace_writer->exit_signal_),
                 [](uv_handle_t* signal) {
                   NodeTraceWriter* trace_writer =
                       ContainerOf(&NodeTraceWriter::exit_signal_,
                                   reinterpret_cast<uv_async_t*>(signal));
                   Mutex::ScopedLock scoped_lock(trace_writer->request_mutex_);
                   trace_writer->exited_ = true;
                   trace_writer->exit_cond_.Signal(scoped_lock);
                 });
           });
}
}  // namespace tracing
}  // namespace node