// rubinius/rubinius — machine/thread_state.hpp
#ifndef RBX_STATE_HPP
#define RBX_STATE_HPP

#include "missing/time.h"

#include "diagnostics.hpp"
#include "globals.hpp"
#include "type_info.hpp"
#include "unwind_info.hpp"

#include "memory/root.hpp"
#include "memory/root_buffer.hpp"
#include "memory/thca.hpp"
#include "memory/variable_buffer.hpp"

#include "sodium/randombytes.h"

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <regex>
#include <setjmp.h>
#include <stdint.h>
#include <string.h>
#include <string>
#include <thread>
#include <vector>

namespace rubinius {
  struct CallFrame;

  class Fiber;
  class NativeMethodEnvironment;
  class VariableScope;
  class C_API;
  class Channel;
  class Class;
  class Console;
  class Environment;
  class Exception;
  class Object;
  class Machine;
  class MachineState;
  class Memory;
  class Park;
  class Profiler;
  class Signals;
  class String;
  class Symbol;
  class Threads;
  class UnwindState;

  namespace memory {
    class Collector;
  }

  // Records why the most recent method lookup failed, so the subsequent
  // method_missing handling can report the correct error. Read/written
  // through ThreadState::method_missing_reason()/set_method_missing_reason().
  enum MethodMissingReason {
    eNone,        // no pending failure
    ePrivate,     // method exists but is private -- presumably mirrors Ruby visibility; confirm in dispatch code
    eProtected,   // method exists but is protected
    eSuper,       // lookup failed on a super call -- TODO confirm
    eVCall,       // bare-identifier (vcall) form -- TODO confirm
    eNormal       // ordinary missing-method dispatch
  };

  // Outcome of a constant lookup: found, found but private, or absent.
  // Stored per-thread (see ThreadState::constant_missing_reason()) so
  // const_missing handling can distinguish visibility failures from
  // genuinely undefined constants.
  enum ConstantMissingReason {
    vFound,
    vPrivate,
    vNonExistent
  };

  /**
   * Per-thread virtual machine state.
   *
   * One ThreadState exists for every Ruby Thread, every Fiber, and every
   * internal machine thread (see Kind). It owns:
   *   - the native stack bounds and the overflow barrier used to raise
   *     StackError before a real overflow occurs,
   *   - the chain of CallFrames (the Ruby call stack),
   *   - GC root buffers and the thread-local allocator (THCA),
   *   - wakeup/interrupt flags and fiber transition state,
   *   - the checkpoint protocol used to rendezvous all threads for
   *     stop-the-world operations (see checkpoint(), stop(), halt()).
   */
  class ThreadState {
  public:
    // Nanoseconds to spin on the thread-nexus lock before deadlock
    // detection kicks in (~5 seconds; see detect_deadlock()).
    const static uint64_t cLockLimit = 5000000000;

    // What this state represents: a Ruby Thread, a Fiber, or an internal
    // machine ("system") thread.
    enum Kind {
      eThread,
      eFiber,
      eSystem
    };

    // Relationship of the thread to the managed runtime. Values carrying
    // the cYieldingPhase bit (eUnmanaged, eWaiting) count as already
    // yielded for stop-the-world purposes.
    enum Phase {
      eManaged    = 0x01,
      eUnmanaged  = 0x81,
      eWaiting    = 0x82,
    };

    // Mask for the "yielding" bit in Phase values above.
    const static int cYieldingPhase = 0x80;

    // Coarse thread lifecycle status (see thread_status()).
    enum ThreadStatus {
      eNoStatus = 0,
      eRun,
      eSleep,
      eDead,
      eException
    };

    // TODO: Thread
    // States a fiber moves through while being suspended/resumed.
    enum FiberTransition {
      eSuspending,
      eSuspended,
      eResuming,
      eRunning,
      eCanceled,
      eFinished
    };

  private:
    // Jump buffer for unwinding the entire thread (see SET_THREAD_UNWIND).
    jmp_buf thread_unwind_;
    bool thread_unwinding_;
    memory::Roots roots_;
    std::string name_;
    memory::VariableRootBuffers variable_root_buffers_;
    memory::RootBuffers root_buffers_;
    Kind kind_;
    diagnostics::MachineMetrics* metrics_;

  protected:
    // NOTE(review): a redundant self-friend declaration
    // ("friend class ThreadState;") was removed here; a class always has
    // access to its own members.
    pthread_t os_thread_;
    uint32_t id_;

