treasure-data/embulk-input-mixpanel

View on GitHub
lib/embulk/input/service/export_service.rb

Summary

Maintainability
C
1 day
Test Coverage
require 'embulk/input/service/base_service'

module Embulk
  module Input
    module Service
      class ExportService < BaseService

        # Mixpanel's reserved / library-default property keys. Used by
        # #collect_custom_properties to decide which record properties count as
        # user-defined ("custom") properties.
        #
        # Sources:
        # https://mixpanel.com/help/questions/articles/special-or-reserved-properties
        # https://mixpanel.com/help/questions/articles/what-properties-do-mixpanels-libraries-store-by-default
        #
        # JavaScript to extract key names from HTML: run it on Chrome Devtool when opening their document
        # > Array.from(document.querySelectorAll("strong")).map(function(s){ return s.textContent.match(/[A-Z]/) ? s.parentNode.textContent.match(/\((.*?)\)/)[1] : s.textContent.split(",").join(" ") }).join(" ")
        # > Array.from(document.querySelectorAll("li")).map(function(s){ m = s.textContent.match(/\((.*?)\)/); return m && m[1] }).filter(function(k) { return k && !k.match("utm") }).join(" ")
        #
        # NOTE: %W interpolates NOT_PROPERTY_COLUMN (inherited from BaseService);
        # the raw list below contains duplicates, collapsed by .uniq.
        KNOWN_KEYS = %W(
        #{NOT_PROPERTY_COLUMN}
        distinct_id ip mp_name_tag mp_note token time mp_country_code length campaign_id $email $phone $distinct_id $ios_devices $android_devices $first_name  $last_name  $name $city $region $country_code $timezone $unsubscribed
        $city $region mp_country_code $browser $browser_version $device $current_url $initial_referrer $initial_referring_domain $os $referrer $referring_domain $screen_height $screen_width $search_engine $city $region $mp_country_code $timezone $browser_version $browser $initial_referrer $initial_referring_domain $os $last_seen $city $region mp_country_code $app_release $app_version $carrier $ios_ifa $os_version $manufacturer $lib_version $model $os $screen_height $screen_width $wifi $city $region $mp_country_code $timezone $ios_app_release $ios_app_version $ios_device_model $ios_lib_version $ios_version $ios_ifa $last_seen $city $region mp_country_code $app_version $bluetooth_enabled $bluetooth_version $brand $carrier $has_nfc $has_telephone $lib_version $manufacturer $model $os $os_version $screen_dpi $screen_height $screen_width $wifi $google_play_services $city $region mp_country_code $timezone $android_app_version $android_app_version_code $android_lib_version $android_os $android_os_version $android_brand $android_model $android_manufacturer $last_seen
          ).uniq.freeze

        # Validates export-specific configuration on top of the BaseService checks.
        #
        # Raises Embulk::ConfigError when the computed incremental upper limit
        # does not advance past the previously fetched time, or when both
        # `fetch_unknown_columns` and `fetch_custom_properties` are enabled
        # (they are mutually exclusive output modes).
        def validate_config
          super

          incremental_column = @config.param(:incremental_column, :string, default: nil)
          latest_fetched_time = @config.param(:latest_fetched_time, :integer, default: 0)
          custom_props_enabled = @config.param(:fetch_custom_properties, :bool, default: true)
          unknown_cols_enabled = @config.param(:fetch_unknown_columns, :bool, default: false)

          upper_limit_check_needed = !incremental_column.nil? && !latest_fetched_time.nil?
          if upper_limit_check_needed && incremental_column_upper_limit <= latest_fetched_time
            raise Embulk::ConfigError.new("Incremental column upper limit (job_start_time - incremental_column_upper_limit_delay_in_seconds) can't be smaller or equal latest fetched time #{latest_fetched_time}")
          end

          if unknown_cols_enabled && custom_props_enabled
            raise Embulk::ConfigError.new("Don't set true both `fetch_unknown_columns` and `fetch_custom_properties`.")
          end
        end

        # Assembles the serializable task hash that Embulk hands to each task
        # thread. Time-related values (job_start_time, upper limit) are epoch
        # milliseconds.
        def create_task
          task = {
            params: export_params,
            dates: range,
            timezone: @config.param(:timezone, :string, default: ""),
            export_endpoint: endpoint,
            api_secret: @config.param(:api_secret, :string),
            schema: @config.param(:columns, :array),
          }
          task[:fetch_unknown_columns] = @config.param(:fetch_unknown_columns, :bool, default: false)
          task[:fetch_custom_properties] = @config.param(:fetch_custom_properties, :bool, default: true)
          task[:retry_initial_wait_sec] = @config.param(:retry_initial_wait_sec, :integer, default: 1)
          task[:incremental_column] = @config.param(:incremental_column, :string, default: nil)
          task[:retry_limit] = @config.param(:retry_limit, :integer, default: 5)
          task[:latest_fetched_time] = @config.param(:latest_fetched_time, :integer, default: 0)
          task[:incremental] = @config.param(:incremental, :bool, default: true)
          task[:slice_range] = @config.param(:slice_range, :integer, default: 7)
          # Recorded for logging; the incremental upper limit derives its own
          # baseline from the current time as well.
          task[:job_start_time] = Time.now.to_i * 1000
          task[:incremental_column_upper_limit] = incremental_column_upper_limit
          task[:allow_partial_import] = @config.param(:allow_partial_import, :bool, default: true)
          task
        end

        # Builds the config diff for the next incremental run.
        #
        # task_report - hash with :to_date (a parseable date String) and
        #               :latest_fetched_time from the finished run
        #
        # Returns a hash whose :from_date is this run's :to_date (normalized via
        # Date#to_s) and whose :latest_fetched_time is carried over unchanged.
        def next_from_date(task_report)
          resume_date = Date.parse(task_report[:to_date])
          {
            from_date: resume_date.to_s,
            latest_fetched_time: task_report[:latest_fetched_time],
          }
        end

        # Task entry point: streams exported Mixpanel records into page_builder,
        # one slice of dates at a time.
        #
        # task         - hash built by #create_task
        # page_builder - Embulk page builder receiving one row per record
        #
        # Returns the task report hash (:latest_fetched_time and :to_date).
        def ingest(task, page_builder)
          giveup_when_mixpanel_is_down

          # Stashed for extract_values / extract_unknown_values / custom props.
          @schema = task[:schema]
          @timezone = task[:timezone]

          Embulk.logger.info "Job start time is #{task[:job_start_time]}"

          dates = task[:dates]
          # NOTE(review): record times compared below appear to be epoch seconds
          # from the record "properties", while job_start_time is millis —
          # confirm units before comparing across the two.
          prev_latest_fetched_time = task[:latest_fetched_time] || 0
          prev_latest_fetched_time_format = Time.at(prev_latest_fetched_time).strftime("%F %T %z")
          current_latest_fetched_time = prev_latest_fetched_time
          incremental_column = task[:incremental_column]
          incremental = task[:incremental]
          fetch_unknown_columns = task[:fetch_unknown_columns]

          dates.each_slice(task[:slice_range]) do |slice_dates|
            # Records skipped because they were already loaded in a previous run
            # (only applies when filtering on the default time column).
            ignored_fetched_record_count = 0
            # There is the issue with Mixpanel time field during the transition from standard to daylight saving time
            # in the US timezone i.e. 11 Mar 2018 2AM - 2:59AM, time within that period must not be existed,
            # due to daylight saving, time will be forwarded 1 hour from 2AM to 3AM.
            #
            # All of records with wrong timezone will be ignored instead of throw exception out
            ignored_wrong_daylight_tz_record_count = 0
            unless preview?
              Embulk.logger.info "Fetching data from #{slice_dates.first} to #{slice_dates.last} ..."
            end
            record_time_column = incremental_column || DEFAULT_TIME_COLUMN
            begin
              fetch(slice_dates, prev_latest_fetched_time, task).each do |record|
                if incremental
                  if !record["properties"].include?(record_time_column)
                    raise Embulk::ConfigError.new("Incremental column not exists in fetched data #{record_time_column}")
                  end
                  record_time = record["properties"][record_time_column]
                  # A custom incremental column is filtered server-side in
                  # #fetch's `where` clause, so local skipping is only needed
                  # for the default time column.
                  if incremental_column.nil?
                    if record_time <= prev_latest_fetched_time
                      ignored_fetched_record_count += 1
                      next
                    end
                  end

                  # Track the newest record time seen, to report for next run.
                  current_latest_fetched_time = [
                    current_latest_fetched_time,
                    record_time,
                  ].max
                end
                begin
                  values = extract_values(record)
                  if fetch_unknown_columns
                    # Unknown columns are appended as a single JSON column.
                    unknown_values = extract_unknown_values(record)
                    values << unknown_values.to_json
                  end
                  if task[:fetch_custom_properties]
                    values << collect_custom_properties(record)
                  end
                  page_builder.add(values)
                rescue TZInfo::PeriodNotFound
                  # Record timestamp falls in the nonexistent DST transition
                  # hour (see comment above); drop the record and keep going.
                  ignored_wrong_daylight_tz_record_count += 1
                end
              end
            rescue MixpanelApi::IncompleteExportResponseError
              if !task[:allow_partial_import]
                #   re raise the exception if we don't allow partial import
                raise
              end
            end
            if ignored_fetched_record_count > 0
              Embulk.logger.warn "Skipped already loaded #{ignored_fetched_record_count} records. These record times are older or equal than previous fetched record time (#{prev_latest_fetched_time} @ #{prev_latest_fetched_time_format})."
            end
            if ignored_wrong_daylight_tz_record_count > 0
              Embulk.logger.warn "Skipped #{ignored_wrong_daylight_tz_record_count} records due to corrupted Mixpanel time transition from standard to daylight saving"
            end
            # Preview only needs a sample; stop after the first slice.
            break if preview?
          end
          page_builder.finish
          create_task_report(current_latest_fetched_time, dates.last, task[:timezone])
        end

        # Builds the per-task report from which the next run's config diff is
        # derived. When to_date is nil, falls back to yesterday in the given
        # timezone.
        def create_task_report(current_latest_fetched_time, to_date, timezone)
          reported_to_date = to_date || today(timezone) - 1
          {
            latest_fetched_time: current_latest_fetched_time,
            to_date: reported_to_date,
          }
        end

        # Guesses the Embulk column schema from a small sample export.
        def guess_columns
          giveup_when_mixpanel_is_down
          sample_range = guess_range
          Embulk.logger.info "Guessing schema using #{sample_range.first}..#{sample_range.last} records"

          sample_params = export_params.merge(
            "from_date"=>sample_range.first,
            "to_date"=>sample_range.last,
          )

          records = create_client.export_for_small_dataset(sample_params)
          guess_from_records(records)
        end

        # Determines the date range used for schema guessing. Falls back to the
        # default guess window ending yesterday when the generated range is
        # empty (e.g. an out-of-range from_date).
        def guess_range
          tz = @config.param(:timezone, :string, default: "")
          start_date = @config.param(:from_date, :string, default: default_guess_start_date(tz).to_s)
          day_count = @config.param(:fetch_days, :integer, default: DEFAULT_FETCH_DAYS)

          generated = RangeGenerator.new(start_date, day_count, tz).generate_range
          return generated unless generated.empty?

          default_guess_start_date(tz)..(today(tz) - 1)
        end

        # Builds the base Mixpanel export API parameters from config.
        # The event list is JSON-encoded as the API expects; a missing event
        # list stays nil.
        def export_params
          events = @config.param(:event, :array, default: nil)
          {
            event: events && events.to_json,
            where: @config.param(:where, :string, default: nil),
            bucket: @config.param(:bucket, :string, default: nil),
          }
        end

        # Converts sampled export records into an Embulk column list.
        #
        # records - array of raw export records, each with a "properties" hash
        #
        # The "time" column is pinned to the top (with a fixed :long type) and
        # the NOT_PROPERTY_COLUMN (event name) comes second; the remaining
        # columns are guessed from the sampled properties.
        #
        # Returns an Array of column hashes (:name, :type, optional :format).
        def guess_from_records(records)
          sample_props = records.map {|r| r["properties"]}
          schema = Guess::SchemaGuess.from_hash_records(sample_props)
          columns = schema.map do |col|
            # "time" is re-added explicitly below with a fixed type.
            next if col.name == "time"
            result = {
              name: col.name,
              type: col.type,
            }
            # Use a symbol key for consistency with :name/:type (was the
            # string "format", leaving a mixed-key-style hash).
            result[:format] = col.format if col.format
            result
          end.compact
          columns.unshift(name: NOT_PROPERTY_COLUMN, type: :string)
          # Shift incremental column to top
          columns.unshift(name: "time", type: :long)
        end

        # Fetches export records for one slice of dates.
        #
        # dates           - array of dates; first/last bound the API request
        # last_fetch_time - lower bound used in the incremental_column filter
        # task            - task hash (params, incremental settings, ...)
        #
        # Returns a lazy Enumerator over raw records, or the small sample
        # dataset when running in preview mode.
        #
        # NOTE(review): &block is captured but never yielded or forwarded —
        # callers iterate the returned enumerator instead.
        def fetch(dates, last_fetch_time, task, &block)
          from_date = dates.first
          to_date = dates.last
          params = task[:params].merge(
            "from_date"=>from_date,
            "to_date"=>to_date
          )
          incremental_column = task[:incremental_column]
          if !incremental_column.nil? # can't do filter on time column, time column need to be filter manually.
            # Push the incremental bounds into the API `where` expression,
            # AND-ed with any user-supplied filter; the upper bound comes from
            # incremental_column_upper_limit computed at task creation.
            params = params.merge(
              "where"=>"#{params['where'].nil? ? '' : "(#{params['where']}) and " }properties[\"#{incremental_column}\"] > #{last_fetch_time || 0} and properties[\"#{incremental_column}\"] < #{task[:incremental_column_upper_limit]}"
            )
          end
          Embulk.logger.info "Where params is #{params["where"]}"

          client = create_client

          if preview?
            client.export_for_small_dataset(params)
          else
            # Stream records lazily so a large export is not buffered in memory.
            Enumerator.new do |y|
              client.export(params) do |record|
                y << record
              end
            end
          end
        end

        # Export API endpoint, overridable via the `export_endpoint` option.
        def endpoint
          default_endpoint = Embulk::Input::MixpanelApi::Client::DEFAULT_EXPORT_ENDPOINT
          @config.param(:export_endpoint, :string, default: default_endpoint)
        end

        private

        # Upper bound for the incremental column filter: the current time in
        # epoch milliseconds minus the configured delay (seconds, default 0).
        def incremental_column_upper_limit
          start_millis = Time.now.to_i * 1000
          delay_seconds = @config.param(:incremental_column_upper_limit_delay_in_seconds, :integer, default: 0)
          start_millis - delay_seconds * 1000
        end

        # Pulls a single column value out of a raw export record.
        # The event name (NOT_PROPERTY_COLUMN) lives at the record top level;
        # "time" is run through the timezone adjustment; everything else is read
        # from the record's "properties" hash.
        def extract_value(record, name)
          return record[NOT_PROPERTY_COLUMN] if name == NOT_PROPERTY_COLUMN

          if name == "time"
            adjust_timezone(record["properties"]["time"])
          else
            record["properties"][name]
          end
        end

        # Collects properties that are neither Mixpanel reserved keys
        # (KNOWN_KEYS) nor already present in the configured schema.
        #
        # record - raw export record with a "properties" hash
        #
        # Returns a Hash of custom property name => value (possibly empty).
        def collect_custom_properties(record)
          specified_columns = @schema.map {|col| col["name"]}
          # each_with_object builds one result hash in place; the previous
          # inject+merge allocated a fresh hash per custom key.
          record["properties"].each_with_object({}) do |(key, value), result|
            key_name = key.to_s
            next if KNOWN_KEYS.include?(key_name) || specified_columns.include?(key_name)
            result[key] = value
          end
        end

        # Extracts values for record keys that are missing from the configured
        # schema, logging a warning listing each unexpected column.
        #
        # Returns a Hash of unknown key => extracted value (possibly empty).
        def extract_unknown_values(record)
          present_keys = record["properties"].keys + [NOT_PROPERTY_COLUMN]
          configured_keys = @schema.map {|column| column["name"]}
          unexpected_keys = present_keys - configured_keys

          unless unexpected_keys.empty?
            Embulk.logger.warn("Unknown columns exists in record: #{unexpected_keys.join(', ')}")
          end

          unexpected_keys.each_with_object({}) do |key, values|
            values[key] = extract_value(record, key)
          end
        end
      end
    end
  end
end