enclose-io/compiler

View on GitHub
lts/src/inspector_io.cc

Summary

Maintainability
Test Coverage
#include "inspector_io.h"

#include "inspector_socket_server.h"
#include "inspector/main_thread_interface.h"
#include "inspector/node_string.h"
#include "base_object-inl.h"
#include "debug_utils-inl.h"
#include "node.h"
#include "node_crypto.h"
#include "node_internals.h"
#include "node_mutex.h"
#include "v8-inspector.h"
#include "util-inl.h"
#include "zlib.h"

#include <deque>
#include <cstring>
#include <vector>

namespace node {
namespace inspector {
namespace {
using v8_inspector::StringBuffer;
using v8_inspector::StringView;

// kKill closes connections and stops the server, kStop only stops the server
enum class TransportAction { kKill, kSendMessage, kStop };

std::string ScriptPath(uv_loop_t* loop, const std::string& script_name) {
  std::string script_path;

  if (!script_name.empty()) {
    uv_fs_t req;
    req.ptr = nullptr;
    if (0 == uv_fs_realpath(loop, &req, script_name.c_str(), nullptr)) {
      CHECK_NOT_NULL(req.ptr);
      script_path = std::string(static_cast<char*>(req.ptr));
    }
    uv_fs_req_cleanup(&req);
  }

  return script_path;
}

// UUID RFC: https://www.ietf.org/rfc/rfc4122.txt
// Used ver 4 - with numbers
std::string GenerateID() {
  uint16_t buffer[8];
  CHECK(crypto::EntropySource(reinterpret_cast<unsigned char*>(buffer),
                              sizeof(buffer)));

  char uuid[256];
  snprintf(uuid, sizeof(uuid), "%04x%04x-%04x-%04x-%04x-%04x%04x%04x",
           buffer[0],  // time_low
           buffer[1],  // time_mid
           buffer[2],  // time_low
           (buffer[3] & 0x0fff) | 0x4000,  // time_hi_and_version
           (buffer[4] & 0x3fff) | 0x8000,  // clk_seq_hi clk_seq_low
           buffer[5],  // node
           buffer[6],
           buffer[7]);
  return uuid;
}

class RequestToServer {
 public:
  RequestToServer(TransportAction action,
                  int session_id,
                  std::unique_ptr<v8_inspector::StringBuffer> message)
                  : action_(action),
                    session_id_(session_id),
                    message_(std::move(message)) {}

  void Dispatch(InspectorSocketServer* server) const {
    switch (action_) {
      case TransportAction::kKill:
        server->TerminateConnections();
        // Fallthrough
      case TransportAction::kStop:
        server->Stop();
        break;
      case TransportAction::kSendMessage:
        server->Send(
            session_id_,
            protocol::StringUtil::StringViewToUtf8(message_->string()));
        break;
    }
  }

 private:
  TransportAction action_;
  int session_id_;
  std::unique_ptr<v8_inspector::StringBuffer> message_;
};

class RequestQueueData {
 public:
  using MessageQueue = std::deque<RequestToServer>;

  explicit RequestQueueData(uv_loop_t* loop)
                            : handle_(std::make_shared<RequestQueue>(this)) {
    int err = uv_async_init(loop, &async_, [](uv_async_t* async) {
      RequestQueueData* wrapper =
          node::ContainerOf(&RequestQueueData::async_, async);
      wrapper->DoDispatch();
    });
    CHECK_EQ(0, err);
  }

  static void CloseAndFree(RequestQueueData* queue);

  void Post(int session_id,
            TransportAction action,
            std::unique_ptr<StringBuffer> message) {
    Mutex::ScopedLock scoped_lock(state_lock_);
    bool notify = messages_.empty();
    messages_.emplace_back(action, session_id, std::move(message));
    if (notify) {
      CHECK_EQ(0, uv_async_send(&async_));
      incoming_message_cond_.Broadcast(scoped_lock);
    }
  }

  void Wait() {
    Mutex::ScopedLock scoped_lock(state_lock_);
    if (messages_.empty()) {
      incoming_message_cond_.Wait(scoped_lock);
    }
  }

  void SetServer(InspectorSocketServer* server) {
    server_ = server;
  }

  std::shared_ptr<RequestQueue> handle() {
    return handle_;
  }