  private:
    // Iterations of limited_wait_for()'s 10us sleep loop before giving up.
    static const int cWaitLimit = 100;

    Machine* _machine_;

    UnwindInfoSet unwinds_;

    // Top of the Ruby call stack for this thread.
    CallFrame* call_frame_;
    // Thread-local allocation cache for new objects (see allocate_object()).
    memory::THCA* thca_;

    // Native stack bounds. The [stack_barrier_start_, stack_barrier_end_]
    // range is a guard zone near the end of the usable stack; hitting it
    // raises StackError before the OS stack actually overflows.
    int8_t* stack_start_;
    int8_t* stack_barrier_start_;
    int8_t* stack_barrier_end_;

    size_t stack_size_;
    size_t stack_cushion_;
    // Signed probe offset added to a candidate address in stack_limit_p();
    // its sign encodes the detected stack growth direction.
    ssize_t stack_probe_;

    bool interrupt_with_signal_;
    bool interrupt_by_kill_;

    std::atomic<bool> should_wakeup_;
    std::atomic<ThreadStatus> thread_status_;

    std::mutex lock_;
    std::mutex thread_lock_;
    std::mutex sleep_lock_;
    std::condition_variable sleep_cond_;
    std::mutex join_lock_;
    std::condition_variable join_cond_;
    std::mutex fiber_mutex_;

    std::mutex fiber_wait_mutex_;
    std::condition_variable fiber_wait_condition_;

    std::atomic<FiberTransition> fiber_transition_flag_;

    MethodMissingReason method_missing_reason_;
    ConstantMissingReason constant_missing_reason_;

    bool main_thread_;

    std::atomic<Phase> thread_phase_;

    // Profiler sampling: a sample is taken roughly every sample_interval_
    // checkpoints (see checkpoint() and set_sample_interval()).
    uint64_t sample_interval_;
    uint64_t sample_counter_;

    // Set (via set_checkpoint()) to request that this thread run its
    // checkpoint logic at the next opportunity.
    bool checkpoint_;

    diagnostics::metric checkpoints_;
    diagnostics::metric stops_;

  public:
    /* Data members */
    Channel* waiting_channel_;
    Exception* interrupted_exception_;
    /// The Thread object for this VM state
    Thread* thread_;
    Fiber* fiber_;

    /// Object that waits for inflation
    Object* waiting_object_;

    uint64_t start_time_;

    NativeMethodEnvironment* native_method_environment;

    // Optional callback + payload used to wake this thread from a custom
    // wait (see wait_on_custom_function()).
    void (*custom_wakeup_)(void*);
    void* custom_wakeup_data_;

    UnwindState* unwind_state_;

  public:
    ThreadState(uint32_t id, Machine* m, const char* name = nullptr);
    ~ThreadState();

    MachineState* const machine_state();

    Machine* const machine() {
      return _machine_;
    }

    jmp_buf& get_thread_unwind() {
      return thread_unwind_;
    }

    bool thread_unwinding_p() {
      return thread_unwinding_;
    }

    // Accessors for machine-global subsystems (defined out of line).
    Configuration* const configuration();
    Environment* const environment();
    Threads* const threads();
    Diagnostics* const diagnostics();
    memory::Collector* const collector();
    Signals* const signals();
    Memory* const memory();
    C_API* const c_api();
    Profiler* const profiler();
    Console* const console();

    // -*-*-*
    // The ThreadState of the calling OS thread.
    static ThreadState* current();

    memory::Roots& roots() {
      return roots_;
    }

    memory::VariableRootBuffers& variable_root_buffers() {
      return variable_root_buffers_;
    }

    memory::RootBuffers& root_buffers() {
      return root_buffers_;
    }

    // Human-readable name for this state's Kind, used in diagnostics.
    const char* kind_name() const {
      switch(kind_) {
        case eThread:
          return "Thread";
        case eFiber:
          return "Fiber";
        case eSystem:
          return "MachineThread";
      }

      /* GCC cannot determine that the above switch covers the enum and hence
       * every exit from this function is covered.
       */
      return "unknown kind";
    }

    Kind kind() const {
      return kind_;
    }

    void set_kind(Kind kind) {
      kind_ = kind;
    }

    std::string name() const {
      return name_;
    }

    void set_name(STATE, const char* name);

    pthread_t& os_thread() {
      return os_thread_;
    }

