enclose-io/compiler

View on GitHub
lts/src/node_platform.cc

Summary

Maintainability
Test Coverage
#include "node_platform.h"
#include "node_internals.h"

#include "env-inl.h"
#include "debug_utils-inl.h"
#include <algorithm>  // find_if(), find(), move()
#include <cmath>  // llround()
#include <memory>  // unique_ptr(), shared_ptr(), make_shared()

namespace node {

using v8::Isolate;
using v8::Object;
using v8::Platform;
using v8::Task;
using node::tracing::TracingController;

namespace {

// Start-up parameters handed to each platform worker thread. The mutex,
// condition variable and counter that the pointers refer to are locals of the
// WorkerThreadsTaskRunner constructor, which returns once the counter reaches
// zero; PlatformWorkerThread() only uses them to announce that it is ready.
struct PlatformWorkerData {
  // Queue the worker pops its tasks from.
  TaskQueue<Task>* task_queue;
  // Locked by the worker while it updates *pending_platform_workers.
  Mutex* platform_workers_mutex;
  // Signalled by the worker after it has decremented the pending counter.
  ConditionVariable* platform_workers_ready;
  // Number of workers that have not yet reported that they are ready.
  int* pending_platform_workers;
  // Index of the worker in creation order; PlatformWorkerThread() does not
  // read it.
  int id;
};

static void PlatformWorkerThread(void* data) {
  std::unique_ptr<PlatformWorkerData>
      worker_data(static_cast<PlatformWorkerData*>(data));

  TaskQueue<Task>* pending_worker_tasks = worker_data->task_queue;
  TRACE_EVENT_METADATA1("__metadata", "thread_name", "name",
                        "PlatformWorkerThread");

  // Notify the main thread that the platform worker is ready.
  {
    Mutex::ScopedLock lock(*worker_data->platform_workers_mutex);
    (*worker_data->pending_platform_workers)--;
    worker_data->platform_workers_ready->Signal(lock);
  }

  while (std::unique_ptr<Task> task = pending_worker_tasks->BlockingPop()) {
    task->Run();
    pending_worker_tasks->NotifyOfCompletion();
  }
}

}  // namespace

// Runs a private libuv loop on its own thread and uses one uv_timer_t per
// delayed task. When a timer fires, the task is pushed onto the worker task
// queue passed to the constructor. Requests from other threads are queued in
// tasks_ and picked up on the scheduler thread via the flush_tasks_ async
// handle.
class WorkerThreadsTaskRunner::DelayedTaskScheduler {
 public:
  // `tasks` is the queue that expired delayed tasks are forwarded to.
  explicit DelayedTaskScheduler(TaskQueue<Task>* tasks)
    : pending_worker_tasks_(tasks) {}

  // Spawns the scheduler thread and blocks until Run() has initialized the
  // loop and the async handle (signalled through ready_). Returns the thread
  // handle so the caller can join it.
  std::unique_ptr<uv_thread_t> Start() {
    auto start_thread = [](void* data) {
      static_cast<DelayedTaskScheduler*>(data)->Run();
    };
    std::unique_ptr<uv_thread_t> t { new uv_thread_t() };
    uv_sem_init(&ready_, 0);
    CHECK_EQ(0, uv_thread_create(t.get(), start_thread, this));
    uv_sem_wait(&ready_);
    uv_sem_destroy(&ready_);
    return t;
  }

  // Queues a ScheduleTask for `task` and wakes the scheduler loop.
  void PostDelayedTask(std::unique_ptr<Task> task, double delay_in_seconds) {
    tasks_.Push(std::make_unique<ScheduleTask>(this, std::move(task),
                                               delay_in_seconds));
    uv_async_send(&flush_tasks_);
  }

  // Queues a StopTask and wakes the scheduler loop.
  void Stop() {
    tasks_.Push(std::make_unique<StopTask>(this));
    uv_async_send(&flush_tasks_);
  }

 private:
  // Body of the scheduler thread: sets up the loop and the async handle,
  // releases Start(), then runs the loop until uv_run() returns.
  void Run() {
    TRACE_EVENT_METADATA1("__metadata", "thread_name", "name",
                          "WorkerThreadsTaskRunner::DelayedTaskScheduler");
    loop_.data = this;
    CHECK_EQ(0, uv_loop_init(&loop_));
    flush_tasks_.data = this;
    CHECK_EQ(0, uv_async_init(&loop_, &flush_tasks_, FlushTasks));
    uv_sem_post(&ready_);

    uv_run(&loop_, UV_RUN_DEFAULT);
    CheckedUvLoopClose(&loop_);
  }

