openaustralia/morph

View on GitHub
app/lib/morph/runner.rb

Summary

Maintainability
D
1 day
Test Coverage
# typed: strict
# frozen_string_literal: true

module Morph
  # High level API for running a morph scraper. Handles setting up default
  # configuration when files such as a Gemfile (for Ruby) are not included
  class Runner
    extend T::Sig

    # Presumably supplies the sync_new / sync_update helpers called by the
    # methods below (they are not defined in this class) -- TODO confirm
    include RenderSync::Actions
    # The Run record that this runner operates on
    sig { returns(Run) }
    attr_accessor :run

    # Creates a runner for the given run. Nothing is started here; use
    # synch_and_go!, go or one of the *_with_logging variants to do the work.
    sig { params(run: Run).void }
    def initialize(run)
      @run = run
    end

    # Default for the max_lines argument of go, go_with_logging and
    # compile_and_start_run, which hand the value on to Morph::DockerRunner.
    # TODO: Move this to a configuration somewhere
    sig { returns(Integer) }
    def self.default_max_lines
      10_000
    end

    # Total number of run slots. This is simply the value of
    # SiteSetting.maximum_concurrent_scrapers; see available_slots for how
    # it is combined with used_slots.
    sig { returns(Integer) }
    def self.total_slots
      SiteSetting.maximum_concurrent_scrapers
    end

    # This includes stopped containers too
    sig { returns(Integer) }
    def self.used_slots
      # A slot is counted for every container that carries the run label
      labelled_containers = Morph::DockerUtils.find_all_containers_with_label(run_label_key)
      labelled_containers.count
    end

    sig { returns(Integer) }
    def self.available_slots
      # Plain difference with no clamping, so the result is negative when
      # more slots are in use than the configured total
      capacity = total_slots
      in_use = used_slots
      capacity - in_use
    end

    # The main part of running a scraper. This is what gets run in the background
    sig { params(block: T.nilable(T.proc.params(timestamp: T.nilable(Time), stream: Symbol, text: String).void)).void }
    def synch_and_go_with_logging!(&block)
      # Same as synch_and_go! except that every line it yields is also
      # recorded through log (which in turn passes it on to the given block)
      synch_and_go! { |timestamp, stream, text| log(timestamp, stream, text, &block) }
    end

    sig { params(max_lines: Integer, block: T.nilable(T.proc.params(timestamp: T.nilable(Time), stream: Symbol, text: String).void)).void }
    def go_with_logging(max_lines = Runner.default_max_lines, &block)
      # Same as go except that every line it yields is also recorded
      # through log (which in turn passes it on to the given block)
      go(max_lines) { |timestamp, stream, text| log(timestamp, stream, text, &block) }
    end

    sig { params(timestamp: T.nilable(Time), stream: Symbol, text: String, block: T.nilable(T.proc.params(timestamp: T.nilable(Time), stream: Symbol, text: String).void)).void }
    def log(timestamp, stream, text, &block)
      # Stores one line of scraper output against the run, pushes it out via
      # sync_new (except in the test environment) and finally hands the
      # original, untruncated text to the block if one was given.
      Rails.logger.info "#{stream}: #{text}" if Rails.env.development?

      # Truncate text so that it fits in the database
      # Note that mysql TEXT is limited to 65535 bytes so we have to be
      # particularly careful with unicode.
      stored_text = text.mb_chars.limit(65535).to_s
      # Not using create on association to try to avoid memory bloat
      line = LogLine.create!(run: run, timestamp: timestamp, stream: stream.to_s, text: stored_text)
      sync_new line, scope: run unless Rails.env.test?
      block&.call(timestamp, stream, text)
    end

    sig { params(text: String, status_code: Integer, block: T.nilable(T.proc.params(timestamp: T.nilable(Time), stream: Symbol, text: String).void)).void }
    def error(text:, status_code:, &block)
      # Reports an internal failure: the message goes to the block (if any)
      # on the :internalerr stream with no timestamp, then the run is marked
      # as finished now with the given status code.
      block&.call(nil, :internalerr, text)
      run.update(status_code: status_code, finished_at: Time.zone.now)

      scraper = run.scraper
      sync_update scraper if scraper
    end

    sig { params(block: T.nilable(T.proc.params(timestamp: T.nilable(Time), stream: Symbol, text: String).void)).void }
    def synch_and_go!(&block)
      scraper = run.scraper
      # Nothing to do if this run belongs to a scraper that has just been deleted
      return if scraper.nil?
      # ... or if the run has already been marked as finished
      return if run.finished?

      # TODO: Indicate that scraper is running before we do the synching
      sync_failure = SynchroniseRepoService.call(scraper)

      # Turn a failed synchronisation into the message reported to the user.
      # A nil result from the service means there is nothing to report.
      failure_message =
        case sync_failure
        when nil
          nil
        when Morph::GithubAppInstallation::NoAppInstallationForOwner
          owner = T.must(scraper.owner)
          "Please install the Morph Github App on #{owner.nickname} so that Morph can access this repository on GitHub. Please go to #{owner.app_install_url}\n\nWhy? See #{Rails.application.routes.url_helpers.github_app_documentation_index_url}"
        when Morph::GithubAppInstallation::AppInstallationNoAccessToRepo
          owner = T.must(scraper.owner)
          "The Morph Github App installed on #{owner.nickname} needs access to the repository #{scraper.name}. Please go to #{owner.app_install_url}\n\nWhy? See #{Rails.application.routes.url_helpers.github_app_documentation_index_url}"
        when Morph::GithubAppInstallation::SynchroniseRepoError, Morph::GithubAppInstallation::NoAccessToRepo
          "There was a problem getting the latest scraper code from GitHub"
        when SynchroniseRepoService::RepoNeedsToBePublic
          "The repository #{scraper.full_name} needs to be made public"
        when SynchroniseRepoService::RepoNeedsToBePrivate
          "The repository #{scraper.full_name} needs to be made private"
        else
          T.absurd(sync_failure)
        end

      if failure_message
        # Every synchronisation failure is recorded with status code 999
        error(text: failure_message, status_code: 999, &block)
      else
        go(Runner.default_max_lines, &block)
      end
    end

    sig { params(max_lines: Integer, block: T.nilable(T.proc.params(timestamp: T.nilable(Time), stream: Symbol, text: String).void)).void }
    def go(max_lines = Runner.default_max_lines, &block)
      container = container_for_run
      resume_from = nil

      if container
        # The container already exists so we just attach to it, picking up
        # after the timestamp of the last log line we've already captured
        last_logged_at = run.log_lines.maximum(:timestamp)
        # We add a microsecond to compensate for rounding error as
        # part of the time being stored in the database. The true time
        # gets truncated to the lower microsecond. So, likely the true
        # time happens *after* the recorded time. So, we add a microsecond
        # to compensate for this and ensure that the "since" time occurs
        # slightly after the true time.
        resume_from = last_logged_at + 1e-6 if last_logged_at
      else
        container = compile_and_start_run(max_lines, &block)
      end

      attach_to_run_and_finish(container, resume_from, &block)
    end

    # Prepares the run (database backup, start time, git revision, data
    # directory), validates the language and platform, then asks
    # Morph::DockerRunner to compile and start the scraper in a container.
    #
    # Returns the value handed back by Morph::DockerRunner.compile_and_start_run
    # (nilable according to the sig), or nil after reporting an error with
    # status code 999 when the language is unsupported or the platform is
    # not one of Morph::DockerRunner::PLATFORMS.
    #
    # NOTE(review): when nil is returned after calling error, go still calls
    # attach_to_run_and_finish with a nil container, which updates the run's
    # status_code again from a RunResult built with 255 -- confirm that
    # overwriting the 999 is intended.
    #
    # TODO: Could we get sensible timestamps out at this stage too?
    # Right now we're just returning nil for the timestamps here
    sig { params(max_lines: Integer, block: T.nilable(T.proc.params(timestamp: T.nilable(Time), stream: Symbol, text: String).void)).returns(T.nilable(Docker::Container)) }
    def compile_and_start_run(max_lines = Runner.default_max_lines, &block)
      # puts "Starting...\n"
      # Presumably this is what produces the file at sqlite_db_backup_path that
      # attach_to_run_and_finish later diffs against -- TODO confirm
      run.database.backup
      # If the run is not part of a scraper (e.g. through the api) then there won't be a git repository
      git_revision = Rugged::Repository.new(run.repo_path).head.target_id unless run.scraper.nil?
      run.update(started_at: Time.zone.now, git_revision: git_revision)
      sync_update run.scraper if run.scraper
      # Make sure the data directory exists and give it mode 0777
      FileUtils.mkdir_p run.data_path
      FileUtils.chmod 0o777, run.data_path

      # Give up early (status 999) if the run has no language or an unsupported one
      unless run.language&.supported?
        supported_scraper_files =
          Morph::Language.languages_supported.map(&:scraper_filename)
        supported_scraper_files_as_text = supported_scraper_files.to_sentence(last_word_connector: ", or ")
        error(text: "Can't find scraper code. Expected to find a file called #{supported_scraper_files_as_text} in the root directory", status_code: 999, &block)
        return
      end

      # A nil platform (including runs without a scraper) is accepted as is;
      # anything else has to be one of the known platforms
      platform = run.scraper&.platform
      unless platform.nil? || Morph::DockerRunner::PLATFORMS.include?(platform)
        error(text: "Platform set to an invalid value. Valid values are #{Morph::DockerRunner::PLATFORMS.join(', ')}.", status_code: 999, &block)
        return
      end

      # Assemble what is handed to Docker in a temporary directory. The block
      # form of Dir.mktmpdir removes the directory again when the block exits.
      c = Dir.mktmpdir("morph") do |defaults|
        Morph::Runner.add_config_defaults_to_directory(run.repo_path, defaults)
        Morph::Runner.remove_hidden_directories(defaults)
        Morph::Runner.add_sqlite_db_to_directory(run.data_path, defaults)

        scraper = run.scraper
        memory_mb = scraper&.memory_mb
        Morph::DockerRunner.compile_and_start_run(
          repo_path: defaults,
          env_variables: run.env_variables,
          container_labels: docker_container_labels,
          max_lines: max_lines,
          platform: platform,
          # We're disabling the proxy for all scrapers
          disable_proxy: true,
          # memory_mb is multiplied up to bytes; nil when no memory_mb is available
          memory: (memory_mb * 1024 * 1024 if memory_mb), &block
        )
      end

      if c
        # Record ip address of running container
        ip_address = Morph::DockerUtils.ip_address_of_container(c)

        # Getting the image that this container was built from
        # Doing it in this way so that it is backwards compatible with
        # a short version of the id without "sha256:" at the beginning
        # (keeps the first 12 characters of what follows the first ":")
        docker_image = c.json["Image"].split(":")[1][0..11]

        run.update(ip_address: ip_address, docker_image: docker_image)
      end
      c
    end

    sig { params(container: T.nilable(Docker::Container), since: T.nilable(Time), block: T.nilable(T.proc.params(timestamp: Time, stream: Symbol, text: String).void)).void }
    def attach_to_run_and_finish(container, since, &block)
      # Follows the container (when there is one) until it is finished and
      # then records the outcome on the run: the sqlite database, metrics,
      # what changed in the database, the status code and the finish time.
      result =
        if container
          Morph::DockerRunner.attach_to_run(container, since, &block)
          Morph::DockerRunner.finish(container, ["data.sqlite"])
        else
          # TODO: Return the status for a compile error
          Morph::RunResult.new(255, {}, {})
        end

      status_code = result.status_code
      sqlite_tempfile = result.files["data.sqlite"]

      if sqlite_tempfile
        # Only copy back database if it's there and has something in it
        Morph::Runner.copy_sqlite_db_back(run.data_path, T.must(sqlite_tempfile.path))
        sqlite_tempfile.close!
      elsif status_code.zero?
        # Only show the error below if the scraper thinks it finished without problems
        message = <<~ERROR
          Scraper didn't create an SQLite database in your current working directory called
          data.sqlite. If you've just created your first scraper and not edited the code yet
          this is to be expected.

          To fix this make your scraper write to an SQLite database at data.sqlite.
        ERROR
        block&.call(Time.zone.now, :internalerr, message)
        status_code = 998
      end

      # Now collect and save the metrics
      metrics = result.time_params
      T.must(run.metric).update(metrics) if metrics

      # Because SqliteDiff will actually create sqlite databases if they
      # don't exist we don't actually want that if there isn't actually
      # a database because it causes some very confusing behaviour
      database = run.database
      if File.exist?(database.sqlite_db_path)
        # Update information about what changed in the database
        diffstat = Morph::SqliteDiff.diffstat_safe(database.sqlite_db_backup_path, database.sqlite_db_path)
        if diffstat
          table_counts = diffstat.tables.counts
          record_counts = diffstat.records
          run.update(
            tables_added: table_counts.added,
            tables_removed: table_counts.removed,
            tables_changed: table_counts.changed,
            tables_unchanged: table_counts.unchanged,
            records_added: record_counts.added,
            records_removed: record_counts.removed,
            records_changed: record_counts.changed,
            records_unchanged: record_counts.unchanged
          )
        end
      end

      run.update(status_code: status_code, finished_at: Time.zone.now)

      # The remaining steps only apply to runs that belong to a scraper
      if run.scraper
        run.finished!
        sync_update run.scraper
      end
    end

    # Note that cleanup is automatically done by the process on the
    # background queue attached to the container. When the scraper process is
    # killed here, the attach block finishes and the container cleanup is done
    # as if the scraper had stopped on its own
    # TODO: Make this stop the compile stage
    sig { void }
    def stop!
      running_container = container_for_run
      if running_container.nil?
        # If there is no container then there can't be a watch process to
        # update the run so we must do it here
        run.update(status_code: 255, finished_at: Time.zone.now)
        # TODO: Do a sync_update?
      else
        # Killing the container is enough; see the note above this method
        # about the clean up being done by the attached background process
        running_container.kill
      end
    end

    sig { params(data_path: String, dir: String).void }
    def self.add_sqlite_db_to_directory(data_path, dir)
      # Copies data_path/data.sqlite into dir. Does nothing when there is
      # no such database yet.
      current_db = File.join(data_path, "data.sqlite")

      # TODO: Ensure that there isn't anything else writing to the db
      # while we make a copy of it. There's the backup API. Use that?
      FileUtils.cp(current_db, dir) if File.exist?(current_db)
    end

    sig { params(data_path: String, sqlite_file_path: String).void }
    def self.copy_sqlite_db_back(data_path, sqlite_file_path)
      live_db = File.join(data_path, "data.sqlite")
      staged_db = File.join(data_path, "data.sqlite.new")

      # Stage the new data next to the live file first. It is copied rather
      # than moved just in case the temp directory and data_path are not on
      # the same filesystem (which would stop atomic rename from working)
      FileUtils.cp(sqlite_file_path, staged_db)
      # Then swap the staged file in over the "live" file, overwriting the
      # old data. This should happen atomically
      File.rename(staged_db, live_db)
    end

    sig { params(source: String, dest: String).void }
    def self.add_config_defaults_to_directory(source, dest)
      # Start from a copy of everything in source
      Morph::DockerUtils.copy_directory_contents(source, dest)
      # We don't need to check that the language is recognised because
      # the compiler is never called if the language isn't valid
      language = T.must(Morph::Language.language(dest))

      # default_files_to_insert is a list of groups of file names. The
      # defaults for a group are only copied in when none of the files in
      # that group is already present in dest.
      language.default_files_to_insert.each do |group|
        next if group.any? { |name| File.exist?(File.join(dest, name)) }

        group.each do |name|
          FileUtils.cp(language.default_config_file_path(name), File.join(dest, name))
        end
      end

      # Special behaviour for Procfile. We don't allow the user to override this
      File.write(File.join(dest, "Procfile"), language.procfile)
    end

    # Remove directories starting with "."
    # TODO: Make it just remove the .git directory in the root and not other
    # hidden directories which people might find useful
    sig { params(directory: String).void }
    def self.remove_hidden_directories(directory)
      # Deletes every directory below `directory` whose name starts with ".".
      # Hidden files are left alone, and `directory` itself is never deleted
      # even when its own name starts with a dot (e.g. "." or a hidden
      # temporary directory).
      Find.find(directory) do |path|
        # Find yields the starting directory first; it is never a candidate
        next if path == directory
        next unless File.basename(path).start_with?(".") && FileTest.directory?(path)

        FileUtils.rm_rf(path)
        # The directory is gone, so tell Find not to try to descend into it
        Find.prune
      end
    end

    # Key of the Docker container label that ties a container to a run.
    # The label's value is the run id as a string (see run_label_value), and
    # used_slots counts containers carrying this label.
    sig { returns(String) }
    def self.run_label_key
      "io.morph.run"
    end

    # Value stored under run_label_key on this run's container: the id of
    # the run, converted to a string
    sig { returns(String) }
    def run_label_value
      run.id.to_s
    end

    # How to label the container for the actually running scraper
    sig { returns(T::Hash[String, String]) }
    def docker_container_labels
      # Everything needs to be a string
      run_labels = { Morph::Runner.run_label_key => run_label_value }

      # Runs without a scraper only get the run label
      scraper = run.scraper
      return run_labels if scraper.nil?

      run_labels.merge("io.morph.scraper" => scraper.full_name)
    end

    sig { returns(T.nilable(Docker::Container)) }
    def container_for_run
      # Looks the container up by its run label; the result is nil when
      # DockerUtils finds no container labelled with this run's id
      label_key = Morph::Runner.run_label_key
      Morph::DockerUtils.find_container_with_label(label_key, run_label_value)
    end

    sig { params(container: Docker::Container).returns(T.nilable(Integer)) }
    def self.run_id_for_container(container)
      # Reads the run label off the container. A container without that
      # label gives nil; otherwise the label text is converted with to_i.
      label = Morph::DockerUtils.label_value(container, run_label_key)
      return if label.nil?

      label.to_i
    end

    # Given a container return the associated run object
    # If the run for this container has been deleted (or the container has
    # no run label) then return nil
    sig { params(container: Docker::Container).returns(T.nilable(Run)) }
    def self.run_for_container(container)
      run_id = run_id_for_container(container)
      # No run id on the container means there is nothing to look up
      return if run_id.nil?

      # find_by gives nil rather than raising when the run no longer exists
      Run.find_by(id: run_id)
    end
  end
end