zold-io/zold

View on GitHub
lib/zold/remotes.rb

Summary

Maintainability
C
7 hrs
Test Coverage
# frozen_string_literal: true

# Copyright (c) 2018-2023 Zerocracy
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the 'Software'), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

require 'concurrent'
require 'csv'
require 'uri'
require 'net/http'
require 'time'
require 'futex'
require 'fileutils'
require 'backtrace'
require 'zold/score'
require_relative 'age'
require_relative 'http'
require_relative 'hands'
require_relative 'thread_pool'
require_relative 'node/farm'
require_relative 'commands/fetch'

# The list of remotes.
# Author:: Yegor Bugayenko (yegor256@gmail.com)
# Copyright:: Copyright (c) 2018 Yegor Bugayenko
# License:: MIT
module Zold
  # One remote.
  #
  # Keeps the address of a node together with the score and the network
  # name that are handed to every Http client created through #http, and
  # offers a set of assert_* checks that raise CantAssert on failure.
  class RemoteNode
    # Raised by the assert_* methods when a check doesn't pass
    class CantAssert < StandardError; end

    attr_reader :touched, :idx

    def initialize(host:, port:, score:, idx:, master:, network: 'test', log: Log::NULL)
      @touched = false
      @host = host
      @port = port
      @idx = idx
      @master = master
      @score = score
      @network = network
      @log = log
    end

    # Builds an Http client for the given path at this node and marks
    # the node as touched.
    def http(path = '/')
      @touched = true
      address = "http://#{@host}:#{@port}#{path}"
      Http.new(uri: address, score: @score, network: @network)
    end

    # The flag given to the constructor as +master+.
    def master?
      @master
    end

    # Host, port and the index of the node: "host:port/idx".
    def to_s
      "#{to_mnemo}/#{@idx}"
    end

    # Host and port only: "host:port".
    def to_mnemo
      "#{@host}:#{@port}"
    end

    # Raises CantAssert unless the status of the response, converted to
    # an Integer, equals the expected code. The X-Zold-Error header, when
    # present, takes priority over the status line in the message.
    def assert_code(code, response)
      line = response.status_line.strip
      status = response.status
      return if status.to_i == code
      headers = response.headers
      problem = headers && headers['X-Zold-Error']
      raise CantAssert, "Error ##{status} \"#{problem}\" at #{headers['X-Zold-Path']}" if problem
      text =
        if line.empty?
          "Unexpected HTTP code #{status}, instead of #{code}"
        else
          "#{line} (HTTP code #{status}, instead of #{code})"
        end
      raise CantAssert, text
    end

    # Raises CantAssert if the score is not valid or is expired.
    def assert_valid_score(score)
      raise CantAssert, "Invalid score #{score.reduced(4)}" unless score.valid?
      return unless score.expired?
      raise CantAssert, "Expired score (#{Age.new(score.time)}) #{score.reduced(4)}"
    end

    # Raises CantAssert if the score names another host or another port
    # than this node has.
    def assert_score_ownership(score)
      unless @host == score.host
        raise CantAssert, "Masqueraded host #{@host} as #{score.host}: #{score.reduced(4)}"
      end
      return if @port == score.port
      raise CantAssert, "Masqueraded port #{@port} as #{score.port}: #{score.reduced(4)}"
    end

    # Raises CantAssert if the strength of the score is below Score::STRENGTH.
    def assert_score_strength(score)
      strength = score.strength
      return if strength >= Score::STRENGTH
      raise CantAssert, "Score #{strength} is too weak (<#{Score::STRENGTH}): #{score.reduced(4)}
(use --ignore-score-strength to suppress this check)"
    end

    # Raises CantAssert if the value of the score is below +min+.
    def assert_score_value(score, min)
      value = score.value
      return if value >= min
      raise CantAssert, "Score #{value} is too small (<#{min}): #{score.reduced(4)}