  // Async callback: runs every request currently queued in tasks_. The
  // scheduler is recovered from the loop pointer, since loop_ is a member.
  static void FlushTasks(uv_async_t* flush_tasks) {
    DelayedTaskScheduler* scheduler =
        ContainerOf(&DelayedTaskScheduler::loop_, flush_tasks->loop);
    while (std::unique_ptr<Task> task = scheduler->tasks_.Pop())
      task->Run();
  }

  // Request that tears the scheduler down: every pending timer is closed and
  // its task dropped without being run, then the async handle is closed.
  class StopTask : public Task {
   public:
    explicit StopTask(DelayedTaskScheduler* scheduler): scheduler_(scheduler) {}

    void Run() override {
      // Copy the set first: TakeTimerTask() erases from timers_, which must
      // not happen while iterating over it.
      std::vector<uv_timer_t*> timers;
      for (uv_timer_t* timer : scheduler_->timers_)
        timers.push_back(timer);
      for (uv_timer_t* timer : timers)
        scheduler_->TakeTimerTask(timer);
      // flush_tasks_ is a member, so the close callback has nothing to free.
      uv_close(reinterpret_cast<uv_handle_t*>(&scheduler_->flush_tasks_),
               [](uv_handle_t* handle) {});
    }

   private:
     DelayedTaskScheduler* scheduler_;
  };

  // Request that arms a one-shot timer for a delayed task.
  class ScheduleTask : public Task {
   public:
    ScheduleTask(DelayedTaskScheduler* scheduler,
                 std::unique_ptr<Task> task,
                 double delay_in_seconds)
      : scheduler_(scheduler),
        task_(std::move(task)),
        delay_in_seconds_(delay_in_seconds) {}

    void Run() override {
      // Seconds -> milliseconds, rounded to the nearest integer.
      uint64_t delay_millis = llround(delay_in_seconds_ * 1000);
      std::unique_ptr<uv_timer_t> timer(new uv_timer_t());
      CHECK_EQ(0, uv_timer_init(&scheduler_->loop_, timer.get()));
      // The timer carries the raw Task pointer; TakeTimerTask() reclaims it.
      timer->data = task_.release();
      CHECK_EQ(0, uv_timer_start(timer.get(), RunTask, delay_millis, 0));
      // From here on the timer is owned through timers_ and freed in the
      // close callback installed by TakeTimerTask().
      scheduler_->timers_.insert(timer.release());
    }

   private:
    DelayedTaskScheduler* scheduler_;
    std::unique_ptr<Task> task_;
    double delay_in_seconds_;
  };

  // Timer callback: hands the expired task over to the worker task queue.
  static void RunTask(uv_timer_t* timer) {
    DelayedTaskScheduler* scheduler =
        ContainerOf(&DelayedTaskScheduler::loop_, timer->loop);
    scheduler->pending_worker_tasks_->Push(scheduler->TakeTimerTask(timer));
  }

  // Takes back ownership of the task stored in `timer`, stops and closes the
  // timer (it is deleted in the close callback) and forgets it in timers_.
  std::unique_ptr<Task> TakeTimerTask(uv_timer_t* timer) {
    std::unique_ptr<Task> task(static_cast<Task*>(timer->data));
    uv_timer_stop(timer);
    uv_close(reinterpret_cast<uv_handle_t*>(timer), [](uv_handle_t* handle) {
      delete reinterpret_cast<uv_timer_t*>(handle);
    });
    timers_.erase(timer);
    return task;
  }

  // Posted by Run() once the loop is usable; waited on by Start().
  uv_sem_t ready_;
  // Destination queue for tasks whose delay has elapsed.
  TaskQueue<Task>* pending_worker_tasks_;

