lib/snowplow-tracker/emitters.rb
# Copyright (c) 2013-2021 Snowplow Analytics Ltd. All rights reserved.
#
# This program is licensed to you under the Apache License Version 2.0,
# and you may not use this file except in compliance with the Apache License Version 2.0.
# You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the Apache License Version 2.0 is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
# Author:: Snowplow Analytics Ltd
# Copyright:: Copyright (c) 2013-2021 Snowplow Analytics Ltd
# License:: Apache License Version 2.0
require 'net/https'
require 'set'
require 'logger'
module SnowplowTracker
# Default logger shared by Emitters that are not given their own logger.
# It writes to STDERR and starts at the INFO level.
# @see Emitter
LOGGER = Logger.new(STDERR).tap { |default_log| default_log.level = Logger::INFO }
# This class sends events to the event collector. All {Tracker}s must have at
# least one associated Emitter or the subclass AsyncEmitter.
#
# The network settings are defined as part of the Emitter initialization. This
# table displays the default Emitter settings:
#
# | Property | Default setting |
# | --- | --- |
# | Protocol | HTTP |
# | Method | GET |
# | Buffer size | 1 |
# | Path | `/i` |
#
# The buffer size is the number of events which will be buffered before they
# are all sent simultaneously. The process of sending all buffered events is
# called "flushing". The default buffer size is 1 because GET requests can
# only contain one event.
#
# If you choose to use POST requests, the buffer_size defaults to 10, and the
# buffered events are all sent together in a single request. The default path
# is '/com.snowplowanalytics.snowplow/tp2' for Emitters using POST.
#
# # Logging
# Emitters log their activity to STDERR by default, using the Ruby standard
# library Logger class. A different logger can be configured during Emitter
# initialization. For example, to disable logging, you could provide
# `Logger.new(IO::NULL)` in the options hash.
#
# By default, only messages with priority "INFO" or higher will be logged.
# This can be changed at any time for the default logger, which is saved as a
# module constant (`LOGGER = Logger.new(STDERR)`). If you are not using the
# default logger, set the message level before initializing your Emitter.
#
# @see https://ruby-doc.org/stdlib-2.7.2/libdoc/logger/rdoc/Logger.html Logger documentation
#
# @example Changing the logger message level.
# require 'logger'
# SnowplowTracker::LOGGER.level = Logger::DEBUG
class Emitter
  # Default Emitter settings. Frozen so the defaults cannot be altered by
  # accident; {#initialize} merges the caller's options into a new hash.
  DEFAULT_CONFIG = {
    protocol: 'http',
    method: 'get'
  }.freeze

  # @private
  attr_reader :logger

  # Create a new Emitter instance. The endpoint is required.
  #
  # @example Initializing an Emitter with all the possible extra configuration.
  #   success_callback = ->(success_count) { puts "#{success_count} events sent successfully" }
  #   failure_callback = ->(success_count, failures) do
  #     puts "#{success_count} events sent successfully, #{failures.size} sent unsuccessfully"
  #   end
  #
  #   SnowplowTracker::Emitter.new(endpoint: 'collector.example.com',
  #               options: { path: '/my-pipeline/1',
  #                          protocol: 'https',
  #                          port: 443,
  #                          method: 'post',
  #                          buffer_size: 5,
  #                          on_success: success_callback,
  #                          on_failure: failure_callback,
  #                          logger: Logger.new(STDOUT) })
  #
  # The options hash can have any of these optional parameters:
  #
  # | Parameter | Description | Type |
  # | --- | --- | --- |
  # | path | Override the default path for appending to the endpoint | String |
  # | protocol | 'http' or 'https' | String |
  # | port | The port for the connection | Integer |
  # | method | 'get' or 'post' (case-insensitive; a Symbol is accepted too) | String |
  # | buffer_size | Number of events to send at once | Integer |
  # | on_success | A method to call if events were sent successfully | Method |
  # | on_failure | A method to call if events did not send | Method |
  # | thread_count | Number of threads to use | Integer |
  # | logger | Log somewhere other than STDERR | Logger |
  #
  # Note that `thread_count` is relevant only to the subclass {AsyncEmitter},
  # and will be ignored if provided to an Emitter.
  #
  # If you choose to use HTTPS, we recommend using port 443.
  #
  # Only 2xx and 3xx status codes are considered successes.
  #
  # The `on_success` callback should accept one argument: the number of
  # events that were sent successfully. The `on_failure` callback should
  # accept two arguments: the number of successfully sent events, and an
  # array containing the unsuccessful events.
  #
  # @param endpoint [String] the endpoint to send the events to
  # @param options [Hash] allowed configuration options
  #
  # @see AsyncEmitter#initialize
  # @api public
  def initialize(endpoint:, options: {})
    config = DEFAULT_CONFIG.merge(options)
    # Normalize once so that 'GET', :post, etc. select the same path, buffer
    # size and request type everywhere below. Without this, a value such as
    # 'GET' would get the POST defaults and then never be sent at all.
    config[:method] = config[:method].to_s.downcase
    @lock = Monitor.new
    path = confirm_path(config)
    @collector_uri = create_collector_uri(endpoint, config[:protocol], config[:port], path)
    @buffer = []
    @buffer_size = confirm_buffer_size(config)
    @method = config[:method]
    @on_success = config[:on_success]
    @on_failure = config[:on_failure]
    @logger = config[:logger] || LOGGER
    logger.info("#{self.class} initialized with endpoint #{@collector_uri}")
  end

  # Creates the `@buffer_size` variable during initialization. Unless
  # otherwise defined, it's 1 for Emitters using GET and 10 for Emitters using
  # POST requests.
  # @private
  def confirm_buffer_size(config)
    return config[:buffer_size] unless config[:buffer_size].nil?

    config[:method] == 'get' ? 1 : 10
  end

  # Chooses the request path during initialization. Allows a non-standard
  # path to be provided; otherwise the default depends on the method.
  # @private
  def confirm_path(config)
    return config[:path] unless config[:path].nil?

    config[:method] == 'get' ? '/i' : '/com.snowplowanalytics.snowplow/tp2'
  end

  # Creates the `@collector_uri` variable during initialization.
  # The default is "http://{endpoint}/i".
  # @private
  def create_collector_uri(endpoint, protocol, port, path)
    port_string = port.nil? ? '' : ":#{port}"

    "#{protocol}://#{endpoint}#{port_string}#{path}"
  end

  # Add an event to the buffer and flush it if maximum size has been reached.
  # This method is not required for standard Ruby tracker usage. A {Tracker}
  # privately calls this method once the event payload is ready to send.
  #
  # We have included it as part of the public API for its possible use in the
  # `on_failure` callback. This is the optional method, provided in the
  # `options` Emitter initialization hash, that is called when events fail
  # to send. You could use {#input} as part of your callback to immediately
  # retry the failed event.
  #
  # The `on_failure` callback should accept two arguments: the number of
  # successfully sent events, and an array containing the unsuccessful events.
  #
  # Note that every value in `payload` is converted to a String in place, so
  # the hash passed in is modified.
  #
  # @example A possible `on_failure` method using `#input`
  #   def retry_on_failure(success_count, failed_events)
  #     # possible backoff-and-retry timeout here
  #     failed_events.each do |event|
  #       my_emitter.input(event)
  #     end
  #   end
  #
  # @param payload [Hash] the event to buffer
  # @return [nil]
  # @api public
  def input(payload)
    payload.each { |k, v| payload[k] = v.to_s }
    @lock.synchronize do
      @buffer.push(payload)
      flush if @buffer.size >= @buffer_size
    end

    nil
  end

  # Flush the Emitter, forcing it to send all the events in its
  # buffer, even if the buffer is not full. {Emitter} objects, unlike
  # {AsyncEmitter}s, can only `flush` synchronously. A {Tracker} can manually flush all
  # its Emitters by calling {Tracker#flush}, part of the public API which
  # calls this method.
  #
  # The unused async parameter here is to avoid ArgumentError, since
  # {AsyncEmitter#flush} does take an argument.
  #
  # @return [nil]
  # @see AsyncEmitter#flush
  # @private
  def flush(_async = true)
    @lock.synchronize do
      # Detach the buffer before sending. The monitor is reentrant, so an
      # `on_failure` callback may call #input (and therefore #flush) while
      # the send below is still running. With a fresh buffer already in
      # place, re-queued events are neither appended to the batch being sent
      # nor thrown away afterwards, and a callback that raises cannot leave
      # already-sent events behind to be sent again.
      pending = @buffer
      @buffer = []
      send_requests(pending)
    end

    nil
  end

  # Send the given events to the collector, stamping each with the sent
  # timestamp first.
  # @private
  def send_requests(events)
    if events.empty?
      logger.info('Skipping sending events since buffer is empty')
      return
    end

    logger.info("Attempting to send #{events.size} request#{events.size == 1 ? '' : 's'}")

    events.each do |event|
      # add the sent timestamp, overwrite if already exists
      event['stm'] = Timestamp.create.to_s
    end

    case @method
    when 'post'
      send_requests_with_post(events)
    when 'get'
      send_requests_with_get(events)
    else
      # Previously these events vanished without a trace; make it visible.
      logger.warn("Unsupported method '#{@method}': #{events.size} event(s) were not sent")
    end

    nil
  end

  # Part of {#send_requests}. Sends all events in a single POST request and
  # reports the whole batch as either sent or failed.
  # @private
  def send_requests_with_post(events)
    post_succeeded = false
    begin
      request = http_post(SelfDescribingJson.new(
        'iglu:com.snowplowanalytics.snowplow/payload_data/jsonschema/1-0-4',
        events
      ).to_json)
      post_succeeded = good_status_code?(request.code)
    rescue StandardError => standard_error
      logger.warn(standard_error)
    end

    if post_succeeded
      @on_success.call(events.size) unless @on_success.nil?
    else
      @on_failure.call(0, events) unless @on_failure.nil?
    end

    nil
  end

  # Part of {#send_requests}. Sends one GET request per event and reports
  # the events that could not be sent.
  # @private
  def send_requests_with_get(events)
    success_count = 0
    unsent_requests = []

    events.each do |event|
      if process_get_event(event)
        success_count += 1
      else
        unsent_requests << event
      end
    end

    if unsent_requests.empty?
      @on_success.call(success_count) unless @on_success.nil?
    else
      @on_failure.call(success_count, unsent_requests) unless @on_failure.nil?
    end

    nil
  end

  # Part of {#send_requests_with_get}. Returns whether the event was sent
  # with a good status code; errors are logged and count as a failure.
  # @private
  def process_get_event(event)
    get_succeeded = false
    begin
      request = http_get(event)
      get_succeeded = good_status_code?(request.code)
    rescue StandardError => standard_error
      logger.warn(standard_error)
    end
    get_succeeded
  end

  # Part of {#process_get_event}. This sends a GET request.
  # @private
  def http_get(payload)
    destination = URI(@collector_uri + '?' + URI.encode_www_form(payload))
    logger.info("Sending GET request to #{@collector_uri}...")
    logger.debug("Payload: #{payload}")
    http = Net::HTTP.new(destination.host, destination.port)
    request = Net::HTTP::Get.new(destination.request_uri)
    http.use_ssl = true if destination.scheme == 'https'
    response = http.request(request)
    logger.add(good_status_code?(response.code) ? Logger::INFO : Logger::WARN) do
      "GET request to #{@collector_uri} finished with status code #{response.code}"
    end

    response
  end

  # Part of {#send_requests_with_post}. This sends a POST request.
  # @private
  def http_post(payload)
    logger.info("Sending POST request to #{@collector_uri}...")
    logger.debug("Payload: #{payload}")
    destination = URI(@collector_uri)
    http = Net::HTTP.new(destination.host, destination.port)
    request = Net::HTTP::Post.new(destination.request_uri)
    http.use_ssl = true if destination.scheme == 'https'
    request.body = payload.to_json
    request.set_content_type('application/json; charset=utf-8')
    response = http.request(request)
    logger.add(good_status_code?(response.code) ? Logger::INFO : Logger::WARN) do
      "POST request to #{@collector_uri} finished with status code #{response.code}"
    end

    response
  end

  # Check if the response is good.
  # Only 2xx and 3xx status codes are considered successes.
  # @private
  def good_status_code?(status_code)
    status_code.to_i >= 200 && status_code.to_i < 400
  end

  private :create_collector_uri,
          :http_get,
          :http_post
