18F/identity-idp

View on GitHub
bin/query-cloudwatch

Summary

Maintainability
Test Coverage
#!/usr/bin/env ruby
Dir.chdir(__dir__) { require 'bundler/setup' }

require 'active_support'
require 'active_support/core_ext/integer/time'
require 'active_support/core_ext/object/blank'
require 'active_support/time'
require 'aws-sdk-cloudwatchlogs'
require 'concurrent-ruby'
require 'csv'
require 'digest'
require 'json'
require 'optparse'
require 'optparse/time'
require 'sqlite3'

$LOAD_PATH.unshift(File.expand_path(File.join(__dir__, '../lib')))
require 'reporting/cloudwatch_client'
require 'reporting/cloudwatch_query_time_slice'

class QueryCloudwatch
  # Settings for one invocation of the script, built by .parse!.
  #
  # log / env / app        - shorthand pieces combined into +group+ by .parse!
  # group                  - log group name handed to the Cloudwatch client
  # slice                  - slice interval handed to the Cloudwatch client
  # from / to              - bounds of the query, used when no time_slices are given
  # time_slices            - explicit slices to query (filled by --date)
  # query                  - the Cloudwatch Insights query text
  # format                 - :csv, :json or :sqlite
  # complete / progress    - flags forwarded to the Cloudwatch client
  # num_threads            - thread count forwarded to the Cloudwatch client
  # wait_duration          - forwarded to the Cloudwatch client only when set
  # count_distinct         - field name to count distinct values of, if any
  # sqlite_database_file   - database path used when format is :sqlite
  Config = Struct.new(
    *%i[
      log
      group
      slice
      env
      app
      from
      to
      time_slices
      query
      format
      complete
      progress
      num_threads
      wait_duration
      count_distinct
      sqlite_database_file
    ],
    keyword_init: true,
  )

  # Output adapter that prints each result row as one JSON document per line.
  class JsonOutput
    attr_reader :stdout

    # @param stdout [IO] stream the serialized rows are written to
    def initialize(stdout:)
      @stdout = stdout
    end

    # Nothing to set up or tear down; just runs the given block.
    def start
      yield
    end

    # @param row [#to_json] a single result row
    def handle_row(row)
      serialized = row.to_json
      stdout.puts(serialized)
    end
  end

  # Output adapter that prints the values of each result row as one CSV line.
  # No header line is written.
  class CsvOutput
    attr_reader :stdout

    # @param stdout [IO] stream the CSV lines are written to
    def initialize(stdout:)
      @stdout = stdout
    end

    # Nothing to set up or tear down; just runs the given block.
    def start
      yield
    end

    # @param row [Hash] a single result row; only its values are written
    def handle_row(row)
      csv_line = CSV.generate_line(row.values)
      stdout.puts(csv_line)
    end
  end

  # Output adapter that loads result rows into the `events` table of a SQLite
  # database file. A few well-known fields are extracted from the JSON in each
  # row's @message; rows whose @message cannot be parsed are still stored,
  # just without those fields.
  class SqliteOutput
    attr_reader :filename, :invalid_message_json_count, :stderr

    # @param filename [String] path of the SQLite database file
    # @param stderr [IO] receives the warning and summary printed by #start
    def initialize(filename:, stderr:)
      @filename = filename
      @stderr = stderr
      @invalid_message_json_count = 0
    end

    # Ensures the table exists, then runs the given block inside a single
    # transaction. The summary is printed and the connection closed whether or
    # not the block raised.
    def start
      create_tables_if_needed
      db.transaction do
        yield
      end
    rescue
      # Use @db, not #db: #db lazily opens a connection, so calling it here
      # after a failed open would retry the open and mask the original error.
      @db&.interrupt
      raise
    ensure
      report_summary if @db
      close_database
    end

    # Inserts one result row into the events table.
    #
    # @param row [Hash] result row; must include '@message' and '@timestamp'
    # @raise [RuntimeError] if either required field is missing
    def handle_row(row)
      message = row['@message']
      raise "Query must include @message in output when using --sqlite" unless message

      timestamp = row['@timestamp']
      raise "Query must include @timestamp in output when using --sqlite" unless timestamp

      json = parse_message(message)

      # NOTE: values are bound positionally, so the order must match the
      # column list in #insert_statement.
      insert_statement.execute(
        [
          dig_hash(json, 'id') || generate_id(timestamp:, message:),
          timestamp,
          dig_hash(json, 'name'),
          dig_hash(json, 'properties', 'user_id'),
          success_flag(dig_hash(json, 'properties', 'event_properties', 'success')),
          message,
          row['@logStream'],
          row['@log'],
        ],
      )
    end

    # Prepared INSERT, created on first use. Rows whose id already exists are
    # skipped by the ON CONFLICT clause.
    def insert_statement
      @insert_statement ||= db.prepare(
        <<~SQL
          INSERT INTO 
            events (id, timestamp, name, user_id, success, message, log_stream, log)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT (id) DO NOTHING
        SQL
      )
    end

    # Database connection, opened on first use.
    def db
      @db ||= SQLite3::Database.new(filename)
    end

    # Closes the prepared statement and the connection if they were opened.
    # Both memoized handles are reset so a later #start begins from scratch
    # instead of reusing a statement that belongs to a closed connection.
    def close_database
      @insert_statement&.close
      @insert_statement = nil
      @db&.close
      @db = nil
    end

    def create_tables_if_needed
      db.execute_batch(
        <<~SQL
          CREATE TABLE IF NOT EXISTS events (
            id TEXT PRIMARY KEY NOT NULL,
            timestamp TEXT NOT NULL,
            name TEXT NULL,
            user_id TEXT NULL,
            success INTEGER NULL,
            message TEXT NOT NULL,
            log_stream TEXT NULL,
            log TEXT NULL
          );
        SQL
      )
    end

    # Deterministic fallback id for events whose JSON carries no id, so that
    # re-importing the same event hits the ON CONFLICT clause.
    def generate_id(timestamp:,message:)
      hash = Digest::SHA256.hexdigest("#{timestamp}|#{message}")
      "NOID-#{hash}"
    end

    private

    # Prints the invalid-JSON warning (if any) and the row-count summary.
    def report_summary
      count = invalid_message_json_count
      if count > 0
        stderr.puts(
          "WARNING: For #{count} event#{count == 1 ? '' : 's'}, " \
          "@message did not contain valid JSON",
        )
      end
      stderr.puts "Wrote #{db.total_changes.inspect} rows to the 'events' table in #{filename}"
    end

    # @return [Object, nil] the parsed JSON, or nil (after counting the
    #   failure) when the message cannot be parsed
    def parse_message(message)
      JSON.parse(message)
    rescue
      @invalid_message_json_count += 1
      nil
    end

    # Like Hash#dig, but returns nil instead of raising when any level
    # (including the top one) is not a Hash. A message such as `[1,2]` or
    # `{"properties":"x"}` is valid JSON and must not abort the import.
    def dig_hash(node, *keys)
      keys.each do |key|
        return nil unless node.is_a?(Hash)

        node = node[key]
      end
      node
    end

    # Maps the event's success value to the INTEGER column: nil stays nil,
    # false becomes 0, anything else becomes 1.
    def success_flag(value)
      return nil if value.nil?

      value ? 1 : 0
    end
  end

  # The settings this instance runs with.
  attr_reader :config

  # @param config [Config] settings, typically the result of .parse!
  def initialize(config)
    @config = config
  end

  # Executes the query and emits the results.
  #
  # With count_distinct configured, prints only the number of distinct values
  # seen for that field. Otherwise streams every row to the output chosen by
  # #create_output.
  #
  # @param stdout [IO] stream for results
  # @param stderr [IO] stream handed to the output for diagnostics
  def run(stdout: STDOUT, stderr: STDERR)
    distinct_field = config.count_distinct

    if distinct_field
      # Hash keys act as a set of the values seen so far
      seen = {}
      fetch { |row| seen[row[distinct_field]] = true }
      stdout.puts seen.size
    else
      output = create_output(stdout:, stderr:)
      output.start { fetch { |row| output.handle_row(row) } }
    end
  rescue Interrupt
    # ctrl-c: exit directly so no backtrace is printed
    exit 1
  end


  # Runs the configured query through the Cloudwatch client, passing each row
  # to the given block. When explicit time slices are configured they are
  # used and from/to are sent as nil; otherwise from/to bound the query.
  def fetch(&block)
    slices = config.time_slices.presence
    range_start, range_end = slices ? [nil, nil] : [config.from, config.to]

    cloudwatch_client.fetch(
      query: config.query,
      from: range_start,
      to: range_end,
      time_slices: slices,
      &block
    )
  end

  # Memoized Cloudwatch client built from the config. wait_duration is only
  # passed along when it is set, so the client's own default applies otherwise.
  def cloudwatch_client
    return @cloudwatch_client if @cloudwatch_client

    client_options = {
      ensure_complete_logs: config.complete,
      log_group_name: config.group,
      progress: config.progress,
      slice_interval: config.slice,
      num_threads: config.num_threads,
    }
    wait_duration = config.to_h[:wait_duration]
    client_options[:wait_duration] = wait_duration unless wait_duration.nil?

    @cloudwatch_client = Reporting::CloudwatchClient.new(**client_options)
  end

  # Builds the output adapter matching config.format.
  #
  # @param stdout [IO] stream used by the CSV and JSON outputs
  # @param stderr [IO] stream used by the SQLite output
  # @raise [RuntimeError] when the format is not :csv, :json or :sqlite
  def create_output(stdout:, stderr:)
    format = config.format

    return CsvOutput.new(stdout:) if format == :csv
    return JsonOutput.new(stdout:) if format == :json
    return SqliteOutput.new(filename: config.sqlite_database_file, stderr:) if format == :sqlite

    raise "unknown format #{format}"
  end

  # Builds a Config from command-line arguments.
  #
  # On --help the usage text is printed to +stdout+ and the process exits.
  # When required options are missing or conflict, every problem found is
  # printed to +stdout+ and the process exits with status 1.
  #
  # @param argv [Array<String>] arguments; consumed in place by OptionParser
  # @param stdin [IO] read for the query when --query is absent and stdin is
  #   not a TTY
  # @param stdout [IO] receives the help text and error messages
  # @param now [Time] reference point for relative durations and the default
  #   end of the query range
  # @return [Config]
  def self.parse!(argv:, stdin:, stdout:, now: Time.now)
    config = Config.new(
      log: nil,
      group: nil,
      slice: parse_duration('1w'),
      format: :csv,
      to: now,
      time_slices: [],
      complete: false,
      progress: true,
      num_threads: Reporting::CloudwatchClient::DEFAULT_NUM_THREADS,
      count_distinct: nil,
    )

    # A timestamp option is either a duration shorthand counted back from
    # +now+, or anything Time.parse accepts.
    parse_timestamp = lambda do |str|
      duration = parse_duration(str)
      duration ? duration.ago(now) : Time.parse(str) # rubocop:disable Rails/TimeZone
    end

    # rubocop:disable Metrics/BlockLength
    OptionParser.new do |opts|
      opts.banner = <<~TXT
        Usage
        =======================

          #{$PROGRAM_NAME} [OPTIONS]

        Script to query cloudwatch, splits a query up across multiple timeframes and combines
        results (useful for querying a long period of time)

        Examples
        =======================

        Query last 7 days, shorthand log group, query in STDIN

        #{$PROGRAM_NAME} --from 7d --env int --app idp --log events.log <<QUERY
          fields @timestamp
          | limit 9999
        QUERY

        Query full log group, full timestamps, query as an argument

        #{$PROGRAM_NAME} \\
          --group int_/srv/idp/shared/log/production.log \\
          --start "2020-01-01T00:00:00-00:00" \\
          --to "2020-12-31T23:59:59-00:00" \\
          --query "fields @timestamp | limit 9999"

        Query disjoint time slices via --date

        #{$PROGRAM_NAME} \\
          --date 2023-01-01,2023-02-01 \\
          --env int --app idp --log events.log \\
          --query "fields @timestamp | limit 9999"

        Timestamps
        =======================

        * Can be anything Ruby's Time.parse understands:

          "2021-01-01T00:00:00-07:00"
          "2021-01-01 00:00:00 -0700"

        * Can be represented as duration shorthands:

          10min - 10 minutes ago
          9h    - 9 hours ago
          8d    - 8 days ago
          7w    - 7 weeks ago
          6mon  - 6 months ago
          5y    - 5 years ago

        Options
        =======================
      TXT

      opts.on('--env ENV', '(optional)') do |env|
        config.env = env
      end

      opts.on('--app APP', '(optional)') do |app|
        config.app = app
      end

      opts.on('--log LOG', 'shorthand log group (ex "events.log"), needs --app and --env') do |log|
        config.log = log
      end

      opts.on(
        '--group GROUP',
        'full log group name (ex "int_/srv/idp/shared/log/production.log")',
      ) do |group|
        config.group = group
      end

      opts.on(
        '--from STR', '--start STR',
        'query start, see Timestamps above for examples'
      ) do |str|
        config.from = parse_timestamp.call(str)
      end

      opts.on(
        '--to STR', '--end STR',
        '(optional, defaults to now) query end, see Timestamps above for examples'
      ) do |str|
        config.to = parse_timestamp.call(str)
      end

      opts.on(
        '--num-threads NUM',
        "number of threads, defaults to #{Reporting::CloudwatchClient::DEFAULT_NUM_THREADS}",
      ) do |str|
        config.num_threads = Integer(str, 10)
      end

      opts.on(
        '--date DATE,DATE',
        '(optional) dates to query, can be disjoint, to provide time slices to query'
      ) do |dates|
        dates.split(',').each do |date_str|
          config.time_slices << Date.parse(date_str).in_time_zone('UTC').all_day
        end
      end

      opts.on('--query QUERY', 'Cloudwatch Insights query') do |query|
        config.query = query
      end

      opts.on('--slice SLICE', '(optional) query slice size duration, defaults to 1w') do |slice|
        config.slice = parse_duration(slice)
      end

      opts.on('--json', 'format output as newline-delimited JSON (nd-json)') do
        config.format = :json
      end

      opts.on('--csv', '(default) format output as CSV') do
        config.format = :csv
      end

      opts.on(
        '--sqlite [FILENAME]',
        'load output into the given Sqlite database (events.db by default). ' \
        'Output MUST include @message and @timestamp',
      ) do |filename|
        config.format = :sqlite
        config.sqlite_database_file = filename || 'events.db'
      end

      opts.on(
        '--[no-]complete',
        'whether or not to split query slices if exactly 10k rows are returned, defaults to off',
      ) do |complete|
        config.complete = complete
      end

      opts.on('--[no-]progress', 'whether or not show a progress bar, defaults to on') do |progress|
        config.progress = progress
      end

      opts.on('--count-distinct FIELD', 'get distinct count for given field') do |count_distinct|
        config.count_distinct = count_distinct
      end

      opts.on('-h', '--help', 'Prints this help') do
        stdout.puts opts
        exit
        # Only reached if exit does not terminate (presumably when stubbed)
        return # rubocop:disable Lint/NonLocalExitFromIterator, Lint/UnreachableCode
      end
    end.parse!(argv)
    # rubocop:enable Metrics/BlockLength

    # The shorthand triple, when complete, determines the log group
    if config.app && config.env && !config.log.blank?
      config.group = build_group(app: config.app, env: config.env, log: config.log)
    end

    errors = []

    if config.group.blank?
      errors << 'ERROR: missing log group --group or (--app and --env and --log)'
    end

    if !config.from && config.time_slices.empty?
      errors << 'ERROR: missing time range --from or --start, or --date'
    end

    unless config.query
      if stdin.tty?
        # Nothing is piped in, so there is no query to read
        errors << 'ERROR: missing query, either STDIN or --query'
      else
        config.query = stdin.read
      end
    end

    if config.format == :sqlite && config.count_distinct
      errors << "ERROR: can't do --count-distinct with --sqlite"
    end

    if config.count_distinct
      # Let Cloudwatch do the grouping; #run then counts the distinct rows
      config.complete = true
      config.query = [
        config.query,
        "| stats count(*) by #{config.count_distinct}",
        "| limit #{Reporting::CloudwatchClient::MAX_RESULTS_LIMIT}",
      ].join("\n")
    end

    unless errors.empty?
      stdout.puts errors
      exit 1
      # Only reached if exit does not terminate (presumably when stubbed)
      return # rubocop:disable Lint/UnreachableCode
    end

    config
  end

  # Expands the shorthand pieces into a full log group name.
  #
  # @param [String] log log file name, ex production.log
  # @param [String] app application name, ex idp
  # @param [String] env environment name, ex int
  # @return [String] ex int_/srv/idp/shared/log/production.log
  def self.build_group(log:, app:, env:)
    format('%s_/srv/%s/shared/log/%s', env, app, log)
  end

  # Converts a duration shorthand into a duration by delegating to
  # Reporting::CloudwatchQueryTimeSlice.
  #
  # @param [String] value shorthand such as 1min, 2h, 3d, 4w, 5mon, 6y
  # @return [ActiveSupport::Duration]
  # NOTE(review): callers in .parse! treat a falsy result as "not a
  # duration", so this presumably returns nil for other input; confirm
  # against the time slice class.
  def self.parse_duration(value)
    time_slice = Reporting::CloudwatchQueryTimeSlice
    time_slice.parse_duration(value)
  end
end

# Run only when executed directly, so loading this file has no side effects.
if $PROGRAM_NAME == __FILE__
  parsed_config = QueryCloudwatch.parse!(argv: ARGV, stdin: STDIN, stdout: STDOUT)
  QueryCloudwatch.new(parsed_config).run
end