gooddata/gooddata-ruby

View on GitHub
lib/gooddata/models/process.rb

Summary

Maintainability
C
1 day
Test Coverage
#
# Copyright (c) 2010-2017 GoodData Corporation. All rights reserved.
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

require 'zip'
require 'uri'

require_relative '../helpers/global_helpers'
require_relative '../rest/resource'

require_relative 'execution_detail'
require_relative 'schedule'

# URL of the GoodData app store repository. Paths starting with this URL are
# deployed as bricks, and the repository is cloned from here when doing so.
# Assigned with ||= so a value defined before this file is loaded is kept.
APP_STORE_URL ||= 'https://github.com/gooddata/app_store'

module GoodData
  class Process < Rest::Resource
    # Raw data the object was initialized with.
    attr_reader :data

    # The same reader exposed under additional names.
    alias_method :raw_data, :data
    alias_method :json, :data
    alias_method :to_hash, :data

    class << self
      # Looks up deployed processes.
      #
      # * `Process[:all, project: p]` lists the processes of project `p`.
      # * `Process[:all]` lists the processes of the client's user across
      #   projects; each process is paired with the project found in its URI.
      # * `Process[id, project: p]` fetches a single process of project `p`.
      #
      # @param id [Symbol, String] `:all`, or the ID of a single process
      # @param options [Hash]
      # @option options [Object] :client Client used for the REST calls; when
      #   missing, the client of :project is used
      # @option options [Object] :project Project to look in; required unless
      #   id is `:all`
      # @return [Array, Object] Whatever `client.create(Process, ...)` builds -
      #   an array of them for `:all`, a single one otherwise
      # @raise [RuntimeError] When no client can be determined, or when a
      #   single process is requested without a project
      def [](id, options = { :client => GoodData.connection })
        project = options[:project]
        client = options[:client] || (project && project.client)
        fail 'Client has to be specified in options' unless client

        if id == :all && project
          uri = "/gdc/projects/#{project.pid}/dataload/processes"
          client.get(uri)['processes']['items'].map do |process_data|
            client.create(Process, process_data, project: project)
          end
        elsif id == :all
          uri = "/gdc/account/profile/#{client.user.obj_id}/dataload/processes"
          items = client.get(uri)['processes']['items']

          # The owning project's id is embedded in the URI of every process.
          pid_of = lambda do |process_data|
            process_data['process']['links']['self'].match(%r{/gdc/projects/(\w*)/})[1]
          end

          # Fetch every referenced project only once.
          projects_lookup = items.map(&pid_of).uniq
                                 .pmap { |pid| client.projects(pid) }
                                 .each_with_object({}) { |found, lookup| lookup[found.pid] = found }

          items.map do |process_data|
            client.create(Process, process_data, project: projects_lookup[pid_of.call(process_data)])
          end
        else
          # Without a project there is no URI to ask; fail with a clear message
          # instead of a NoMethodError on nil.
          fail 'Project has to be specified in options' unless project

          uri = "/gdc/projects/#{project.pid}/dataload/processes/#{id}"
          client.create(Process, client.get(uri), project: project)
        end
      end

      # Shorthand for `Process[:all]`.
      #
      # No options are passed, so the lookup uses the default options of `[]`
      # (client taken from GoodData.connection, no project).
      def all
        Process[:all]
      end

      # Deploys `dir` as a process within the resolved project.
      #
      # Without a block the deployed process is returned. With a block the
      # process is yielded and deleted afterwards; any StandardError raised
      # while deploying or inside the block is logged and not re-raised.
      #
      # @param dir [String] What to deploy (handed to GoodData::Process.deploy)
      # @param options [Hash] Options passed on to GoodData::Process.deploy;
      #   a non-nil options[:params] is passed on as the only entry of
      #   :files_to_exclude
      # @return [Object] The deployed process (no block), the block's value, or
      #   the logger's return value when an error was rescued
      def with_deploy(dir, options = {}, &block)
        _, project = GoodData.get_client_and_project(options)

        GoodData.with_project(project) do
          excluded = [options[:params]].compact
          deploy_options = options.merge(:files_to_exclude => excluded)
          next GoodData::Process.deploy(dir, deploy_options) unless block

          deployed = nil
          begin
            deployed = GoodData::Process.deploy(dir, deploy_options)
            block.call(deployed)
          rescue StandardError => e
            GoodData.logger.error(e.inspect)
          ensure
            # Clean up the temporary deployment whenever it was created.
            deployed.delete if deployed
          end
        end
      end

      # Packages the given path and uploads it by delegating to zip_and_upload.
      #
      # @param path [Pathname] File or directory to upload (zip_and_upload calls Pathname methods on it)
      # @param files_to_exclude [Array<Pathname>] Paths left out when a directory is zipped
      # @param opts [Hash] Options; opts[:client] performs the upload
      # @return [String] Name of the uploaded package as returned by zip_and_upload
      def upload_package(path, files_to_exclude, opts = { :client => GoodData.connection })
        # The return value is discarded; presumably this is called only to check
        # that a client and project can be resolved from opts - TODO confirm.
        GoodData.get_client_and_project(opts)
        zip_and_upload(path, files_to_exclude, opts)
      end

      # Deploy a new process or redeploy an existing one.
      #
      # The kind of deployment is chosen from `path`:
      # * a Hash with a :component key is deployed as a component,
      # * a path starting with APP_STORE_URL is deployed as a brick,
      # * a path of the form `${...}:<x>/<y>:/...` is deployed from the app store,
      # * anything else is treated as a local file or directory.
      #
      # @param path [String, Hash] Path to ZIP archive or to a directory containing files that should be ZIPed, or one of the forms above
      # @option options [Array<String>] :files_to_exclude Paths to leave out of the package
      # @option options [String] :type ('GRAPH') Type of process - GRAPH or RUBY
      # @option options [String] :name Readable name of the process
      # @option options [String] :process_id ID of a process to be redeployed (do not set if you want to create a new process)
      # @option options [Boolean] :verbose (false) Switch on verbose mode for detailed logging
      def deploy(path, options = { client: GoodData.client, project: GoodData.project })
        return deploy_component(path, options) if path.is_a?(Hash) && path[:component]

        location = path.to_s
        return deploy_brick(path, options) if location.start_with?(APP_STORE_URL)
        return deploy_from_appstore(location, options) if location =~ %r{\${.*}:(.*)\/(.*):\/}

        deploy_simple_process(path, options)
      end

      # Uploads a local file or directory and creates (or updates) a process
      # pointing at the uploaded package.
      #
      # @param path [String, Pathname] File or directory to deploy
      # @param options [Hash]
      # @option options [String, Array<String>] :files_to_exclude Path(s) to leave out of the package
      # @option options [String] :type ('GRAPH') Type of process
      # @option options [String] :name ("Process of <path> script") Readable name of the process
      # @option options [Array] :data_sources ([]) Sent as the process's dataSources
      # @option options [String] :process_id ID of the process to update (handled by save)
      # @option options [Boolean] :verbose (false) Log the deployed path
      # @return [Object] Result of save
      # @raise [RuntimeError] When path is nil
      # @raise [ArgumentError] When the name is empty
      def deploy_simple_process(path, options = { client: GoodData.client, project: GoodData.project })
        client, project = GoodData.get_client_and_project(options)

        fail 'Path is not specified' unless path

        path = Pathname(path)
        # Array() accepts nil, a single path or a list of paths.
        files_to_exclude = Array(options[:files_to_exclude]).map { |pname| Pathname(pname) }

        type = options[:type] || 'GRAPH'
        deploy_name = options[:name] || "Process of #{path} script"
        fail ArgumentError, 'options[:name] can not be nil or empty!' if deploy_name.empty?

        GoodData.logger.info("Deploying #{path}") if options[:verbose]

        deployed_path = Process.upload_package(path, files_to_exclude, client: client, project: project)
        data = {
          :process => {
            :name => deploy_name,
            # Only the file name of the uploaded package is referenced.
            :path => "/uploads/#{File.basename(deployed_path)}",
            :type => type,
            :dataSources => options[:data_sources] || []
          }
        }

        save(data, options)
      end

      # Deploys a brick straight from the app store git repository.
      #
      # The repository at APP_STORE_URL is cloned into a temporary directory,
      # the ref named in the URL is checked out and the brick's directory is
      # deployed as a RUBY process named after the last URL segment.
      #
      # Of the given options only the resolved client and project are used.
      #
      # @param path [String] Brick URL; segment 4 of its path is taken as the
      #   git ref and the remaining segments as the path inside the repository
      # @param options [Hash] Used to resolve client and project
      # @return [Object] Result of deploy for the checked-out brick directory
      # @raise [RuntimeError] When cloning fails, the ref can not be checked
      #   out, or the brick directory does not exist
      def deploy_brick(path, options = { :client => GoodData.client, :project => GoodData.project })
        client, project = GoodData.get_client_and_project(options)

        brick_uri_parts = URI(path).path.split('/')
        ref = brick_uri_parts[4]
        brick_name = brick_uri_parts.last
        brick_path = brick_uri_parts[5..-1].join('/')

        Dir.mktmpdir do |dir|
          Dir.chdir(dir) do
            # Argument-list form of system: nothing is passed through a shell.
            # stdout is discarded, as it was when the output was only captured.
            cloned = system('git', 'clone', APP_STORE_URL, out: File::NULL)
            fail "Unable to clone #{APP_STORE_URL}" unless cloned
          end

          Dir.chdir(File.join(dir, 'app_store')) do
            if ref
              # ref comes from the caller-supplied URL, so it must never be
              # interpolated into a shell command line. system returns false
              # or nil when git fails, which needs no $CHILD_STATUS lookup.
              checked_out = system('git', 'checkout', ref, out: File::NULL)
              fail 'Wrong branch or tag specified!' unless checked_out
            end

            opts = {
              :client => client,
              :project => project,
              :name => brick_name,
              :type => 'RUBY'
            }

            full_brick_path = File.join(dir, 'app_store', brick_path)

            fail "Invalid brick name specified - '#{brick_name}'" unless File.exist?(full_brick_path)

            return deploy(full_brick_path, opts)
          end
        end
      end

      # Creates (or updates) a RUBY process whose path is the given app store
      # reference; nothing is uploaded.
      #
      # @param path [String] App store reference stored as the process path
      # @param options [Hash]
      # @option options [String] :name ("Process of <path>") Readable name of the process
      # @option options [Array] :data_sources ([]) Sent as the process's dataSources
      # @option options [Boolean] :verbose (false) Log the deployed path
      # @return [Object] Result of save
      def deploy_from_appstore(path, options = { :client => GoodData.client, :project => GoodData.project })
        process_name = options[:name] || "Process of #{path}"
        GoodData.logger.info("Deploying #{path}") if options[:verbose]

        payload = {
          process: {
            name: process_name,
            path: path,
            dataSources: options[:data_sources] || [],
            type: 'RUBY'
          }
        }
        save(payload, options)
      end

      # Creates (or updates) a component process.
      #
      # The process definition is reduced to the keys type, name, component
      # and dataSources, and its component to name, version, configLocation
      # and config. The caller's hash is left untouched.
      #
      # @param data [Hash] Process definition, either bare or wrapped under :process
      # @param options [Hash] Client, project and :process_id, handled by save
      # @return [Object] Result of save
      def deploy_component(data, options = { client: GoodData.client, project: GoodData.project })
        # Only the resolution itself matters here; save resolves both again.
        GoodData.get_client_and_project(options)

        process = GoodData::Helpers.symbolize_keys(data[:process] || data)
                                   .select { |key, _| %i[type name component dataSources].include?(key) }
        process[:component] = GoodData::Helpers.symbolize_keys(process[:component])
                                               .select { |key, _| %i[name version configLocation config].include?(key) }

        # Build a fresh payload instead of writing into the argument; sibling
        # keys of an already wrapped definition are carried over as before.
        payload = data[:process] ? data.merge(process: process) : { process: process }

        save(payload, options)
      end

      private

      # Sends the process definition to the platform.
      #
      # Posts to the project's process collection when options[:process_id] is
      # nil, otherwise puts to that process. When the first key of the answer
      # is 'asyncTask', its poll link is polled and the polled body is parsed
      # as JSON and used instead.
      #
      # @param data [Hash] Request payload
      # @param options [Hash] Client, project and optional :process_id; also
      #   handed to poll_on_code with process: false merged in
      # @return [Object] Result of client.create(Process, ...) for the answer
      def save(data, options = { client: GoodData.client, project: GoodData.project })
        client, project = GoodData.get_client_and_project(options)
        collection_uri = "/gdc/projects/#{project.pid}/dataload/processes"
        existing_id = options[:process_id]

        response =
          if existing_id.nil?
            client.post(collection_uri, data)
          else
            client.put("#{collection_uri}/#{existing_id}", data)
          end

        if response.keys.first == 'asyncTask'
          poll_uri = response['asyncTask']['links']['poll']
          response = JSON.parse(client.poll_on_code(poll_uri, options.merge(process: false)))
        end

        client.create(Process, response, project: project)
      end

      # Builds a zip archive, uploads it and removes the local copy.
      #
      # The archive is yielded for the caller to fill, then uploaded through
      # opts[:client].upload_to_user_webdav.
      #
      # @param opts [Hash] Options; :client performs the upload, the whole hash
      #   is passed on to upload_to_user_webdav
      # @yield [zipfile] The open archive
      # @return [String] Local path the archive was built at; the file itself
      #   is deleted again before returning
      def with_zip(opts = {})
        client = opts[:client]
        temp = Tempfile.new(['deploy-graph-archive', '.zip'])
        zip_filename = temp.path

        # Only the unique name is needed; the archive is created from scratch.
        temp.close!
        begin
          Zip::File.open(zip_filename, Zip::File::CREATE) do |zipfile|
            yield zipfile
          end
          client.upload_to_user_webdav(zip_filename, opts)
        ensure
          # Do not leave one archive behind in the temp directory per deploy.
          File.delete(zip_filename) if File.exist?(zip_filename)
        end
        zip_filename
      end

      # Uploads `path` as a package, zipping it first when necessary.
      #
      # * A directory is zipped recursively (minus excluded entries).
      # * A single .grf or .rb file is wrapped into a zip of its own.
      # * Any other file is expected to be a zip already and is uploaded as is
      #   under a random 16-letter name, so that parallel uploads of files with
      #   the same local name do not clash.
      #
      # @param path [Pathname] File or directory to upload
      # @param files_to_exclude [Array<Pathname>] Entries skipped when zipping
      #   a directory; compared against `path + <globbed entry>`
      #   (NOTE(review): for a relative `path` this repeats the prefix - confirm
      #   the intended form of the excluded paths)
      # @param opts [Hash] Options; :client performs the upload
      # @return [String] Result of with_zip, or the random remote file name
      def zip_and_upload(path, files_to_exclude, opts = {})
        uploader = opts[:client]
        GoodData.logger.info('Creating package for upload')

        if path.directory?
          with_zip(opts) do |archive|
            root = Pathname.new(path)
            entries = Dir.glob(File.join(path, '**', '**'))
            selected = entries.reject { |entry| files_to_exclude.include?(root + entry) }
            GoodData.logger.info("Uploading #{selected.count} files.")
            selected.each do |entry|
              # Entries are stored relative to the deployed directory.
              archive.add(Pathname.new(entry).relative_path_from(root), entry)
            end
          end
        elsif %w[.grf .rb].include?(path.extname)
          with_zip(opts) { |archive| archive.add(File.basename(path), path) }
        else
          random_name = Array.new(16) { rand(65..90).chr }.join
          uploader.upload_to_user_webdav(path, { filename: random_name }.merge(opts))
          random_name
        end
      end
      # -----------------------------
    end

    # @param data [Hash] Raw data of the process; the accessors below read its
    #   'process' section
    def initialize(data)
      @data = data
    end

    # Deletes the process by calling client.delete with the process URI.
    def delete
      client.delete(uri)
    end

    # Redeploy existing process.
    #
    # The current ID, name, type and data sources of this process are used as
    # defaults; values given in options take precedence.
    #
    # @param path [String] Path to ZIP archive or to a directory containing files that should be ZIPed
    # @option options [Array<String>] :files_to_exclude Paths to leave out of the package
    # @option options [String] :process_id (ID of this process) ID of the process to be redeployed
    # @option options [String] :type (type of this process) Type of process - GRAPH or RUBY
    # @option options [String] :name (name of this process) Readable name of the process
    # @option options [Boolean] :verbose (false) Switch on verbose mode for detailed logging
    def deploy(path, options = {})
      Process.deploy(path, { client: client, process_id: process_id, :project => project, :name => name, :type => type, :data_sources => data_sources }.merge(options))
    end

    # Downloads the process from S3 in a zipped form.
    #
    # The connection token is refreshed first. The process's 'source' link is
    # then requested, and the first 'location' value of that result is
    # fetched with RestClient.
    #
    # @return [IO] The stream of data that represents a zipped deployed process.
    def download
      source_uri = links['source']
      client.connection.refresh_token
      client.get(source_uri, process: false) do |_request, _response, redirect|
        target = redirect.to_hash['location'].first
        RestClient.get(target)
      end
    end

    # @return [Hash] The 'process' section of the raw data
    def process
      data['process']
    end

    # @return [String] Value stored under 'name' in the process data
    def name
      process['name']
    end

    # @return [Symbol] The process 'type', downcased and converted to a symbol
    #   (e.g. 'GRAPH' becomes :graph)
    def type
      process['type'].downcase.to_sym
    end

    # @return [Hash] The 'links' section of the process data
    def links
      process['links']
    end

    # @return [String] The 'self' link of the process; also available as #uri
    def link
      links['self']
    end

    alias_method :uri, :link

    # @return [String] Last path segment of the process URI; also available
    #   as #process_id
    def obj_id
      uri.split('/').last
    end

    alias_method :process_id, :obj_id

    # @return [String] The 'executions' link; start_execution posts to it
    def executions_link
      links['executions']
    end

    # @return [Object] Value stored under 'graphs' in the process data
    def graphs
      process['graphs']
    end

    # @return [Object] Value stored under 'executables' in the process data
    def executables
      process['executables']
    end

    # @return [Object] Value stored under 'path' in the process data
    def path
      process['path']
    end

    # @return [Hash, nil] The 'component' section of the process data, nil when absent
    def component
      process['component']
    end

    # @return [Object] Value stored under 'dataSources' in the process data
    def data_sources
      process['dataSources']
    end

    # Determines whether the process is an ADDv2 component, i.e. whether it has
    # a component named 'gdc-data-distribution'.
    #
    # Always answers with a real boolean; a process without a component yields
    # false rather than nil.
    #
    # @return [Bool] True if the process is an ADDv2 component.
    def add_v2_component?
      component_data = process['component']
      component_data ? component_data['name'] == 'gdc-data-distribution' : false
    end

    # @return [Array] Schedules of the project whose process_id equals the ID
    #   of this process
    def schedules
      project.schedules.select { |schedule| schedule.process_id == obj_id }
    end

    # Creates a schedule for this process with neither a cron expression nor
    # an executable (both are passed to create_schedule as nil).
    #
    # @param options [Hash] Passed on to create_schedule
    def create_manual_schedule(options = {})
      create_schedule(nil, nil, options)
    end

    # Creates a schedule for this process by delegating to
    # project.create_schedule with this process's ID.
    #
    # @param cron [Object] Passed through as the cron argument
    # @param executable [Object] Passed through as the executable argument
    # @param options [Hash] Additional options; :client and :project are
    #   overwritten with those of this process
    def create_schedule(cron, executable, options = {})
      project.create_schedule(process_id, cron, executable, options.merge(client: client, project: project))
    end

    # Starts an execution and waits for it to finish.
    #
    # The execution's poll link is polled; afterwards - whether polling
    # succeeded or raised - the execution detail is fetched. A detail with
    # status 'ERROR' raises, replacing any error raised while polling.
    #
    # @param executable [Object] Executable to run (see start_execution)
    # @param options [Hash] Passed to start_execution and to the polling call
    # @return [Object] Result of client.create(GoodData::ExecutionDetail, ...)
    #   for the fetched detail
    # @raise [RuntimeError] When the execution detail reports status 'ERROR'
    def execute(executable, options = {})
      started = start_execution(executable, options)
      begin
        client.poll_on_code(started['executionTask']['links']['poll'], options)
      ensure
        # Runs on success and on failure alike, so the status is always checked.
        detail = client.get(started['executionTask']['links']['detail'])
        if detail['executionDetail']['status'] == 'ERROR'
          fail "Runing process failed. You can look at a log here #{detail['executionDetail']['logFileName']}"
        end
      end
      client.create(GoodData::ExecutionDetail, detail, client: client, project: project)
    end

    # Starts an execution of the given executable without waiting for it.
    #
    # @param executable [Object] Executable to run; sent as a string under 'graph'
    # @param options [Hash]
    # @option options [Hash] :params ({}) Parameters, encoded with encode_public_params
    # @option options [Hash] :hidden_params ({}) Hidden parameters, encoded with encode_hidden_params
    # @return [Object] Result of the post to the executions link
    def start_execution(executable, options = {})
      endpoint = executions_link
      execution = {
        :graph => executable.to_s,
        :params => GoodData::Helpers.encode_public_params(options[:params] || {}),
        :hiddenParams => GoodData::Helpers.encode_hidden_params(options[:hidden_params] || {})
      }
      client.post(endpoint, :execution => execution)
    end

    # @return [Object] Result of NotificationRule.all for this process, its
    #   project and its client
    def notification_rules
      NotificationRule.all(project: project, process: self, client: client)
    end

    # Creates a notification rule for this process via NotificationRule.create.
    #
    # @param opts [Hash] Rule options; :project, :process and :client are
    #   overwritten with this process and its project and client
    def create_notification_rule(opts = {})
      NotificationRule.create(opts.merge(project: project, process: self, client: client))
    end
  end
end