    diagnostics::MachineMetrics* metrics() {
      return metrics_;
    }

    UnwindInfoSet& unwinds() {
      return unwinds_;
    }

    uint32_t thread_id() const {
      return id_;
    }

    const char* phase_name();

    Phase thread_phase() {
      return thread_phase_.load(std::memory_order_acquire);
    }

    void set_thread_phase(Phase thread_phase) {
      thread_phase_.store(thread_phase, std::memory_order_release);
    }

    bool wakeup_p() {
      return should_wakeup_;
    }

    void unset_wakeup() {
      should_wakeup_ = false;
    }

    void set_wakeup() {
      should_wakeup_ = true;
    }

    std::mutex& thread_lock() {
      return thread_lock_;
    }

    std::mutex& sleep_lock() {
      return sleep_lock_;
    }

    std::condition_variable& sleep_cond() {
      return sleep_cond_;
    }

    std::mutex& join_lock() {
      return join_lock_;
    }

    std::condition_variable& join_cond() {
      return join_cond_;
    }

    std::mutex& fiber_mutex() {
      return fiber_mutex_;
    }

    std::mutex& fiber_wait_mutex() {
      return fiber_wait_mutex_;
    }

    std::condition_variable& fiber_wait_condition() {
      return fiber_wait_condition_;
    }

    FiberTransition fiber_transition_flag() {
      return fiber_transition_flag_;
    }

    // Predicates and setters for the fiber transition state machine.
    bool suspending_p() const {
      return fiber_transition_flag_ == eSuspending;
    }

    bool suspended_p() const {
      return fiber_transition_flag_ == eSuspended;
    }

    bool resuming_p() const {
      return fiber_transition_flag_ == eResuming;
    }

    bool running_p() const {
      return fiber_transition_flag_ == eRunning;
    }

    bool canceled_p() const {
      return fiber_transition_flag_ == eCanceled;
    }

    bool finished_p() const {
      return fiber_transition_flag_ == eFinished;
    }

    void set_suspending() {
      fiber_transition_flag_ = eSuspending;
    }

    void set_suspended() {
      fiber_transition_flag_ = eSuspended;
    }

    void set_resuming() {
      fiber_transition_flag_ = eResuming;
    }

    void set_running() {
      fiber_transition_flag_ = eRunning;
    }

    void set_canceled() {
      fiber_transition_flag_ = eCanceled;
    }

    void set_finished() {
      fiber_transition_flag_ = eFinished;
    }

    void set_thread(Thread* thread);
    void set_fiber(Fiber* fiber);

    Thread* thread() {
      return thread_;
    }

    Fiber* fiber() {
      return fiber_;
    }

    ThreadStatus thread_status() {
      return thread_status_;
    }

    void set_thread_run() {
      thread_status_ = eRun;
    }

    void set_thread_sleep() {
      thread_status_ = eSleep;
    }

    void set_thread_dead() {
      thread_status_ = eDead;
    }

    void set_thread_exception() {
      thread_status_ = eException;
    }

    bool sleeping_p() {
      return thread_status_ == eSleep;
    }

    // A thread that is dead or died with an exception is a zombie.
    bool zombie_p() {
      return thread_status_ == eDead || thread_status_ == eException;
    }

    void set_main_thread() {
      main_thread_ = true;
    }

    bool main_thread_p() {
      return main_thread_;
    }

    // Allocate a new object of `type` spanning `bytes` from the
    // thread-local allocation cache.
    Object* allocate_object(STATE, intptr_t bytes, object_type type) {
      return thca_->allocate(state, bytes, type);
    }

    // Poll predicate `f` up to cWaitLimit times, sleeping 10us between
    // attempts. Returns the last value of f() (true if it ever succeeded).
    bool limited_wait_for(std::function<bool ()> f) {
      bool status = false;

      // TODO: randomize wait interval
      for(int i = 0; i < cWaitLimit && !(status = f()); i++) {
        std::this_thread::sleep_for(std::chrono::microseconds(10));
      }

      return status;
    }

    void set_start_time();
    double run_time();

    void raise_stack_error(STATE);

    void validate_stack_size(STATE, size_t size);

    size_t stack_size() {
      return stack_size_;
    }

    // Record the current native stack position as the stack origin and
    // install the overflow barrier. Must be called near the top of the
    // thread's stack, since a local's address is used as the origin.
    void set_stack_bounds(size_t size) {
      int8_t stack_address;

      stack_size_ = size - stack_cushion_;
      stack_start_ = &stack_address;

      // Determine the direction of the stack
      set_stack_barrier();
    }

