adHawk/hanami-events-cloud_pubsub

View on GitHub
lib/hanami/events/cloud_pubsub.rb

Summary

Maintainability
A
0 mins
Test Coverage
# frozen_string_literal: true

require 'concurrent/async'
require 'hanami/events'
require 'hanami/events/cloud_pubsub/version'
require 'hanami/events/cloud_pubsub/middleware/stack'
require 'hanami/events/cloud_pubsub/middleware/logging'
require 'hanami/events/cloud_pubsub/runner'
require 'hanami/events/cloud_pubsub/errors'
require 'dry-configurable'

module Hanami
  module Events
    # CloudPubsub
    module CloudPubsub
      extend Dry::Configurable

      setting :namespace, reader: true

      setting :subscriber, reader: true do
        setting :streams, default: 4
        setting :threads do
          setting :callback, default: 8
          setting :push, default: 4
        end
      end

      setting :pubsub, default: {}, reader: true

      setting :project_id, reader: true
      setting :auto_create_subscriptions, default: false, reader: true
      setting :auto_create_topics, default: false, reader: true
      setting :logger, default: Logger.new($stdout), reader: true
      setting :subscriptions_loader, default: proc {
        abort <<~MSG
          ┌────────────────────────────────────────────────────────────────────────────────┐
          │ You must configure subscriptions_loader param in order to be able to subscribe │
          │ to events. When the worker is setup and ready to subscribe to events, this     │
          │ loader will be called. It must respond to `#call`.                             │
          │                                                                                │
          │ Example                                                                        │
          │ ═══════                                                                        │
          │                                                                                │
          │ Hanami::Events::CloudPubsub.configure do │config│                              │
          │   config.subscriptions_loader = proc do                                        │
          │     Hanami::Utils.require! 'apps/web/subscriptions'                            │
          │   end                                                                          │
          │ end                                                                            │
          └────────────────────────────────────────────────────────────────────────────────┘
        MSG
      }, reader: true
      setting :error_handlers, default: [
        ->(err, msg) do
          logger.error "Message(#{msg}) failed with exception #{err.inspect}"
        end
      ], reader: true

      middleware_stack = Middleware::Stack.new(
        Middleware::Logging.new
      )

      if defined?(Yabeda::Prometheus::Exporter)
        begin
          require 'hanami/events/cloud_pubsub/middleware/prometheus'
          middleware_stack << Middleware::Prometheus.new
        rescue LoadError
          # ok
        end
      end

      setting :middleware, default: middleware_stack

      client_middleware_stack = Middleware::Stack.new

      begin
        require 'request_id'
        require 'hanami/events/cloud_pubsub/middleware/client/request_id'
        require 'hanami/events/cloud_pubsub/middleware/request_id'

        client_middleware_stack.prepend(Middleware::Client::RequestId.new)
        middleware_stack.prepend(Middleware::RequestId.new)
      rescue LoadError
        # ok
      end

      setting :client_middleware, default: client_middleware_stack

      setting :on_shutdown_handlers, default: [], reader: true

      setting :auto_retry do
        setting :enabled, default: false
        setting :max_attempts, default: 1200
        setting :dead_letter_topic_name
        setting :minimum_backoff, default: 30
        setting :maximum_backoff, default: 600
      end

      def self.finalize_settings!
        require 'google/cloud/pubsub'
        conf_hash = config.pubsub
        conf_hash.each { |key, val| Google::Cloud::Pubsub.configure[key] = val }
      end
    end
  end
end