treasure-data/embulk-input-mixpanel

View on GitHub
lib/embulk/input/service/jql_service.rb

Summary

Maintainability
C
1 day
Test Coverage
require 'embulk/input/service/base_service'

module Embulk
  module Input
    module Service
      class JqlService < BaseService

        FROM_DATE_PARAM = "params.from_date"
        TO_DATE_PARAM = "params.to_date"

        def validate_config
          super

          validate_jql_script
          validate_fetch_days
        end

        def create_task
          {
            timezone: @config.param(:timezone, :string, default: ""),
            api_secret: @config.param(:api_secret, :string),
            jql_endpoint: endpoint,
            dates: range,
            incremental: @config.param(:incremental, :bool, default: true),
            slice_range: @config.param(:slice_range, :integer, default: 7),
            schema: @config.param(:columns, :array),
            retry_initial_wait_sec: @config.param(:retry_initial_wait_sec, :integer, default: 1),
            retry_limit: @config.param(:retry_limit, :integer, default: 5),
            incremental_column: @config.param(:incremental_column, :string, default: nil),
            latest_fetched_time: @config.param(:latest_fetched_time, :integer, default: 0),
            jql_mode: true,
            jql_script: @config.param(:jql_script, :string, nil)
          }
        end

        def guess_columns
          giveup_when_mixpanel_is_down
          range = guess_range
          Embulk.logger.info "Guessing schema using #{range.first}..#{range.last}"

          client = create_client

          sample_records = client.send_jql_script_small_dataset(parameters(@config.param(:jql_script, :string, nil), range.first, range.last))

          validate_result(sample_records)

          @incremental = @config.param(:incremental, :bool, default: true)
          @incremental_column = @config.param(:incremental_column, :string, default: nil)
          validate_result_contain_incremental_column(sample_records)

          guess_from_records(sample_records)
        end

        def ingest(task, page_builder)
          @dates = task[:dates]
          @schema = task[:schema]
          @timezone = task[:timezone]
          @incremental_column = task[:incremental_column]
          unless @incremental_column
            Embulk.logger.warn "incremental_column should be specified when running in incremental mode to avoid duplicated"
            Embulk.logger.warn "Use default value #{DEFAULT_TIME_COLUMN}"
            @incremental_column = DEFAULT_TIME_COLUMN
          end

          @incremental = task[:incremental]
          latest_fetched_time = task[:latest_fetched_time]

          client = create_client

          ignored_fetched_record_count = 0
          next_fetched_time = latest_fetched_time
          @dates.each_slice(task[:slice_range]) do |slice_dates|
            Embulk.logger.info "Fetching date from #{slice_dates.first}..#{slice_dates.last}"
            if preview?
              records = client.send_jql_script_small_dataset(parameters(@config.param(:jql_script, :string, default: nil), slice_dates.first, slice_dates.last))
            else
              records = client.send_jql_script(parameters(task[:jql_script], slice_dates.first, slice_dates.last))
            end
            validate_result(records)
            records.each do |record|
              if @incremental
                if @schema.map {|col| col["name"]}.include?(@incremental_column)
                  record_incremental_column = record[@incremental_column]
                  if record_incremental_column
                    if record_incremental_column <= latest_fetched_time
                      ignored_fetched_record_count += 1
                      next
                    else
                      next_fetched_time = [record_incremental_column, next_fetched_time].max
                    end
                  end
                else
                  raise Embulk::ConfigError.new("Missing Incremental Field (<incremental_column>) in the returned dataset. Specify the correct Incremental Field value.")
                end
              end
              values = extract_values(record)
              page_builder.add(values)
            end
            break if preview?
          end
          Embulk.logger.info "Skip #{ignored_fetched_record_count} rows"
          page_builder.finish

          if task[:incremental] && !preview?
            return create_task_report(next_fetched_time)
          end
          {}
        end

        def guess_range
          time_zone = @config.param(:timezone, :string, default: "")
          from_date = @config.param(:from_date, :string, default: default_guess_start_date(time_zone).to_s)
          fetch_days = @config.param(:fetch_days, :integer, default: DEFAULT_FETCH_DAYS)

          fetch_days = [fetch_days, DEFAULT_FETCH_DAYS].min

          range = RangeGenerator.new(from_date, fetch_days, time_zone).generate_range
          if range.empty?
            return default_guess_start_date(time_zone)..(today(time_zone) - 1)
          end
          range
        end

        def guess_from_records(sample_props)
          validate_result(sample_props)

          begin
            schema = Guess::SchemaGuess.from_hash_records(sample_props)
            schema.map do |col|
              result = {
                name: col.name,
                type: col.type,
              }
              if (col.name.eql? "time") || (col.eql? "last_seen")
                result["format"] = col.format if col.format
              end
              result
            end
          rescue DataError
            raise Embulk::ConfigError.new("Non-supported result #{sample_props}. Revise your JQL.")
          end
        end

        def parameters(script, from_date, to_date)
          {
            params: params(from_date, to_date),
            script: script
          }
        end

        def adjust_timezone(epoch)
          # Adjust timezone offset to get UTC time
          # c.f. https://mixpanel.com/docs/api-documentation/exporting-raw-data-you-inserted-into-mixpanel#export
          tz = TZInfo::Timezone.get(@timezone)

          begin
            if epoch.present?
              offset = tz.period_for_local(epoch, true).offset.utc_total_offset
              epoch - offset
            end
              # TZInfo::PeriodNotFound signals that there is no equivalent UTC time (for example,
              # during the transition from standard time to daylight savings time when the clocks are moved forward and an hour is skipped).
              # tz.local_time(2018, 3, 11, 2, 30, 0, 0)
          rescue TZInfo::PeriodNotFound
            epoch + 1.hour
            offset = tz.period_for_local(epoch, true).offset.utc_total_offset
            epoch - offset
          end
        end

        def next_from_date(task_report)
          next_to_date = Date.parse(task_report[:to_date])
          {
            from_date: next_to_date.to_s,
            latest_fetched_time: task_report[:latest_fetched_time],
          }
        end

        def endpoint
          @config.param(:jql_endpoint, :string, default: Embulk::Input::MixpanelApi::Client::DEFAULT_JQL_ENDPOINT)
        end

        private

        def create_task_report(next_fetched_time)
          {
            to_date: @dates.last || today(@timezone) - 1,
            latest_fetched_time: next_fetched_time.to_s
          }
        end

        def params(from_date, to_date)
          {
            from_date: from_date,
            to_date: to_date
          }
        end

        def extract_value(record, name)
          case name
          when NOT_PROPERTY_COLUMN
            record[NOT_PROPERTY_COLUMN]
          when "time"
            if record["time"].present?
              value = record["time"]
              if value > 0
                time = record["time"] / 1000
                adjust_timezone(time)
              else
                value
              end
            end
          when "last_seen"
            if record["last_seen"].present?
              value = record["last_seen"]
              if value > 0
                # last_seen format in ms
                time = record["last_seen"] / 1000
                adjust_timezone(time)
              else
                value
              end
            end
          when @incremental_column
            if record[@incremental_column].present?
              value = record[@incremental_column]
              if value > 0
                # format in ms
                time = record[@incremental_column] / 1000
                adjust_timezone(time)
              else
                value
              end
            end
          else
            record[name]
          end
        end

        def validate_result(records)
          if records.is_a?(Array) && records.first.is_a?(Integer)
            # incase using reduce, it only return the number of records
            raise Embulk::ConfigError.new("Non-supported result. Revise your JQL.")
          end
        end

        def validate_result_contain_incremental_column(records)
          unless @incremental_column
            Embulk.logger.warn "incremental_column should be specified when running in incremental mode to avoid duplicated"
            Embulk.logger.warn "Use default value #{DEFAULT_TIME_COLUMN}"
            @incremental_column = DEFAULT_TIME_COLUMN
          end

          if @incremental && records.length > 0 && !records[0].include?(@incremental_column)
            raise Embulk::ConfigError.new("Missing Incremental Field (<incremental_column>) in the returned dataset. Specify the correct Incremental Field value.")
          end
        end

        def validate_jql_script
          jql_script = @config.param(:jql_script, :string, default: nil)
          if jql_script.blank?
            raise Embulk::ConfigError.new("JQL script shouldn't be empty or null")
          end
        end

        def validate_fetch_days
          fetch_days = @config.param(:fetch_days, :integer, default: nil)
          if fetch_days && fetch_days <= 0
            raise Embulk::ConfigError.new("fetch_days should be larger than 0")
          end
        end

      end
    end
  end
end