    // Place the guard zone near the far end of the usable stack. The
    // comparison against a fresh local's address detects whether the
    // native stack grows toward higher or lower addresses; stack_probe_'s
    // sign records that direction for stack_limit_p().
    void set_stack_barrier() {
      int8_t barrier;

      if(stack_start_ - &barrier < 0) {
        // barrier = reinterpret_cast<int8_t*>(stack_start_ + stack_size_ - 2 * stack_cushion_);
        stack_probe_ = stack_cushion_ / 2;
        stack_barrier_start_ = stack_start_ + stack_size_ - 2 * stack_cushion_;
        stack_barrier_end_ = stack_barrier_start_ + stack_cushion_;
      } else {
        // barrier = reinterpret_cast<void*>(ss - stack_size_ + stack_cushion_);
        stack_probe_ = -(stack_cushion_ / 2);
        stack_barrier_end_ = stack_start_ - stack_size_ + stack_cushion_;
        stack_barrier_start_ = stack_barrier_end_ - stack_cushion_;
      }
    }

    // True when `address` (offset by the direction-aware probe) falls in
    // the guard zone, i.e. the stack is about to overflow.
    bool stack_limit_p(void* address) {
      int8_t* probe = reinterpret_cast<int8_t*>(address) + stack_probe_;

      return probe > stack_barrier_start_ && probe <= stack_barrier_end_;
    }

    // Raise StackError and return false if `address` is inside the guard
    // zone; otherwise return true.
    bool check_stack(STATE, void* address) {
      if(stack_limit_p(address)) {
        raise_stack_error(state);
        return false;
      }

      return true;
    }

    void set_previous_frame(CallFrame* frame);

    // Push `frame` onto the call stack, raising StackError (and returning
    // false) if doing so would enter the stack guard zone.
    bool push_call_frame(STATE, CallFrame* frame) {
      if(!check_stack(state, frame)) return false;

      set_previous_frame(frame);
      call_frame_ = frame;

      return true;
    }

    // Restore `frame` as the top frame; returns false when a pending
    // Thread#raise/#kill interrupt should abort normal return flow.
    bool pop_call_frame(STATE, CallFrame* frame) {
      call_frame_ = frame;

      return !thread_interrupted_p();
    }

    bool thread_interrupted_p() {
      return check_thread_raise_or_kill(this);
    }

    bool check_thread_raise_or_kill(STATE);

    // Do NOT de-duplicate
    void set_call_frame(CallFrame* frame) {
      call_frame_ = frame;
    }

    CallFrame* call_frame() {
      return call_frame_;
    }

    // Walk `up` frames from the top of the stack, filtered by frame kind.
    CallFrame* get_call_frame(ssize_t up=0);
    CallFrame* get_ruby_frame(ssize_t up=0);
    CallFrame* get_variables_frame(ssize_t up=0);
    CallFrame* get_scope_frame(ssize_t up=0);
    CallFrame* get_noncore_frame(STATE);
    CallFrame* get_filtered_frame(STATE, const std::regex& filter);

    bool scope_valid_p(VariableScope* scope);

    Globals& globals();

    MethodMissingReason method_missing_reason() const {
      return method_missing_reason_;
    }

    void set_method_missing_reason(MethodMissingReason reason) {
      method_missing_reason_ = reason;
    }

    ConstantMissingReason constant_missing_reason() const {
      return constant_missing_reason_;
    }

    void set_constant_missing_reason(ConstantMissingReason reason) {
      constant_missing_reason_ = reason;
    }

    void after_fork_child();

    bool interrupt_by_kill() const {
      return interrupt_by_kill_;
    }

    void clear_interrupt_by_kill() {
      interrupt_by_kill_ = false;
    }

    void set_interrupt_by_kill() {
      interrupt_by_kill_ = true;
    }

    Exception* interrupted_exception() const {
      return interrupted_exception_;
    }

    void clear_interrupted_exception();

    memory::VariableRootBuffers& current_root_buffers();

  public:
    void discard();

    void bootstrap_class(STATE);
    void bootstrap_ontology(STATE);
    void bootstrap_symbol(STATE);

    void sample(STATE);

#define RBX_PROFILE_MAX_SHIFT     0xf
#define RBX_PROFILE_MAX_INTERVAL  0x1fff

