app/models/agents/jq_agent.rb
require 'open3'
module Agents
class JqAgent < Agent
cannot_be_scheduled!
can_dry_run!
def self.should_run?
!!jq_version
end
def self.jq_command
ENV['USE_JQ'].presence
end
def self.jq_version
if command = jq_command
Open3.capture2(command, '--version', 2 => IO::NULL).first[/\Ajq-\K\S+/]
end
end
def self.jq_info
if version = jq_version
"jq version #{version} is installed"
else
"**This agent is not enabled on this server**"
end
end
gem_dependency_check { jq_version }
description <<~MD
The Jq Agent allows you to process incoming Events with [jq](https://stedolan.github.io/jq/) the JSON processor. (#{jq_info})
It allows you to filter, transform and restructure Events in the way you want using jq's powerful features.
You can specify a jq filter expression to apply to each incoming event in `filter`, and results it produces will become Events to be emitted.
You can optionally pass in variables to the filter program by specifying key-value pairs of a variable name and an associated value in the `variables` key, each of which becomes a predefined variable.
This Agent can be used to parse a complex JSON structure that is too hard to handle with JSONPath or Liquid templating.
For example, suppose that a Post Agent created an Event which contains a `body` key with a value of the JSON formatted string of the following response body:
{
"status": "1",
"since": "1245626956",
"list": {
"93817": {
"item_id": "93817",
"url": "http://url.com",
"title": "Page Title",
"time_updated": "1245626956",
"time_added": "1245626956",
"tags": "comma,seperated,list",
"state": "0"
},
"935812": {
"item_id": "935812",
"url": "http://google.com",
"title": "Google",
"time_updated": "1245635279",
"time_added": "1245635279",
"tags": "comma,seperated,list",
"state": "1"
}
}
}
Then you could have a Jq Agent with the following jq filter:
.body | fromjson | .list | to_entries | map(.value) | map(try(.tags |= split(",")) // .) | sort_by(.time_added | tonumber)
To get the following two Events emitted out of the said incoming Event from Post Agent:
[
{
"item_id": "93817",
"url": "http://url.com",
"title": "Page Title",
"time_updated": "1245626956",
"time_added": "1245626956",
"tags": ["comma", "seperated", "list"],
"state": "0"
},
{
"item_id": "935812",
"url": "http://google.com",
"title": "Google",
"time_updated": "1245626956",
"time_added": "1245626956",
"tags": ["comma", "seperated", "list"],
"state": "1"
}
]
MD
def validate_options
errors.add(:base, "filter needs to be present.") if !options['filter'].is_a?(String)
errors.add(:base,
"variables must be a hash if present.") if options.key?('variables') && !options['variables'].is_a?(Hash)
end
def default_options
{
'filter' => '.',
'variables' => {}
}
end
def working?
self.class.should_run? && !recent_error_logs?
end
def receive(incoming_events)
if !self.class.should_run?
log("Unable to run because this agent is not enabled. Edit the USE_JQ environment variable.")
return
end
incoming_events.each do |event|
interpolate_with(event) do
process_event(event)
end
end
end
private
def get_variables
variables = interpolated['variables']
return {} if !variables.is_a?(Hash)
variables.map { |name, value|
[name.to_s, value.to_json]
}.to_h
end
def process_event(event)
Tempfile.create do |file|
filter = interpolated['filter'].to_s
# There seems to be no way to force jq to treat an arbitrary
# string as a filter without being confused with a command
# line option, so pass one via file.
file.print filter
file.close
variables = get_variables
command_args = [
self.class.jq_command,
'--compact-output',
'--sort-keys',
'--from-file', file.path,
*variables.flat_map { |name, json|
['--argjson', name, json]
}
]
log [
"Running jq with filter: #{filter}",
*variables.map { |name, json| "variable: #{name} = #{json}" }
].join("\n")
Open3.popen3(*command_args) do |stdin, stdout, stderr, wait_thread|
stderr_reader = Thread.new { stderr.read }
stdout_reader = Thread.new { stdout.each_line.flat_map { |line| JSON.parse(line) } }
results, errout, status =
begin
JSON.dump(event.payload, stdin)
stdin.close
[
stdout_reader.value,
stderr_reader.value,
wait_thread.value
]
rescue Errno::EPIPE
end
if !status.success?
error "Error output from jq:\n#{errout}"
return
end
results.keep_if do |result|
if result.is_a?(Hash)
true
else
error "Ignoring a non-object result: #{result.to_json}"
false
end
end
log "Creating #{results.size} events"
results.each do |payload|
create_event(payload:)
end
end
end
end
end
end