jstotz/jstreams

View on GitHub
lib/jstreams/publisher.rb

Summary

Maintainability
A
0 mins
Test Coverage
# frozen_string_literal: true

# :nodoc:
module Jstreams
  ##
  # Publishes messages to the given stream.
  class Publisher
    ##
    # @param [ConnectionPool] redis_pool Redis connection pool
    # @param [Serializer] serializer Serializer
    # @param [TaggedLogging] logger Logger
    def initialize(redis_pool:, serializer:, logger:)
      @redis_pool = redis_pool
      @serializer = serializer
      @logger = logger
    end

    ##
    # Publishes a message to the given stream
    #
    # @param [String] stream Destination stream name
    # @param [Hash] message Message payload
    def publish(stream, message)
      @logger.tagged('publisher') do
        @redis_pool.with do |redis|
          redis.xadd(stream, payload: @serializer.serialize(message, stream))
        end
        @logger.debug { "published to stream #{stream}: #{message.inspect}" }
      end
    end
  end

  private_constant :Publisher
end