 private:
  ~RequestQueueData() = default;

  MessageQueue GetMessages() {
    Mutex::ScopedLock scoped_lock(state_lock_);
    MessageQueue messages;
    messages_.swap(messages);
    return messages;
  }

  void DoDispatch() {
    if (server_ == nullptr)
      return;
    for (const auto& request : GetMessages()) {
      request.Dispatch(server_);
    }
  }

  std::shared_ptr<RequestQueue> handle_;
  uv_async_t async_;
  InspectorSocketServer* server_ = nullptr;
  MessageQueue messages_;
  Mutex state_lock_;  // Locked before mutating the queue.
  ConditionVariable incoming_message_cond_;
};
}  // namespace

class RequestQueue {
 public:
  explicit RequestQueue(RequestQueueData* data) : data_(data) {}

  void Reset() {
    Mutex::ScopedLock scoped_lock(lock_);
    data_ = nullptr;
  }

  void Post(int session_id,
            TransportAction action,
            std::unique_ptr<StringBuffer> message) {
    Mutex::ScopedLock scoped_lock(lock_);
    if (data_ != nullptr)
      data_->Post(session_id, action, std::move(message));
  }

  bool Expired() {
    Mutex::ScopedLock scoped_lock(lock_);
    return data_ == nullptr;
  }

 private:
  RequestQueueData* data_;
  Mutex lock_;
};

class IoSessionDelegate : public InspectorSessionDelegate {
 public:
  explicit IoSessionDelegate(std::shared_ptr<RequestQueue> queue, int id)
                             : request_queue_(queue), id_(id) { }
  void SendMessageToFrontend(const v8_inspector::StringView& message) override {
    request_queue_->Post(id_, TransportAction::kSendMessage,
                         StringBuffer::create(message));
  }

 private:
  std::shared_ptr<RequestQueue> request_queue_;
  int id_;
};

// Passed to InspectorSocketServer to handle WS inspector protocol events,
// mostly session start, message received, and session end.
class InspectorIoDelegate: public node::inspector::SocketServerDelegate {
 public:
  InspectorIoDelegate(std::shared_ptr<RequestQueueData> queue,
                      std::shared_ptr<MainThreadHandle> main_threade,
                      const std::string& target_id,
                      const std::string& script_path,
                      const std::string& script_name);
  ~InspectorIoDelegate() override = default;

  void StartSession(int session_id, const std::string& target_id) override;
  void MessageReceived(int session_id, const std::string& message) override;
  void EndSession(int session_id) override;

  std::vector<std::string> GetTargetIds() override;
  std::string GetTargetTitle(const std::string& id) override;
  std::string GetTargetUrl(const std::string& id) override;
  void AssignServer(InspectorSocketServer* server) override {
    request_queue_->SetServer(server);
  }

