lib/embulk/input/google_analytics/plugin.rb
module Embulk
module Input
module GoogleAnalytics
class Plugin < InputPlugin
# Make this class available to Embulk as the "google_analytics" input type.
::Embulk::Plugin.register_input("google_analytics", self)

# Values stored in task["auth_method"] by task_from_config.
AUTH_TYPE_JSON_KEY = "json_key".freeze
AUTH_TYPE_REFRESH_TOKEN = "refresh_token".freeze
# https://developers.google.com/analytics/devguides/reporting/core/dimsmets
#
# Embulk entry point: builds the task from +config+, validates it, and derives
# the output schema by looking every configured column up in the metadata
# returned by Client#get_columns_list. A "view_id" string column is appended.
#
# @param config Embulk configuration (read through task_from_config)
# @yield [task, columns, count] Embulk's control block (invoked via resume)
# @return the next config diff returned by resume
# @raise [ConfigError] for an unsupported time_series, missing or invalid
#   credentials, an unknown metric/dimension, or an unmapped metadata dataType
def self.transaction(config, &control)
  task = task_from_config(config)
  validate_task(task)

  columns_list = Client.new(task).get_columns_list
  columns = columns_from_task(task).map do |col_name|
    col_info = columns_list.find { |col| col[:id] == col_name }
    raise ConfigError.new("Unknown metric/dimension '#{col_name}'") unless col_info
    Column.new(nil, canonicalize_column_name(col_name), column_type_for(col_name, col_info, task))
  end
  columns << Column.new(nil, "view_id", :string)
  resume(task, columns, 1, &control)
end

# Raises ConfigError unless time_series is supported and the credentials
# required by task["auth_method"] are present and well-formed.
def self.validate_task(task)
  unless %w(ga:date ga:dateHour).include?(task["time_series"])
    raise ConfigError.new("Unknown time_series '#{task["time_series"]}'. Use 'ga:dateHour' or 'ga:date'")
  end
  # task_from_config leaves auth_method nil when neither refresh_token nor
  # json_key_content is configured.
  raise ConfigError.new("Unknown Authentication method '#{task['auth_method']}'.") unless task['auth_method']

  case task['auth_method']
  when Plugin::AUTH_TYPE_REFRESH_TOKEN
    unless task['client_id'] && task['client_secret'] && task['refresh_token']
      raise ConfigError.new("client_id, client_secret and refresh_token are required when using Oauth authentication")
    end
  when Plugin::AUTH_TYPE_JSON_KEY
    unless valid_json?(task["json_key_content"])
      raise ConfigError.new("json_key_content is not a valid JSON object")
    end
  end
end

# Embulk type of one output column.
# - The time_series column is always a timestamp.
# - Entries without :attributes (custom dimensions) are strings.
# - Standard columns are mapped from their metadata dataType; an unmapped
#   dataType raises ConfigError instead of producing a nil column type.
def self.column_type_for(col_name, col_info, task)
  return :timestamp if col_name == task["time_series"]

  attributes = col_info[:attributes]
  return :string unless attributes # custom dimension

  case attributes[:dataType]
  when "STRING" then :string
  when "INTEGER" then :long
  when "PERCENT", "FLOAT", "CURRENCY", "TIME" then :double
  else
    raise ConfigError.new("Unsupported data type '#{attributes[:dataType]}' of metric/dimension '#{col_name}'")
  end
end
# Hands the task and schema to Embulk's control block and returns the first
# task report it produces, which becomes the next config diff.
def self.resume(task, columns, count, &control)
  yield(task, columns, count).first
end
# Reads the plugin configuration into the task hash used by transaction and
# run. auth_method is derived from which credential is configured:
# refresh_token takes precedence over json_key_content, and auth_method stays
# nil when neither is given.
def self.task_from_config(config)
  optional = lambda { |name, type| config.param(name, type, default: nil) }

  refresh_token = optional.call('refresh_token', :string)
  json_key_content = optional.call("json_key_content", :string)
  auth_method =
    if refresh_token
      Plugin::AUTH_TYPE_REFRESH_TOKEN
    elsif json_key_content
      Plugin::AUTH_TYPE_JSON_KEY
    end

  {
    "auth_method" => auth_method,
    "client_id" => optional.call("client_id", :string),
    "client_secret" => optional.call("client_secret", :string),
    "refresh_token" => refresh_token,
    "json_key_content" => json_key_content,
    "view_id" => config.param("view_id", :string),
    "dimensions" => config.param("dimensions", :array, default: []),
    "metrics" => config.param("metrics", :array, default: []),
    "time_series" => config.param("time_series", :string),
    "start_date" => optional.call("start_date", :string),
    "end_date" => optional.call("end_date", :string),
    "incremental" => config.param("incremental", :bool, default: true),
    "last_record_time" => optional.call("last_record_time", :string),
    "retry_limit" => config.param("retry_limit", :integer, default: 5),
    "retry_initial_wait_sec" => config.param("retry_initial_wait_sec", :integer, default: 2),
  }
end
# GA column ids to fetch, in output order: the time_series column first, then
# the dimensions and metrics, flattened and de-duplicated (so a time_series
# also listed as a dimension appears only once).
def self.columns_from_task(task)
  %w(time_series dimensions metrics).map { |key| task[key] }.flatten.uniq
end
# Turns a GA column id into a snake_case Embulk column name by dropping the
# "ga:" prefix and inserting "_" before each run of upper-case letters, e.g.
#   ga:dateHour            -> date_hour
#   ga:pageviewsPerSession -> pageviews_per_session
#   ga:CPC                 -> cpc
def self.canonicalize_column_name(name)
  without_prefix = name.gsub(/^ga:/, "")
  underscored = without_prefix.gsub(/[A-Z]+/, "_\\0")
  underscored.gsub(/^_/, "").downcase
