deliveroo/routemaster

View on GitHub
routemaster/services/ingest.rb

Summary

Maintainability
A
1 hr
Test Coverage
require 'routemaster/services'
require 'routemaster/mixins/assert'
require 'routemaster/mixins/counters'
require 'routemaster/mixins/newrelic'
require 'routemaster/models/batch'
require 'routemaster/models/message'
require 'routemaster/models/job'


module Routemaster
  module Services
    # Enqueue an event for all topic subscribers and update statistics.

    class Ingest
      DEFAULT_PUBLISHER_TAG = 'null'.freeze
      MALFORMED_PUBLISHER_TAG = 'malformed'.freeze
      PUBLISHER_NAME_UUID_SEPARATOR = '--'.freeze

      include Mixins::Assert
      include Mixins::Counters
      include Mixins::Newrelic

      def initialize(topic:, event:, queue:)
        _assert(event.topic == topic.name)
        @topic = topic
        @event = event
        @queue = queue
        @publisher = extract_publisher_name
      end

      def call
        trace_with_newrelic('Custom/Services/ingest') do
          data = Services::Codec.new.dump(@event)

          @topic.subscribers.each do |s|
            trace_with_newrelic("Custom/Services/ingest-#{s.name}") do
              batch = nil

              trace_with_newrelic('Custom/Services/batch-ingest') do
                batch = Models::Batch.ingest(data: data, timestamp: @event.timestamp, subscriber: s)
              end

              job = Models::Job.new(name: 'batch', args: batch.uid, run_at: batch.deadline)

              trace_with_newrelic('Custom/Services/queue-push') do
                @queue.push(job)
              end

              if batch.full?
                trace_with_newrelic('Custom/Services/queue-promote') do
                  @queue.promote(job)
                end
              end
            end
          end

          _counters.incr('events.published', topic: @topic.name, publisher: @publisher)
          _counters.incr('events.bytes', topic: @topic.name, count: data.length, publisher: @publisher)

          trace_with_newrelic('Custom/Services/topic-increment') do
            @topic.increment_count
          end
        end

        self
      end

      private

      def extract_publisher_name
        return DEFAULT_PUBLISHER_TAG unless @topic.publisher
        return MALFORMED_PUBLISHER_TAG unless @topic.publisher.include?(PUBLISHER_NAME_UUID_SEPARATOR)
        @topic.publisher.split(PUBLISHER_NAME_UUID_SEPARATOR).first
      end
    end
  end
end