cantino/huginn

View on GitHub
app/models/agents/jq_agent.rb

Summary

Maintainability
A
0 mins
Test Coverage
require 'open3'

module Agents
  class JqAgent < Agent
    cannot_be_scheduled!
    can_dry_run!

    def self.should_run?
      !!jq_version
    end

    def self.jq_command
      ENV['USE_JQ'].presence
    end

    def self.jq_version
      if command = jq_command
        Open3.capture2(command, '--version', 2 => IO::NULL).first[/\Ajq-\K\S+/]
      end
    end

    def self.jq_info
      if version = jq_version
        "jq version #{version} is installed"
      else
        "**This agent is not enabled on this server**"
      end
    end

    gem_dependency_check { jq_version }

    description <<~MD
      The Jq Agent allows you to process incoming Events with [jq](https://stedolan.github.io/jq/) the JSON processor. (#{jq_info})

      It allows you to filter, transform and restructure Events in the way you want using jq's powerful features.

      You can specify a jq filter expression to apply to each incoming event in `filter`, and results it produces will become Events to be emitted.

      You can optionally pass in variables to the filter program by specifying key-value pairs of a variable name and an associated value in the `variables` key, each of which becomes a predefined variable.

      This Agent can be used to parse a complex JSON structure that is too hard to handle with JSONPath or Liquid templating.

      For example, suppose that a Post Agent created an Event which contains a `body` key with a value of the JSON formatted string of the following response body:

          {
            "status": "1",
            "since": "1245626956",
            "list": {
              "93817": {
                "item_id": "93817",
                "url": "http://url.com",
                "title": "Page Title",
                "time_updated": "1245626956",
                "time_added": "1245626956",
                "tags": "comma,seperated,list",
                "state": "0"
              },
              "935812": {
                "item_id": "935812",
                "url": "http://google.com",
                "title": "Google",
                "time_updated": "1245635279",
                "time_added": "1245635279",
                "tags": "comma,seperated,list",
                "state": "1"
              }
            }
          }

      Then you could have a Jq Agent with the following jq filter:

          .body | fromjson | .list | to_entries | map(.value) | map(try(.tags |= split(",")) // .) | sort_by(.time_added | tonumber)

      To get the following two Events emitted out of the said incoming Event from Post Agent:

          [
            {
              "item_id": "93817",
              "url": "http://url.com",
              "title": "Page Title",
              "time_updated": "1245626956",
              "time_added": "1245626956",
              "tags": ["comma", "seperated", "list"],
              "state": "0"
            },
            {
              "item_id": "935812",
              "url": "http://google.com",
              "title": "Google",
              "time_updated": "1245626956",
              "time_added": "1245626956",
              "tags": ["comma", "seperated", "list"],
              "state": "1"
            }
          ]
    MD

    def validate_options
      errors.add(:base, "filter needs to be present.") if !options['filter'].is_a?(String)
      errors.add(:base,
                 "variables must be a hash if present.") if options.key?('variables') && !options['variables'].is_a?(Hash)
    end

    def default_options
      {
        'filter' => '.',
        'variables' => {}
      }
    end

    def working?
      self.class.should_run? && !recent_error_logs?
    end

    def receive(incoming_events)
      if !self.class.should_run?
        log("Unable to run because this agent is not enabled.  Edit the USE_JQ environment variable.")
        return
      end

      incoming_events.each do |event|
        interpolate_with(event) do
          process_event(event)
        end
      end
    end

    private

    def get_variables
      variables = interpolated['variables']
      return {} if !variables.is_a?(Hash)

      variables.map { |name, value|
        [name.to_s, value.to_json]
      }.to_h
    end

    def process_event(event)
      Tempfile.create do |file|
        filter = interpolated['filter'].to_s

        # There seems to be no way to force jq to treat an arbitrary
        # string as a filter without being confused with a command
        # line option, so pass one via file.
        file.print filter
        file.close

        variables = get_variables

        command_args = [
          self.class.jq_command,
          '--compact-output',
          '--sort-keys',
          '--from-file', file.path,
          *variables.flat_map { |name, json|
            ['--argjson', name, json]
          }
        ]

        log [
          "Running jq with filter: #{filter}",
          *variables.map { |name, json| "variable: #{name} = #{json}" }
        ].join("\n")

        Open3.popen3(*command_args) do |stdin, stdout, stderr, wait_thread|
          stderr_reader = Thread.new { stderr.read }
          stdout_reader = Thread.new { stdout.each_line.flat_map { |line| JSON.parse(line) } }

          results, errout, status =
            begin
              JSON.dump(event.payload, stdin)
              stdin.close

              [
                stdout_reader.value,
                stderr_reader.value,
                wait_thread.value
              ]
            rescue Errno::EPIPE
            end

          if !status.success?
            error "Error output from jq:\n#{errout}"
            return
          end

          results.keep_if do |result|
            if result.is_a?(Hash)
              true
            else
              error "Ignoring a non-object result: #{result.to_json}"
              false
            end
          end

          log "Creating #{results.size} events"

          results.each do |payload|
            create_event(payload:)
          end
        end
      end
    end
  end
end