    // Pick a new randomized sampling interval (bounded by
    // RBX_PROFILE_MAX_INTERVAL) and reset the counter.
    void set_sample_interval() {
      sample_interval_ = randombytes_random();
      sample_interval_ >>= (sample_interval_ & RBX_PROFILE_MAX_SHIFT);
      sample_interval_ &= RBX_PROFILE_MAX_INTERVAL;
      sample_counter_ = 0;
    }

    void set_checkpoint() {
      checkpoint_ = true;
    }

    // Rendezvous point: when a checkpoint was requested, yield to any
    // pending stop-the-world and take a profiler sample when due.
    void checkpoint(STATE) {
      if(!checkpoint_) return;

      ++checkpoints_;

      if(check_stop()) {
        ++stops_;
        checkpoint_ = false;
      }

      // TODO: profiler
      if(sample_counter_++ >= sample_interval_) {
        sample(state);
        set_sample_interval();
      }
    }

#define SET_THREAD_UNWIND(ts) /*ep.file = __FILE__; ep.line = __LINE__; */ _setjmp(ts->get_thread_unwind())

    void halt_thread();

    void managed_phase();
    void unmanaged_phase();

    void set_current_thread();

    void setup_errno(STATE, int num, const char* name, Class* sce, Module* ern);
    void bootstrap_exceptions(STATE);
    void initialize_fundamental_constants(STATE);
    void initialize_builtin_classes(STATE);
    void initialize_platform_data(STATE);
    Object* ruby_lib_version();

    static void init_ffi(STATE);

    void raise_from_errno(const char* reason);
    void raise_exception(Exception* exc);
    Exception* new_exception(Class* cls, const char* msg);
    Object* current_block();

    Object* path2class(STATE, const char* name);

    void wait_on_channel(Channel* channel);
    void wait_on_custom_function(STATE, void (*func)(void*), void* data);
    void clear_waiter();

    void sleep(Object* duration);
    bool wakeup();

    void interrupt_with_signal() {
      interrupt_with_signal_ = true;
    }

    void register_raise(STATE, Exception* exc);
    void register_kill(STATE);

    void visit_objects(STATE, std::function<void (STATE, Object**)> f);
    void trace_objects(STATE, std::function<void (STATE, Object**)> f);

    // -*-*-*

    Symbol* const symbol(const char* str);
    Symbol* const symbol(const char* str, size_t len);
    Symbol* const symbol(std::string str);
    Symbol* const symbol(String* str);

    const uint32_t hash_seed();

    UnwindState* unwind_state();

    void raise_stack_error();

    // TODO ThreadNexus
    bool stop_p();
    bool set_stop();
    void unset_stop();
    bool halt_p();
    void set_halt();

    bool try_managed_phase();
    void waiting_phase();

    void set_managed();

    bool valid_thread_p(unsigned int thread_id);

#ifdef RBX_GC_STACK_CHECK
    void check_stack(STATE);
#endif

    bool yielding_p();

    void yield();

    uint64_t wait();
    void wait_for_all();

    bool lock_owned_p();

    bool try_lock();
    bool try_lock_wait();

    // Yield to a pending stop request, if any. Returns true when the
    // thread actually yielded. (The original wrapped the body in a
    // `while` that could run at most once; an `if` is equivalent.)
    bool check_stop() {
      if(!can_stop_p()) return false;

      if(stop_p()) {
        yield();

        return true;
      }

      return false;
    }

    bool can_stop_p();

    // Request a global stop and wait until all other threads have yielded.
    void stop() {
      while(set_stop()) {
        if(try_lock_wait()) {
          wait_for_all();

          return;
        }
      }
    }

    // Stop the world permanently for machine halt.
    void halt() {
      set_halt();
      stop();
      unset_stop();
      unlock();
    }

    // Run `process` while holding the thread-nexus lock.
    void lock(std::function<void ()> process) {
      lock();
      process();
      unlock();
    }

    void lock() {
      try_lock_wait();
    }

    // Run `process` only if the lock can be acquired without waiting;
    // returns whether it ran.
    bool try_lock(std::function<void ()> process) {
      if(try_lock()) {
        process();
        unlock();
        return true;
      } else {
        return false;
      }
    }

    void unlock();

    void detect_deadlock(uint64_t nanoseconds);
    void detect_deadlock(uint64_t nanoseconds, ThreadState* thread_state);

    // TODO ThreadNexus end
  };
}

#endif