  // Requests (ScheduleTask / StopTask) waiting for the scheduler thread.
  TaskQueue<Task> tasks_;
  uv_loop_t loop_;
  // Wakes the loop so that FlushTasks() processes tasks_.
  uv_async_t flush_tasks_;
  // Timers that have been started and not yet taken by TakeTimerTask().
  std::unordered_set<uv_timer_t*> timers_;
};

// Starts the delayed-task scheduler thread and up to `thread_pool_size`
// platform worker threads, then blocks until every worker that was actually
// started has reported that it is ready.
//
// If creating a worker thread fails, no further workers are started. The
// workers that could not be created are removed from the pending count so
// that the wait below still terminates, and the start-up data of the failed
// worker is freed instead of leaked.
WorkerThreadsTaskRunner::WorkerThreadsTaskRunner(int thread_pool_size) {
  Mutex platform_workers_mutex;
  ConditionVariable platform_workers_ready;

  // Held while the workers are created, so the counter below is only ever
  // modified with the mutex locked.
  Mutex::ScopedLock lock(platform_workers_mutex);
  int pending_platform_workers = thread_pool_size;

  delayed_task_scheduler_ = std::make_unique<DelayedTaskScheduler>(
      &pending_worker_tasks_);
  threads_.push_back(delayed_task_scheduler_->Start());

  for (int i = 0; i < thread_pool_size; i++) {
    std::unique_ptr<PlatformWorkerData> worker_data(new PlatformWorkerData{
      &pending_worker_tasks_, &platform_workers_mutex,
      &platform_workers_ready, &pending_platform_workers, i
    });
    std::unique_ptr<uv_thread_t> t { new uv_thread_t() };
    if (uv_thread_create(t.get(), PlatformWorkerThread,
                         worker_data.get()) != 0) {
      // Worker `i` and all later ones will never signal readiness; stop
      // waiting for them. `worker_data` is released by its unique_ptr.
      pending_platform_workers -= thread_pool_size - i;
      break;
    }
    // The running thread now owns its start-up data.
    worker_data.release();
    threads_.push_back(std::move(t));
  }

  // Wait for platform workers to initialize before continuing with the
  // bootstrap.
  while (pending_platform_workers > 0) {
    platform_workers_ready.Wait(lock);
  }
}

void WorkerThreadsTaskRunner::PostTask(std::unique_ptr<Task> task) {
  pending_worker_tasks_.Push(std::move(task));
}

void WorkerThreadsTaskRunner::PostDelayedTask(std::unique_ptr<Task> task,
                                              double delay_in_seconds) {
  delayed_task_scheduler_->PostDelayedTask(std::move(task), delay_in_seconds);
}

void WorkerThreadsTaskRunner::BlockingDrain() {
  pending_worker_tasks_.BlockingDrain();
}

// Stops the worker task queue and the delayed-task scheduler, then joins
// every thread recorded in threads_.
void WorkerThreadsTaskRunner::Shutdown() {
  pending_worker_tasks_.Stop();
  delayed_task_scheduler_->Stop();

  size_t i = 0;
  while (i < threads_.size()) {
    CHECK_EQ(0, uv_thread_join(threads_[i].get()));
    ++i;
  }
}

// Returns the number of entries in threads_. The constructor also stores the
// delayed-task scheduler thread there, so it is included in the count.
int WorkerThreadsTaskRunner::NumberOfWorkerThreads() const {
  const auto thread_count = threads_.size();
  return static_cast<int>(thread_count);
}

// Creates the async handle used to wake `loop` whenever foreground tasks are
// posted. The handle is unref'd after initialization.
PerIsolatePlatformData::PerIsolatePlatformData(
    Isolate* isolate, uv_loop_t* loop)
  : isolate_(isolate), loop_(loop) {
  flush_tasks_ = new uv_async_t();
  CHECK_EQ(0, uv_async_init(loop, flush_tasks_, FlushTasks));
  // Lets the static FlushTasks() callback find its way back to this object.
  flush_tasks_->data = this;
  uv_handle_t* const as_handle = reinterpret_cast<uv_handle_t*>(flush_tasks_);
  uv_unref(as_handle);
}

// Async-handle callback: recovers the owning PerIsolatePlatformData from
// handle->data and flushes its foreground tasks.
void PerIsolatePlatformData::FlushTasks(uv_async_t* handle) {
  static_cast<PerIsolatePlatformData*>(handle->data)
      ->FlushForegroundTasksInternal();
}

// Idle tasks are not supported: NodePlatform::IdleTasksEnabled() returns
// false, so this function is marked as unreachable.
void PerIsolatePlatformData::PostIdleTask(std::unique_ptr<v8::IdleTask> task) {
  UNREACHABLE();
}

// Queues a foreground task and wakes the event loop. Once the async handle has
// been given up by Shutdown(), the task is silently dropped instead.
void PerIsolatePlatformData::PostTask(std::unique_ptr<Task> task) {
  if (flush_tasks_ != nullptr) {
    foreground_tasks_.Push(std::move(task));
    uv_async_send(flush_tasks_);
  }
  // Otherwise: V8 may post tasks during Isolate disposal. In that case, the
  // only sensible path forward is to discard the task.
}

void PerIsolatePlatformData::PostDelayedTask(
    std::unique_ptr<Task> task, double delay_in_seconds) {
  if (flush_tasks_ == nullptr) {
    // V8 may post tasks during Isolate disposal. In that case, the only
    // sensible path forward is to discard the task.
    return;
  }
  std::unique_ptr<DelayedTask> delayed(new DelayedTask());
  delayed->task = std::move(task);
  delayed->platform_data = shared_from_this();
  delayed->timeout = delay_in_seconds;
  foreground_delayed_tasks_.Push(std::move(delayed));
  uv_async_send(flush_tasks_);
}

void PerIsolatePlatformData::PostNonNestableTask(std::unique_ptr<Task> task) {
  PostTask(std::move(task));
}

void PerIsolatePlatformData::PostNonNestableDelayedTask(
    std::unique_ptr<Task> task,
    double delay_in_seconds) {
  PostDelayedTask(std::move(task), delay_in_seconds);
}

// Runs Shutdown() in case it has not been called yet; Shutdown() returns
// immediately when flush_tasks_ is already null.
// NOTE(review): if flush_tasks_ is still set here, Shutdown() calls
// shared_from_this() on an object that is being destroyed, which presumably
// cannot succeed — confirm that every owner calls Shutdown() first.
PerIsolatePlatformData::~PerIsolatePlatformData() {
  Shutdown();
}

// Registers `callback`, to be invoked with `data` by DecreaseHandleCount()
// once the handle count drops to zero.
void PerIsolatePlatformData::AddShutdownCallback(void (*callback)(void*),
                                                 void* data) {
  shutdown_callbacks_.push_back(ShutdownCallback { callback, data });
}

// Discards all queued and scheduled foreground tasks and starts closing the
// flush_tasks_ handle. Calling it again is a no-op because flush_tasks_ is
// reset to null at the end.
void PerIsolatePlatformData::Shutdown() {
  if (flush_tasks_ == nullptr)
    return;

  // While there should be no V8 tasks in the queues at this point, it is
  // possible that Node.js-internal tasks from e.g. the inspector are still
  // lying around. We clear these queues and ignore the return value,
  // effectively deleting the tasks instead of running them.
  foreground_delayed_tasks_.PopAll();
  foreground_tasks_.PopAll();
  scheduled_delayed_tasks_.clear();

  // Both destroying the scheduled_delayed_tasks_ lists and closing
  // flush_tasks_ handle add tasks to the event loop. We keep a count of all
  // non-closed handles, and when that reaches zero, we inform any shutdown
  // callbacks that the platform is done as far as this Isolate is concerned.
  // The self-reference keeps this object alive until the close callback below
  // has run.
  self_reference_ = shared_from_this();
  uv_close(reinterpret_cast<uv_handle_t*>(flush_tasks_),
           [](uv_handle_t* handle) {
    // Takes ownership of the handle so it is freed when the callback returns.
    std::unique_ptr<uv_async_t> flush_tasks {
        reinterpret_cast<uv_async_t*>(handle) };
    PerIsolatePlatformData* platform_data =
        static_cast<PerIsolatePlatformData*>(flush_tasks->data);
    platform_data->DecreaseHandleCount();
    // Must come last: dropping the self-reference may destroy platform_data.
    platform_data->self_reference_.reset();
  });
  flush_tasks_ = nullptr;
}

// Records that one libuv handle has finished closing. When the last one is
// gone, every registered shutdown callback is invoked in registration order.
void PerIsolatePlatformData::DecreaseHandleCount() {
  CHECK_GE(uv_handle_count_, 1);
  --uv_handle_count_;
  if (uv_handle_count_ != 0)
    return;

  for (const auto& entry : shutdown_callbacks_)
    entry.cb(entry.data);
}

// Uses the supplied tracing controller, or allocates a fresh one when none is
// given, and starts the worker thread task runner with `thread_pool_size`
// threads.
NodePlatform::NodePlatform(int thread_pool_size,
                           TracingController* tracing_controller) {
  tracing_controller_ = tracing_controller
                            ? tracing_controller
                            : new TracingController();
  worker_thread_task_runner_ =
      std::make_shared<WorkerThreadsTaskRunner>(thread_pool_size);
}

// Creates the per-isolate platform data for `isolate`, bound to `loop`.
// Registering the same isolate twice trips the CHECK.
void NodePlatform::RegisterIsolate(Isolate* isolate, uv_loop_t* loop) {
  Mutex::ScopedLock lock(per_isolate_mutex_);
  std::shared_ptr<PerIsolatePlatformData>& existing = per_isolate_[isolate];
  CHECK(!existing);
  existing = std::make_shared<PerIsolatePlatformData>(isolate, loop);
}

// Shuts down the platform data registered for `isolate` and removes it from
// the map. The isolate must have been registered before.
void NodePlatform::UnregisterIsolate(Isolate* isolate) {
  Mutex::ScopedLock lock(per_isolate_mutex_);
  // Hold our own reference so the data outlives its removal from the map.
  const std::shared_ptr<PerIsolatePlatformData> existing =
      per_isolate_[isolate];
  CHECK(existing);
  existing->Shutdown();
  per_isolate_.erase(isolate);
}

// Arranges for `cb(data)` to be called once the platform has finished with
// `isolate`. If the isolate is not (or no longer) registered, the callback is
// invoked immediately; otherwise it is stored as a shutdown callback on the
// isolate's platform data.
void NodePlatform::AddIsolateFinishedCallback(Isolate* isolate,
                                              void (*cb)(void*), void* data) {
  Mutex::ScopedLock lock(per_isolate_mutex_);
  auto it = per_isolate_.find(isolate);
  if (it == per_isolate_.end()) {
    // Nothing is registered for this isolate. The end iterator must not be
    // dereferenced here.
    cb(data);
    return;
  }
  // A registered entry is expected to hold platform data.
  CHECK(it->second);
  it->second->AddShutdownCallback(cb, data);
}

// Shuts down the worker threads first, then drops every per-isolate entry
// while holding the per-isolate mutex.
void NodePlatform::Shutdown() {
  worker_thread_task_runner_->Shutdown();

  Mutex::ScopedLock lock(per_isolate_mutex_);
  per_isolate_.clear();
}

// Reports the thread count of the worker thread task runner.
int NodePlatform::NumberOfWorkerThreads() {
  const int worker_count = worker_thread_task_runner_->NumberOfWorkerThreads();
  return worker_count;
}

void PerIsolatePlatformData::RunForegroundTask(std::unique_ptr<Task> task) {
  DebugSealHandleScope scope(isolate_);
  Environment* env = Environment::GetCurrent(isolate_);
  if (env != nullptr) {
    v8::HandleScope scope(isolate_);
    InternalCallbackScope cb_scope(env, Object::New(isolate_), { 0, 0 },
                                   InternalCallbackScope::kNoFlags);
    task->Run();
  } else {
    task->Run();
  }
}

// Removes the entry owning `task` from scheduled_delayed_tasks_. The task must
// be present; erasing the entry runs that entry's deleter.
void PerIsolatePlatformData::DeleteFromScheduledTasks(DelayedTask* task) {
  auto it = scheduled_delayed_tasks_.begin();
  while (it != scheduled_delayed_tasks_.end() && it->get() != task)
    ++it;
  CHECK_NE(it, scheduled_delayed_tasks_.end());
  scheduled_delayed_tasks_.erase(it);
}

// Timer callback for a delayed foreground task: runs the task on its owning
// platform data and then removes it from the list of scheduled tasks.
void PerIsolatePlatformData::RunForegroundTask(uv_timer_t* handle) {
  // The timer is embedded in its DelayedTask, so the task can be recovered
  // from the handle address.
  DelayedTask* delayed = ContainerOf(&DelayedTask::timer, handle);
  auto& owner = delayed->platform_data;
  owner->RunForegroundTask(std::move(delayed->task));
  owner->DeleteFromScheduledTasks(delayed);
}

// Alternates between draining the worker threads and flushing the isolate's
// foreground tasks until a flush reports that it did no work.
void NodePlatform::DrainTasks(Isolate* isolate) {
  std::shared_ptr<PerIsolatePlatformData> per_isolate = ForIsolate(isolate);

  bool flushed_something = true;
  while (flushed_something) {
    // Worker tasks aren't associated with an Isolate.
    worker_thread_task_runner_->BlockingDrain();
    flushed_something = per_isolate->FlushForegroundTasksInternal();
  }
}

// Arms a timer for every newly posted delayed task and then runs all
// foreground tasks that were queued when the function was entered.
// Returns true if at least one delayed task was scheduled or one foreground
// task was run.
bool PerIsolatePlatformData::FlushForegroundTasksInternal() {
  bool did_work = false;

  while (std::unique_ptr<DelayedTask> delayed =
      foreground_delayed_tasks_.Pop()) {
    did_work = true;
    // Seconds -> milliseconds, rounded to the nearest integer.
    uint64_t delay_millis = llround(delayed->timeout * 1000);

    // The close callback below uses handle->data to find the DelayedTask.
    delayed->timer.data = static_cast<void*>(delayed.get());
    uv_timer_init(loop_, &delayed->timer);
    // Timers may not guarantee queue ordering of events with the same delay if
    // the delay is non-zero. This should not be a problem in practice.
    uv_timer_start(&delayed->timer, RunForegroundTask, delay_millis, 0);
    uv_unref(reinterpret_cast<uv_handle_t*>(&delayed->timer));
    // Balanced by DecreaseHandleCount() in the timer's close callback.
    uv_handle_count_++;

    // Ownership moves into scheduled_delayed_tasks_. Its deleter does not free
    // the task directly: it closes the embedded timer, and the DelayedTask is
    // destroyed in the close callback.
    scheduled_delayed_tasks_.emplace_back(delayed.release(),
                                          [](DelayedTask* delayed) {
      uv_close(reinterpret_cast<uv_handle_t*>(&delayed->timer),
               [](uv_handle_t* handle) {
        std::unique_ptr<DelayedTask> task {
            static_cast<DelayedTask*>(handle->data) };
        task->platform_data->DecreaseHandleCount();
      });
    });
  }
  // Move all foreground tasks into a separate queue and flush that queue.
  // This way tasks that are posted while flushing the queue will be run on the
  // next call of FlushForegroundTasksInternal.
  std::queue<std::unique_ptr<Task>> tasks = foreground_tasks_.PopAll();
  while (!tasks.empty()) {
    std::unique_ptr<Task> task = std::move(tasks.front());
    tasks.pop();
    did_work = true;
    RunForegroundTask(std::move(task));
  }
  return did_work;
}

void NodePlatform::CallOnWorkerThread(std::unique_ptr<Task> task) {
  worker_thread_task_runner_->PostTask(std::move(task));
}

void NodePlatform::CallDelayedOnWorkerThread(std::unique_ptr<Task> task,
                                             double delay_in_seconds) {
  worker_thread_task_runner_->PostDelayedTask(std::move(task),
                                              delay_in_seconds);
}


// Looks up the platform data registered for `isolate` under the per-isolate
// mutex. An isolate without platform data trips the CHECK.
std::shared_ptr<PerIsolatePlatformData>
NodePlatform::ForIsolate(Isolate* isolate) {
  Mutex::ScopedLock lock(per_isolate_mutex_);
  auto data = std::shared_ptr<PerIsolatePlatformData>(per_isolate_[isolate]);
  CHECK(data);
  return data;
}

// Flushes the foreground tasks of `isolate`; returns whether any work was
// done.
bool NodePlatform::FlushForegroundTasks(Isolate* isolate) {
  std::shared_ptr<PerIsolatePlatformData> per_isolate = ForIsolate(isolate);
  return per_isolate->FlushForegroundTasksInternal();
}

// Idle tasks are never enabled, regardless of the isolate.
bool NodePlatform::IdleTasksEnabled(Isolate* isolate) {
  return false;
}

// The per-isolate platform data doubles as the isolate's foreground task
// runner.
std::shared_ptr<v8::TaskRunner>
NodePlatform::GetForegroundTaskRunner(Isolate* isolate) {
  std::shared_ptr<v8::TaskRunner> runner = ForIsolate(isolate);
  return runner;
}

// Returns uv_hrtime(), which is in nanoseconds, converted to seconds.
double NodePlatform::MonotonicallyIncreasingTime() {
  constexpr double kNanosPerSecond = 1e9;
  return uv_hrtime() / kNanosPerSecond;
}

// Returns whatever SystemClockTimeMillis() reports.
double NodePlatform::CurrentClockTimeMillis() {
  const double now_millis = SystemClockTimeMillis();
  return now_millis;
}

// Returns the tracing controller chosen in the constructor (either the one
// passed in or the one allocated there); it must not be null.
TracingController* NodePlatform::GetTracingController() {
  CHECK_NOT_NULL(tracing_controller_);
  return tracing_controller_;
}

// Returns a callback that writes a blank line followed by a backtrace to
// stderr and then flushes it.
Platform::StackTracePrinter NodePlatform::GetStackTracePrinter() {
  Platform::StackTracePrinter printer = []() {
    fprintf(stderr, "\n");
    DumpBacktrace(stderr);
    fflush(stderr);
  };
  return printer;
}

template <class T>
TaskQueue<T>::TaskQueue()
    : lock_(), tasks_available_(), tasks_drained_(),
      outstanding_tasks_(0), stopped_(false), task_queue_() { }

// Appends `task`, counts it as outstanding and signals tasks_available_ for a
// thread blocked in BlockingPop().
template <class T>
void TaskQueue<T>::Push(std::unique_ptr<T> task) {
  Mutex::ScopedLock guard(lock_);
  task_queue_.push(std::move(task));
  ++outstanding_tasks_;
  tasks_available_.Signal(guard);
}

// Non-blocking pop: returns the oldest queued task, or an empty pointer when
// the queue is empty. The outstanding-task counter is left untouched.
template <class T>
std::unique_ptr<T> TaskQueue<T>::Pop() {
  Mutex::ScopedLock guard(lock_);
  std::unique_ptr<T> front;
  if (!task_queue_.empty()) {
    front = std::move(task_queue_.front());
    task_queue_.pop();
  }
  return front;
}

// Waits until a task is available or the queue has been stopped. Returns an
// empty pointer once stopped, even if tasks are still queued; otherwise
// returns the oldest task.
template <class T>
std::unique_ptr<T> TaskQueue<T>::BlockingPop() {
  Mutex::ScopedLock guard(lock_);
  for (;;) {
    if (stopped_)
      return std::unique_ptr<T>(nullptr);
    if (!task_queue_.empty())
      break;
    tasks_available_.Wait(guard);
  }
  std::unique_ptr<T> front = std::move(task_queue_.front());
  task_queue_.pop();
  return front;
}

// Marks one outstanding task as finished and wakes every BlockingDrain()
// waiter when the counter reaches zero.
template <class T>
void TaskQueue<T>::NotifyOfCompletion() {
  Mutex::ScopedLock guard(lock_);
  --outstanding_tasks_;
  if (outstanding_tasks_ != 0)
    return;
  tasks_drained_.Broadcast(guard);
}

// Blocks until no pushed task is outstanding any more.
template <class T>
void TaskQueue<T>::BlockingDrain() {
  Mutex::ScopedLock guard(lock_);
  for (;;) {
    if (!(outstanding_tasks_ > 0))
      break;
    tasks_drained_.Wait(guard);
  }
}

// Flags the queue as stopped and wakes every thread waiting in BlockingPop().
template <class T>
void TaskQueue<T>::Stop() {
  Mutex::ScopedLock guard(lock_);
  stopped_ = true;
  tasks_available_.Broadcast(guard);
}

// Atomically takes every queued task, leaving the queue empty. The
// outstanding-task counter is left untouched.
template <class T>
std::queue<std::unique_ptr<T>> TaskQueue<T>::PopAll() {
  Mutex::ScopedLock guard(lock_);
  std::queue<std::unique_ptr<T>> taken;
  task_queue_.swap(taken);
  return taken;
}

// Default implementation: intentionally does nothing.
void MultiIsolatePlatform::CancelPendingDelayedTasks(Isolate* isolate) {}

}  // namespace node