cantino/huginn

View on GitHub
app/models/agents/csv_agent.rb

Summary

Maintainability
A
3 hrs
Test Coverage
module Agents
  class CsvAgent < Agent
    include FormConfigurable
    include FileHandling

    cannot_be_scheduled!
    consumes_file_pointer!

    def default_options
      {
        'mode' => 'parse',
        'separator' => ',',
        'use_fields' => '',
        'output' => 'event_per_row',
        'with_header' => 'true',
        'data_path' => '$.data',
        'data_key' => 'data'
      }
    end

    description do
      <<~MD
        The `CsvAgent` parses or serializes CSV data. When parsing, events can either be emitted for the entire CSV, or one per row.

        Set `mode` to `parse` to parse CSV from incoming event, when set to `serialize` the agent serilizes the data of events to CSV.

        ### Universal options

        Specify the `separator` which is used to seperate the fields from each other (default is `,`).

        `data_key` sets the key which contains the serialized CSV or parsed CSV data in emitted events.

        ### Parsing

        If `use_fields` is set to a comma seperated string and the CSV file contains field headers the agent will only extract the specified fields.

        `output` determines wheather one event per row is emitted or one event that includes all the rows.

        Set `with_header` to `true` if first line of the CSV file are field names.

        #{receiving_file_handling_agent_description}

        When receiving the CSV data in a regular event use [JSONPath](http://goessner.net/articles/JsonPath/) to select the path in `data_path`. `data_path` is only used when the received event does not contain a 'file pointer'.

        ### Serializing

        If `use_fields` is set to a comma seperated string and the first received event has a object at the specified `data_path` the generated CSV will only include the given fields.

        Set `with_header` to `true` to include a field header in the CSV.

        Use [JSONPath](http://goessner.net/articles/JsonPath/) in `data_path` to select with part of the received events should be serialized.
      MD
    end

    event_description do
      data =
        if interpolated['mode'] == 'parse'
          rows =
            if boolify(interpolated['with_header'])
              [
                { 'column' => 'row1 value1', 'column2' => 'row1 value2' },
                { 'column' => 'row2 value3', 'column2' => 'row2 value4' },
              ]
            else
              [
                ['row1 value1', 'row1 value2'],
                ['row2 value1', 'row2 value2'],
              ]
            end
          if interpolated['output'] == 'event_per_row'
            rows[0]
          else
            rows
          end
        else
          <<~EOS
            "generated","csv","data"
            "column1","column2","column3"
          EOS
        end

      "Events will looks like this:\n\n    " +
        Utils.pretty_print({
          interpolated['data_key'] => data
        })
    end

    form_configurable :mode, type: :array, values: %w[parse serialize]
    form_configurable :separator, type: :string
    form_configurable :data_key, type: :string
    form_configurable :with_header, type: :boolean
    form_configurable :use_fields, type: :string
    form_configurable :output, type: :array, values: %w[event_per_row event_per_file]
    form_configurable :data_path, type: :string

    def validate_options
      if options['with_header'].blank? || ![true, false].include?(boolify(options['with_header']))
        errors.add(:base, "The 'with_header' options is required and must be set to 'true' or 'false'")
      end
      if options['mode'] == 'serialize' && options['data_path'].blank?
        errors.add(:base, "When mode is set to serialize data_path has to be present.")
      end
    end

    def working?
      received_event_without_error?
    end

    def receive(incoming_events)
      case options['mode']
      when 'parse'
        parse(incoming_events)
      when 'serialize'
        serialize(incoming_events)
      end
    end

    private

    def serialize(incoming_events)
      mo = interpolated(incoming_events.first)
      rows = rows_from_events(incoming_events, mo)
      csv = CSV.generate(col_sep: separator(mo), force_quotes: true) do |csv|
        if boolify(mo['with_header']) && rows.first.is_a?(Hash)
          csv << if mo['use_fields'].present?
                   extract_options(mo)
                 else
                   rows.first.keys
                 end
        end
        rows.each do |data|
          csv << if data.is_a?(Hash)
                   if mo['use_fields'].present?
                     data.extract!(*extract_options(mo)).values
                   else
                     data.values
                   end
                 else
                   data
                 end
        end
      end
      create_event payload: { mo['data_key'] => csv }
    end

    def rows_from_events(incoming_events, mo)
      [].tap do |rows|
        incoming_events.each do |event|
          data = Utils.value_at(event.payload, mo['data_path'])
          if data.is_a?(Array) && (data[0].is_a?(Array) || data[0].is_a?(Hash))
            data.each { |row| rows << row }
          else
            rows << data
          end
        end
      end
    end

    def parse(incoming_events)
      incoming_events.each do |event|
        mo = interpolated(event)
        next unless io = local_get_io(event)

        if mo['output'] == 'event_per_row'
          parse_csv(io, mo) do |payload|
            create_event payload: { mo['data_key'] => payload }
          end
        else
          create_event payload: { mo['data_key'] => parse_csv(io, mo, []) }
        end
      end
    end

    def local_get_io(event)
      if io = get_io(event)
        io
      else
        Utils.value_at(event.payload, interpolated['data_path'])
      end
    end

    def parse_csv_options(mo)
      options = {
        col_sep: separator(mo),
        headers: boolify(mo['with_header']),
      }
      options[:liberal_parsing] = true if CSV::DEFAULT_OPTIONS.key?(:liberal_parsing)
      options
    end

    def parse_csv(io, mo, array = nil)
      CSV.new(io, **parse_csv_options(mo)).each do |row|
        if block_given?
          yield get_payload(row, mo)
        else
          array << get_payload(row, mo)
        end
      end
      array
    end

    def separator(mo)
      mo['separator'] == '\\t' ? "\t" : mo['separator']
    end

    def get_payload(row, mo)
      if boolify(mo['with_header'])
        if mo['use_fields'].present?
          row.to_hash.extract!(*extract_options(mo))
        else
          row.to_hash
        end
      else
        row
      end
    end

    def extract_options(mo)
      mo['use_fields'].split(',').map(&:strip)
    end
  end
end