end
# This {Emitter} subclass provides asynchronous event sending. Whenever the
# buffer is flushed, the AsyncEmitter places the flushed events in a work
# queue. The AsyncEmitter asynchronously sends events in this queue using a
# thread pool of a fixed size. The size of the thread pool is 1 by default,
# but can be configured as part of the options hash during initialization.
#
# @see Emitter
# @api public
class AsyncEmitter < Emitter
  # Create a new AsyncEmitter object. The endpoint is required.
  #
  # @example Initializing an AsyncEmitter with all the possible extra configuration.
  #   success_callback = ->(success_count) { puts "#{success_count} events sent successfully" }
  #   failure_callback = ->(success_count, failures) do
  #     puts "#{success_count} events sent successfully, #{failures.size} sent unsuccessfully"
  #   end
  #
  #   SnowplowTracker::AsyncEmitter.new(endpoint: 'collector.example.com',
  #               options: { path: '/my-pipeline/1',
  #                          protocol: 'https',
  #                          port: 443,
  #                          method: 'post',
  #                          buffer_size: 5,
  #                          on_success: success_callback,
  #                          on_failure: failure_callback,
  #                          logger: Logger.new(STDOUT),
  #                          thread_count: 5 })
  #
  # The options hash can have any of these optional parameters:
  #
  # | Parameter | Description | Type |
  # | --- | --- | --- |
  # | path | Override the default path for appending to the endpoint | String |
  # | protocol | 'http' or 'https' | String |
  # | port | The port for the connection | Integer |
  # | method | 'get' or 'post' | String |
  # | buffer_size | Number of events to send at once | Integer |
  # | on_success | A function to call if events were sent successfully | Function |
  # | on_failure | A function to call if events did not send | Function |
  # | thread_count | Number of threads to use | Integer |
  # | logger | Log somewhere other than STDERR | Logger |
  #
  # The `thread_count` determines the number of worker threads which will be
  # used to send events.
  #
  # If you choose to use HTTPS, we recommend using port 443.
  #
  # Only 2xx and 3xx status codes are considered successes.
  #
  # The `on_success` callback should accept one argument: the number of
  # requests sent this way. The `on_failure` callback should accept two
  # arguments: the number of successfully sent events, and an array containing
  # the unsuccessful events.
  #
  # @note if you test the AsyncEmitter by using a short script to send an
  #   event, you may find that the event fails to send. This is because the
  #   process exits before the flushing thread is finished. You can get round
  #   this either by adding a sleep(10) to the end of your script or by using
  #   the synchronous flush.
  #
  # @param endpoint [String] the endpoint to send the events to
  # @param options [Hash] allowed configuration options
  #
  # @see Emitter#initialize
  # @api public
  def initialize(endpoint:, options: {})
    @queue = Queue.new
    # @all_processed_condition and @results_unprocessed are used to emulate Python's Queue.task_done()
    @queue.extend(MonitorMixin)
    @all_processed_condition = @queue.new_cond
    @results_unprocessed = 0
    # The workers block on the empty queue until the first flush, which can
    # only happen once the superclass initializer below has completed.
    (options[:thread_count] || 1).times { Thread.new { consume } }
    super(endpoint: endpoint, options: options)
  end

  # Worker loop: takes batches of events off the queue and sends them.
  #
  # AsyncEmitters use the MonitorMixin module, which provides the
  # `synchronize` and `broadcast` methods.
  # @private
  def consume
    loop do
      work_unit = @queue.pop
      begin
        send_requests(work_unit)
      rescue StandardError => error
        # An error escaping send_requests (for example from a user-supplied
        # callback) must not kill this worker: with no worker left, queued
        # events would never be sent.
        logger.warn(error)
      ensure
        # Always account for the batch, otherwise a synchronous flush would
        # wait forever for a counter that never reaches zero.
        @queue.synchronize do
          @results_unprocessed -= 1
          @all_processed_condition.broadcast
        end
      end
    end
  end

  # Flush the Emitter, forcing it to send all the events in its buffer, even
  # if the buffer is not full.
  #
  # If `async` is true (the default), events are sent even if the queue is not
  # empty. If `async` is false, it blocks until all queued events have been
  # sent. Note that this method can be called by public API method
  # {Tracker#flush}, which has a default of `async` being false.
  #
  # @param async [Bool] whether to flush asynchronously or not
  #
  # @see Emitter#flush
  # @private
  def flush(async = true)
    loop do
      @lock.synchronize do
        # Count the batch before queueing it, so a worker can never
        # decrement the counter for a batch that was not yet counted.
        @queue.synchronize { @results_unprocessed += 1 }
        @queue << @buffer
        @buffer = []
      end

      unless async
        logger.info('Starting synchronous flush')
        @queue.synchronize do
          @all_processed_condition.wait_while { @results_unprocessed > 0 }
          logger.info('Finished synchronous flush')
        end
      end

      # Events may have been buffered while this call was queueing or
      # waiting; go round again until the buffer is seen empty. The check is
      # made under the lock that guards @buffer.
      break if @lock.synchronize { @buffer.empty? }
    end
  end
end
end