 private:
  std::shared_ptr<RequestQueueData> request_queue_;
  std::shared_ptr<MainThreadHandle> main_thread_;
  std::unordered_map<int, std::unique_ptr<InspectorSession>> sessions_;
  const std::string script_name_;
  const std::string script_path_;
  const std::string target_id_;
};

// static
std::unique_ptr<InspectorIo> InspectorIo::Start(
    std::shared_ptr<MainThreadHandle> main_thread,
    const std::string& path,
    std::shared_ptr<ExclusiveAccess<HostPort>> host_port,
    const InspectPublishUid& inspect_publish_uid) {
  auto io = std::unique_ptr<InspectorIo>(
      new InspectorIo(main_thread,
                      path,
                      host_port,
                      inspect_publish_uid));
  if (io->request_queue_->Expired()) {  // Thread is not running
    return nullptr;
  }
  return io;
}

InspectorIo::InspectorIo(std::shared_ptr<MainThreadHandle> main_thread,
                         const std::string& path,
                         std::shared_ptr<ExclusiveAccess<HostPort>> host_port,
                         const InspectPublishUid& inspect_publish_uid)
    : main_thread_(main_thread),
      host_port_(host_port),
      inspect_publish_uid_(inspect_publish_uid),
      thread_(),
      script_name_(path),
      id_(GenerateID()) {
  Mutex::ScopedLock scoped_lock(thread_start_lock_);
  CHECK_EQ(uv_thread_create(&thread_, InspectorIo::ThreadMain, this), 0);
  thread_start_condition_.Wait(scoped_lock);
}

InspectorIo::~InspectorIo() {
  request_queue_->Post(0, TransportAction::kKill, nullptr);
  int err = uv_thread_join(&thread_);
  CHECK_EQ(err, 0);
}

void InspectorIo::StopAcceptingNewConnections() {
  request_queue_->Post(0, TransportAction::kStop, nullptr);
}

// static
void InspectorIo::ThreadMain(void* io) {
  static_cast<InspectorIo*>(io)->ThreadMain();
}

void InspectorIo::ThreadMain() {
  uv_loop_t loop;
  loop.data = nullptr;
  int err = uv_loop_init(&loop);
  CHECK_EQ(err, 0);
  std::shared_ptr<RequestQueueData> queue(new RequestQueueData(&loop),
                                          RequestQueueData::CloseAndFree);
  std::string script_path = ScriptPath(&loop, script_name_);
  std::unique_ptr<InspectorIoDelegate> delegate(
      new InspectorIoDelegate(queue, main_thread_, id_,
                              script_path, script_name_));
  std::string host;
  int port;
  {
    ExclusiveAccess<HostPort>::Scoped host_port(host_port_);
    host = host_port->host();
    port = host_port->port();
  }
  InspectorSocketServer server(std::move(delegate),
                               &loop,
                               std::move(host),
                               port,
                               inspect_publish_uid_);
  request_queue_ = queue->handle();
  // Its lifetime is now that of the server delegate
  queue.reset();
  {
    Mutex::ScopedLock scoped_lock(thread_start_lock_);
    if (server.Start()) {
      ExclusiveAccess<HostPort>::Scoped host_port(host_port_);
      host_port->set_port(server.Port());
    }
    thread_start_condition_.Broadcast(scoped_lock);
  }
  uv_run(&loop, UV_RUN_DEFAULT);
  CheckedUvLoopClose(&loop);
}

std::string InspectorIo::GetWsUrl() const {
  ExclusiveAccess<HostPort>::Scoped host_port(host_port_);
  return FormatWsAddress(host_port->host(), host_port->port(), id_, true);
}

InspectorIoDelegate::InspectorIoDelegate(
    std::shared_ptr<RequestQueueData> queue,
    std::shared_ptr<MainThreadHandle> main_thread,
    const std::string& target_id,
    const std::string& script_path,
    const std::string& script_name)
    : request_queue_(queue), main_thread_(main_thread),
      script_name_(script_name), script_path_(script_path),
      target_id_(target_id) {}

void InspectorIoDelegate::StartSession(int session_id,
                                       const std::string& target_id) {
  auto session = main_thread_->Connect(
      std::unique_ptr<InspectorSessionDelegate>(
          new IoSessionDelegate(request_queue_->handle(), session_id)), true);
  if (session) {
    sessions_[session_id] = std::move(session);
    fprintf(stderr, "Debugger attached.\n");
  }
}

void InspectorIoDelegate::MessageReceived(int session_id,
                                          const std::string& message) {
  auto session = sessions_.find(session_id);
  if (session != sessions_.end())
    session->second->Dispatch(Utf8ToStringView(message)->string());
}

void InspectorIoDelegate::EndSession(int session_id) {
  sessions_.erase(session_id);
}

std::vector<std::string> InspectorIoDelegate::GetTargetIds() {
  return { target_id_ };
}

std::string InspectorIoDelegate::GetTargetTitle(const std::string& id) {
  return script_name_.empty() ? GetHumanReadableProcessName() : script_name_;
}

std::string InspectorIoDelegate::GetTargetUrl(const std::string& id) {
  return "file://" + script_path_;
}

// static
void RequestQueueData::CloseAndFree(RequestQueueData* queue) {
  queue->handle_->Reset();
  queue->handle_.reset();
  uv_close(reinterpret_cast<uv_handle_t*>(&queue->async_),
           [](uv_handle_t* handle) {
    uv_async_t* async = reinterpret_cast<uv_async_t*>(handle);
    RequestQueueData* wrapper =
        node::ContainerOf(&RequestQueueData::async_, async);
    delete wrapper;
  });
}
}  // namespace inspector
}  // namespace node