end
# Guessing is not supported by this plugin: logs a warning and contributes an
# empty config.
def self.guess(config)
  logger = Embulk.logger
  logger.warn("Don't needed to guess for this plugin")
  {}
end
# Returns true only when +json_object+ is a String holding a JSON *object*,
# which is what json_key_content (a service-account key) must be.
#
# The parser accepts scalar documents such as "null", so "does it parse" alone
# is not enough. Comparing against the literal "null" also missed padded
# variants like " null\n". Requiring a Hash rejects every scalar and array.
#
# @param json_object [String, nil] raw json_key_content from the config
# @return [Boolean]
def self.valid_json?(json_object)
  JSON.parse(json_object).is_a?(Hash)
rescue JSON::ParserError, TypeError
  # TypeError: json_object is nil or otherwise not a String.
  false
end
# Defaults end_date to "today" when only start_date is configured, so the
# report request always has both bounds.
def init
  needs_end_date = task["start_date"] && !task["end_date"]
  task["end_date"] = "today" if needs_end_date
end
# Streams report rows from Google Analytics into the page builder.
#
# When incremental is enabled and last_record_time is set, rows whose
# time_series value is at or before that time are skipped (never in preview).
# Returns the task report from calculate_next_times for incremental runs,
# otherwise an empty hash.
def run
  client = Client.new(task, preview?)
  output_columns = self.class.columns_from_task(task) + ["view_id"]
  threshold =
    if task['incremental'] && !task["last_record_time"].blank?
      Time.parse(task["last_record_time"])
    end
  newest = nil
  skipped = 0
  total = 0

  client.each_report_row do |row|
    row_time = row[task["time_series"]]
    total += 1
    if !preview? && threshold && row_time <= threshold
      skipped += 1
      next
    end
    page_builder.add(row.values_at(*output_columns))
    newest = [newest, row_time].compact.max
  end
  page_builder.finish

  Embulk.logger.info "Total: #{total} rows."
  if skipped > 0
    Embulk.logger.info "#{skipped} rows were ignored because the rows' date is before \"last_record_time\": #{threshold}."
  end

  return {} unless task["incremental"]
  calculate_next_times(client.get_profile[:timezone], newest)
end
# Whether Embulk is running in preview mode, as reported by the Java-side
# org.embulk.spi.Exec.isPreview (reached through JRuby). A
# NullPointerException from that call is treated as "not a preview".
# NOTE(review): presumably the NPE occurs when no Exec session is active - confirm.
def preview?
  begin
    org.embulk.spi.Exec.isPreview()
  rescue java.lang.NullPointerException
    false
  end
end
# Builds the task report that becomes the config diff for the next
# incremental run.
#
# @param client_time_zone [String, nil] the GA view's time zone name (run
#   passes client.get_profile[:timezone]). It is resolved with
#   ActiveSupport::TimeZone[]. When that lookup returns nil, the offset
#   carried by fetched_latest_time is used as-is.
# @param fetched_latest_time [Time, nil] newest time_series value loaded in
#   this run, or nil when no rows were loaded
# @return [Hash] :start_date and :end_date, plus :last_record_time when known
def calculate_next_times(client_time_zone, fetched_latest_time)
  # No records fetched: carry the current settings over unchanged.
  unless fetched_latest_time
    task_report = {
      start_date: task["start_date"],
      end_date: task["end_date"]
    }
    # Write last_record_time only when it is neither nil nor empty.
    task_report[:last_record_time] = task["last_record_time"] unless task["last_record_time"].blank?
    return task_report
  end

  # Express the next start_date in the view's time zone when it is known.
  timezone = ActiveSupport::TimeZone[client_time_zone]
  local_latest = timezone.nil? ? fetched_latest_time : timezone.parse(fetched_latest_time.to_s)

  # A static YYYY-MM-DD end_date would conflict with the new start_date
  # (end_date < start_date). A missing end_date would leave only start_date
  # filled on the next run, which is an illegal API request. Use "today" in
  # both cases; relative values such as "yesterday" or "7daysAgo" are kept.
  # The pattern is anchored so only a value that is exactly a date matches.
  # "today" means now: running at 03:30 AM will get 3 o'clock data.
  end_date = task["end_date"]
  static_end_date = end_date.nil? || end_date =~ /\A[0-9]{4}-[0-9]{2}-[0-9]{2}\z/
  next_end_date = static_end_date ? "today" : end_date

  # "start_date" format is YYYY-MM-DD, but ga:dateHour returns records hourly.
  # If run at 2016-07-03 05:00:00, start_date is set to "2016-07-03" and records
  # until 2016-07-03 05:00:00 are fetched. The next run at 2016-07-04 05:00
  # fetches records between 2016-07-03 00:00:00 and 2016-07-04 05:00:00,
  # so records between 2016-07-03 00:00:00 and 2016-07-03 05:00:00 would be
  # duplicated.
  #
  # Date| 2016-07-03 | 2016-07-04
  # Hour| 5 | 5
  # 1st run ------|----| |
  # 2nd run |------------------------|-----
  # ^^^^^ duplicated
  #
  # The "last_record_time" option solves that problem:
  #
  # Date| 2016-07-03 | 2016-07-04
  # Hour| 5 | 5
  # 1st run ------|----| |
  # 2nd run #####|-------------------|-----
  # ^^^^^ ignored (skipped)
  #
  {
    start_date: local_latest.strftime("%Y-%m-%d"),
    end_date: next_end_date,
    last_record_time: fetched_latest_time.strftime("%Y-%m-%d %H:%M:%S %z")
  }
end
end
end
end
end