bin/query-cloudwatch
#!/usr/bin/env ruby
Dir.chdir(__dir__) { require 'bundler/setup' }
require 'active_support'
require 'active_support/core_ext/integer/time'
require 'active_support/core_ext/object/blank'
require 'active_support/time'
require 'aws-sdk-cloudwatchlogs'
require 'concurrent-ruby'
require 'csv'
require 'json'
require 'sqlite3'
require 'optparse'
require 'optparse/time'
$LOAD_PATH.unshift(File.expand_path(File.join(__dir__, '../lib')))
require 'reporting/cloudwatch_client'
require 'reporting/cloudwatch_query_time_slice'
class QueryCloudwatch
Config = Struct.new(
:log,
:group,
:slice,
:env,
:app,
:from,
:to,
:time_slices,
:query,
:format,
:complete,
:progress,
:num_threads,
:wait_duration,
:count_distinct,
:sqlite_database_file,
keyword_init: true,
)
class JsonOutput
attr_reader :stdout
def initialize(stdout:)
@stdout = stdout
end
def start
yield
end
def handle_row(row)
stdout.puts row.to_json
end
end
class CsvOutput
attr_reader :stdout
def initialize(stdout:)
@stdout = stdout
end
def start
yield
end
def handle_row(row)
stdout.puts row.values.to_csv
end
end
class SqliteOutput
attr_reader :filename, :invalid_message_json_count, :stderr
def initialize(filename:, stderr:)
@filename = filename
@stderr = stderr
@invalid_message_json_count = 0
end
def start
create_tables_if_needed
db.transaction do
yield
end
rescue
db&.interrupt
raise
ensure
if db
if invalid_message_json_count > 0
stderr.puts <<~END
WARNING: For #{invalid_message_json_count} event#{invalid_message_json_count == 1 ? '' : 's'}, @message did not contain valid JSON
END
end
stderr.puts <<~END
Wrote #{db.total_changes.inspect} rows to the 'events' table in #{filename}
END
end
close_database
end
def handle_row(row)
message = row['@message']
raise "Query must include @message in output when using --sqlite" unless message
timestamp = row['@timestamp']
raise "Query must include @timestamp in output when using --sqlite" unless timestamp
json = begin
JSON.parse(message)
rescue
@invalid_message_json_count += 1
nil
end
success = json&.dig('properties', 'event_properties', 'success')
success = success ? 1 : (success.nil? ? nil : 0)
# NOTE: Order matters here
row = {
id: json&.dig('id') || generate_id(timestamp:,message:),
timestamp:,
name:json&.dig('name'),
user_id: json&.dig('properties', 'user_id'),
success:,
message:,
log_stream: row.dig('@logStream'),
log: row.dig('@log'),
}
insert_statement.execute(row.values)
end
def insert_statement
@insert_statement ||= db.prepare(
<<~SQL
INSERT INTO
events (id, timestamp, name, user_id, success, message, log_stream, log)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (id) DO NOTHING
SQL
)
end
def db
@db ||= SQLite3::Database.new(filename)
end
def close_database
db&.close
@db = nil
end
def create_tables_if_needed
db.execute_batch(
<<~SQL
CREATE TABLE IF NOT EXISTS events (
id TEXT PRIMARY KEY NOT NULL,
timestamp TEXT NOT NULL,
name TEXT NULL,
user_id TEXT NULL,
success INTEGER NULL,
message TEXT NOT NULL,
log_stream TEXT NULL,
log TEXT NULL
);
SQL
)
end
def generate_id(timestamp:,message:)
hash = Digest::SHA256.hexdigest("#{timestamp}|#{message}")
"NOID-#{hash}"
end
end
attr_reader :config
def initialize(config)
@config = config
end
def run(stdout: STDOUT, stderr: STDERR)
if config.count_distinct
values = {}
fetch { |row| values[row[config.count_distinct]] = true }
stdout.puts values.size
return
end
output = create_output(stdout:, stderr:)
output.start do
fetch { |row| output.handle_row(row) }
end
rescue Interrupt
# catch interrupts (ctrl-c) and directly exit, this skips printing the backtrace
exit 1
end
def fetch(&block)
cloudwatch_client.fetch(
query: config.query,
from: config.time_slices.blank? ? config.from : nil,
to: config.time_slices.blank? ? config.to : nil,
time_slices: config.time_slices.presence,
&block
)
end
def cloudwatch_client
@cloudwatch_client ||= Reporting::CloudwatchClient.new(
ensure_complete_logs: config.complete,
log_group_name: config.group,
progress: config.progress,
slice_interval: config.slice,
num_threads: config.num_threads,
**config.to_h.slice(:wait_duration).compact,
)
end
def create_output(stdout:, stderr:)
case config.format
when :csv
CsvOutput.new(stdout:)
when :json
JsonOutput.new(stdout:)
when :sqlite
SqliteOutput.new(filename: config.sqlite_database_file, stderr:)
else
raise "unknown format #{config.format}"
end
end
# @return [Config]
def self.parse!(argv:, stdin:, stdout:, now: Time.now)
config = Config.new(
log: nil,
group: nil,
slice: parse_duration('1w'),
format: :csv,
to: now,
time_slices: [],
complete: false,
progress: true,
num_threads: Reporting::CloudwatchClient::DEFAULT_NUM_THREADS,
count_distinct: nil,
)
# rubocop:disable Metrics/BlockLength
OptionParser.new do |opts|
opts.banner = <<~TXT
Usage
=======================
#{$PROGRAM_NAME} [OPTIONS]
Script to query cloudwatch, splits a query up across multiple timeframes and combines
results (useful for querying a log period of time)
Examples
=======================
Query last 7 days, shorthand log, group query in STDIN
#{$PROGRAM_NAME} --from 7d --env int --app idp --log events.log <<QUERY
fields @timestamp
| limit 9999
QUERY
Query full log group, full timestamps, query as an arugment
#{$PROGRAM_NAME} \\
--group int_/srv/idp/shared/log/production.log \\
--start "2020-01-01T00:00:00-00:00" \\
--to "2020-12-31T23:59:59-00:00" \\
--query "fields @timestamp | limit 9999"
Query disjoint time slices via --date
#{$PROGRAM_NAME} \\
--date 2023-01-01,2023-02-01
--env int --app idp --log events.log
--query "fields @timestamp | limit 9999"
Timestamps
=======================
* Can be anything Ruby's Time.parse understands:
"2021-01-01T00:00:00-07:00"
"2021-01-01 00:00:00 -0700"
* Can be represented as duration shorthands:
10min - 10 minutes ago
9h - 9 hours ago
8d - 8 days ago
7w - 7 weeks ago
6mon - 6 months ago
5y - 5 years ago
Options
=======================
TXT
opts.on('--env ENV', '(optional)') do |env|
config.env = env
end
opts.on('--app APP', '(optional)') do |app|
config.app = app
end
opts.on('--log LOG', 'shorthand log group (ex "events.log"), needs --app and --env') do |log|
config.log = log
end
opts.on(
'--group GROUP',
'shorthand log group (ex "int_/srv/idp/shared/log/production.log")',
) do |group|
config.group = group
end
opts.on(
'--from STR', '--start STR',
'query start, see Timestamps above for examples'
) do |str|
if (duration = parse_duration(str))
config.from = duration.ago(now)
else
config.from = Time.parse(str) # rubocop:disable Rails/TimeZone
end
end
opts.on(
'--to STR', '--end STR',
'(optional, defaults to now) query end, see Timestamps above for examples'
) do |str|
if (duration = parse_duration(str))
config.to = duration.ago(now)
else
config.to = Time.parse(str) # rubocop:disable Rails/TimeZone
end
end
opts.on(
'--num-threads NUM',
"number of threads, defaults to #{Reporting::CloudwatchClient::DEFAULT_NUM_THREADS}",
) do |str|
config.num_threads = Integer(str, 10)
end
opts.on(
'--date DATE,DATE',
'(optional) dates to query, can be disjoint, to provide time slices to query'
) do |dates|
dates.split(',').each do |date_str|
config.time_slices << Date.parse(date_str).in_time_zone('UTC').all_day
end
end
opts.on('--query QUERY', 'Cloudwatch Insights query') do |query|
config.query = query
end
opts.on('--slice SLICE', '(optional) query slice size duration, defaults to 1w') do |slice|
config.slice = parse_duration(slice)
end
opts.on('--json', 'format output as newline-delimited JSON (nd-json)') do
config.format = :json
end
opts.on('--csv', '(default) format output as CSV') do
config.format = :csv
end
opts.on('--sqlite [FILENAME]', 'load output into the given Sqlite database (events.db by default). Output MUST include @message and @timestamp') do |filename|
config.format = :sqlite
config.sqlite_database_file = filename || 'events.db'
end
opts.on(
'--[no-]complete',
'whether or not to split query slices if exactly 10k rows are returned, defaults to off',
) do |complete|
config.complete = complete
end
opts.on('--[no-]progress', 'whether or not show a progress bar, defaults to on') do |progress|
config.progress = progress
end
opts.on('--count-distinct FIELD', 'get distinct count for given field') do |count_distinct|
config.count_distinct = count_distinct
end
opts.on('-h', '--help', 'Prints this help') do
stdout.puts opts
exit
return # rubocop:disable Lint/NonLocalExitFromIterator, Lint/UnreachableCode
end
end.parse!(argv)
# rubocop:enable Metrics/BlockLength
if (app = config.app) && (env = config.env) && !(log = config.log).blank?
config.group = build_group(app: app, env: env, log: log)
end
errors = []
if config.group.blank?
errors << 'ERROR: missing log group --group or (--app and --env and --log)'
end
if !config.from && config.time_slices.empty?
errors << 'ERROR: missing time range --from or --start, or --date'
end
if !config.query
if stdin.tty?
errors << 'ERROR: missing query, either STDIN or --query)'
else
config.query = stdin.read
end
end
if config.format == :sqlite
errors << "ERROR: can't do --count-distinct with --sqlite" if config.count_distinct
end
if config.count_distinct
config.complete = true
config.query = [
config.query,
"| stats count(*) by #{config.count_distinct}",
"| limit #{Reporting::CloudwatchClient::MAX_RESULTS_LIMIT}",
].join("\n")
end
if !errors.empty?
stdout.puts errors
exit 1
return # rubocop:disable Lint/UnreachableCode
end
config
end
# @param [String] log ex production.log
# @param [String] app ex idp
# @param [String] env ex int
def self.build_group(log:, app:, env:)
"#{env}_/srv/#{app}/shared/log/#{log}"
end
# @param [String] value a string such as 1min, 2h, 3d, 4w, 5mon, 6y
# @return [ActiveSupport::Duration]
def self.parse_duration(value)
Reporting::CloudwatchQueryTimeSlice.parse_duration(value)
end
end
if __FILE__ == $PROGRAM_NAME
config = QueryCloudwatch.parse!(argv: ARGV, stdin: STDIN, stdout: STDOUT)
QueryCloudwatch.new(config).run
end