(use --ignore-score-size to suppress this check)"
    end
  end

  # All remotes
  class Remotes
    # TCP port used when a caller doesn't name one explicitly.
    PORT = 4096

    # A remote that fails while already having more errors than this
    # gets removed from the list.
    TOLERANCE = 8

    # Default number of nodes to fetch.
    MAX_NODES = 16

    # Host and port of every master node, read once, when the class is
    # loaded, from the CSV file "resources/masters" found two directories
    # above this file.
    MASTERS = CSV.read(
      File.expand_path(File.join(File.dirname(__FILE__), '../../resources/masters'))
    ).map { |row| { host: row[0].strip, port: row[1].to_i } }.freeze
    private_constant :MASTERS

    # Empty, for standalone mode
    #
    # Holds no nodes and keeps no state. It answers #all, #master?,
    # #iterate and #mtime with fixed results.
    class Empty
      def initialize
        # Nothing to init here
      end

      # @return [Array] always empty, since there are no remotes
      def all
        []
      end

      # Every address is reported as a master.
      #
      # @return [Boolean] always true
      def master?(_host, _port)
        true
      end

      # Never yields, because there is nothing to go through. The keyword
      # options that Remotes#iterate takes (farm:, threads:) are accepted
      # and ignored, so that a call written for Remotes works here too.
      #
      # @return [nil]
      def iterate(_log, **_opts)
        # Nothing to do here
      end

      # @return [Time] the current time
      def mtime
        Time.now
      end
    end

    # @param file path of the file where the list of remotes is kept
    # @param network name of the network, given to every RemoteNode
    #   built in #iterate
    # @param timeout number of seconds a block may spend on one remote
    #   in #iterate before this counts as a failure
    def initialize(file:, network: 'test', timeout: 60)
      @timeout = timeout
      @network = network
      @file = file
    end

    # Lists all remotes, the most promising ones first.
    #
    # Each remote is weighted: up to 5 points for having few errors plus
    # up to 1 point for a high score, both measured against the largest
    # value found in the list (or against 1 when that value is zero).
    #
    # @return [Array<Hash>] the remotes as loaded from the file, ordered
    #   by weight, heaviest first
    def all
      list = Futex.new(@file).open(false) { load }
      max_score = list.map { |r| r[:score] }.max.to_i
      max_score = 1 if max_score.zero?
      max_errors = list.map { |r| r[:errors] }.max.to_i
      max_errors = 1 if max_errors.zero?
      list.sort_by do |r|
        # fdiv keeps the fractions; Integer#/ would truncate both ratios
        # to 0 or 1 and make almost all remotes tie
        ((1 - r[:errors].fdiv(max_errors)) * 5) + r[:score].fdiv(max_score)
      end.reverse
    end

    # Drops all remotes: whatever the file holds is replaced with
    # an empty list.
    def clean
      modify { |_existing| [] }
    end

    # Adds the master nodes to the list. When a block is given, it gets
    # the host and the port of each master and only those for which it
    # returns a truthy value are added.
    def masters
      MASTERS.each do |m|
        host, port = m.values_at(:host, :port)
        add(host, port) if !block_given? || yield(host, port)
      end
    end

    # Tells whether a remote with this host (compared in lower case)
    # and this port is in the list.
    #
    # @return [Boolean]
    def exists?(host, port = PORT)
      assert_host_info(host, port)
      known = Futex.new(@file).open(false) { load }
      known.any? { |r| r[:host] == host.downcase && r[:port] == port }
    end

    # Appends a remote with zero score and zero errors to the list, the
    # host being stored in lower case, and then calls #unerror for it.
    def add(host, port = PORT)
      assert_host_info(host, port)
      fresh = { host: host.downcase, port: port, score: 0, errors: 0 }
      modify { |list| list + [fresh] }
      unerror(host, port)
    end

    # Deletes from the list every remote that has this host (compared
    # in lower case) and this port.
    def remove(host, port = PORT)
      assert_host_info(host, port)
      modify do |list|
        list.select { |r| r[:host] != host.downcase || r[:port] != port }
      end
    end

    # Go through the list of remotes and call a provided block for each
    # of them. See how it's used, for example, in fetch.rb.
    #
    # Every remote from #all is wrapped into a RemoteNode, which carries
    # the first score of farm.best (or Score::ZERO when there is none),
    # and is passed to the block. If the block finishes and has touched
    # the node, one error is taken off the remote. If the block raises a
    # StandardError, or runs longer than the timeout, an error is added
    # to the remote and the failure is logged; a remote that was listed
    # with more than TOLERANCE errors is then removed.
    def iterate(log, farm: Farm::Empty.new, threads: 1)
      raise "Log can't be nil" if log.nil?
      raise "Farm can't be nil" if farm.nil?
      Hands.exec(threads, all) do |remote, pos|
        host = remote[:host]
        port = remote[:port]
        Thread.current.name = "remotes-#{pos}@#{host}:#{port}"
        began = Time.now
        top = farm.best[0]
        node = RemoteNode.new(
          host: host,
          port: port,
          score: top.nil? ? Score::ZERO : top,
          idx: pos,
          master: master?(host, port),
          log: log,
          network: @network
        )
        begin
          yield node
          raise 'Took too long to execute' if (Time.now - began).round > @timeout
          unerror(host, port) if node.touched
        rescue StandardError => e
          error(host, port)
          failure = "#{Rainbow(node).red}: \"#{e.message.strip}\" in #{Age.new(began)}"
          case e
          when RemoteNode::CantAssert, Fetch::Error
            # Expected kinds of failure stay at the debug level
            log.debug(failure)
          else
            log.info(failure)
            log.debug(Backtrace.new(e).to_s)
          end
          # The count checked here is the one loaded by #all, before
          # the error above was recorded
          remove(host, port) if remote[:errors] > TOLERANCE
        end
      end
    end

    # Adds one to the error counter of the remote, if it is in the list.
    def error(host, port = PORT)
      assert_host_info(host, port)
      if_present(host, port) do |remote|
        remote[:errors] = remote[:errors] + 1
      end
    end

    # Takes one off the error counter of the remote, if it is in the
    # list. A counter that is not positive stays as it is.
    def unerror(host, port = PORT)
      assert_host_info(host, port)
      if_present(host, port) do |r|
        count = r[:errors]
        r[:errors] = count - 1 if count.positive?
      end
    end

    # Stores a new score for the remote, if it is in the list, and then
    # calls #unerror for it.
    #
    # @raise [RuntimeError] if the score is nil or is not an Integer
    def rescore(host, port, score)
      assert_host_info(host, port)
      raise "Score can't be nil" if score.nil?
      raise 'Score has to be of type Integer' unless score.is_a?(Integer)
      if_present(host, port) do |remote|
        remote[:score] = score
      end
      unerror(host, port)
    end

    # @return [Time] modification time of the file with remotes, or the
    #   current time if the file doesn't exist
    def mtime
      return Time.now unless File.exist?(@file)
      File.mtime(@file)
    end

    # Tells whether this host and port are among the MASTERS. The host
    # is compared as given.
    #
    # @return [Boolean]
    def master?(host, port)
      MASTERS.any? { |m| m[:host] == host && m[:port] == port }
    end

    private

    # Loads the list, passes it to the block and writes what the block
    # returns back to the file, all inside a Futex on that file. The
    # directory of the file is created first. Of several entries with
    # the same host and port only the first is written. Each entry is
    # one line: host, port, score and errors separated by commas.
    def modify
      FileUtils.mkdir_p(File.dirname(@file))
      Futex.new(@file).open do
        updated = yield(load)
        rows = updated.uniq { |r| "#{r[:host]}:#{r[:port]}" }.map do |r|
          r.values_at(:host, :port, :score, :errors).join(',')
        end
        File.write(@file, rows.join("\n"))
      end
    end

    # Finds the remote with this host (compared in lower case) and this
    # port, yields it to the block and then has the whole list written
    # back. When there is no such remote, nothing is yielded and nothing
    # is written.
    def if_present(host, port)
      modify do |list|
        found = list.find { |r| r[:host] == host.downcase && r[:port] == port }
        # Leaves the method right from inside the block, which also
        # skips the write that #modify does after the block
        return if found.nil?
        yield found
        list
      end
    end

    # Reads the remotes from the CSV file, one per line: host, port,
    # score and errors. Lines without a host, with a zero port or with
    # the host "0.0.0.0" are left out. Every remote also gets a :master
    # flag and a :home URI.
    #
    # @return [Array<Hash>] empty if the file doesn't exist
    def load
      return [] unless File.exist?(@file)
      parsed = CSV.read(@file).map do |host, port, score, errors|
        {
          host: host,
          port: port.to_i,
          score: score.to_i,
          errors: errors.to_i,
          master: master?(host, port.to_i)
        }
      end
      usable = parsed.select { |r| r[:host] && !r[:port].zero? && r[:host] != '0.0.0.0' }
      usable.each { |r| r[:home] = URI("http://#{r[:host]}:#{r[:port]}/") }
    end

    # Checks the address of a remote and raises on the first rule it
    # breaks.
    #
    # @param host the host; must not be nil, empty or "0.0.0.0"
    # @param port [Integer] the port; must be between 1 and 65535
    # @raise [RuntimeError] with a message that names the broken rule
    def assert_host_info(host, port)
      raise 'Host can\'t be nil' if host.nil?
      raise 'Host can\'t be empty' if host.empty?
      raise 'Host IP is wrong, can\'t be all zeros' if host == '0.0.0.0'
      raise 'Port can\'t be nil' if port.nil?
      raise 'Port has to be of type Integer' unless port.is_a?(Integer)
      raise 'Port can\'t be zero' if port.zero?
      raise 'Port can\'t be negative' if port.negative?
      # 0xffff is 65535, the largest port accepted, so the message has
      # to name 65535 too
      raise 'Port can\'t be over 65535' if port > 0xffff
    end
  end
end