# lib/jstreams/publisher.rb
# frozen_string_literal: true
# :nodoc:
module Jstreams
##
# Publishes messages to the given stream.
class Publisher
  ##
  # Stores the collaborators used by #publish.
  #
  # @param [ConnectionPool] redis_pool Redis connection pool
  # @param [Serializer] serializer Serializer
  # @param [TaggedLogging] logger Logger
  def initialize(redis_pool:, serializer:, logger:)
    @redis_pool = redis_pool
    @serializer = serializer
    @logger = logger
  end

  ##
  # Publishes a message to the given stream.
  #
  # The message is run through the serializer (which also receives the stream
  # name) and appended to the stream via +xadd+ as a single-field entry whose
  # key is +payload+. Once the append has returned, a debug line is emitted.
  # Both steps run inside the logger's 'publisher' tag.
  #
  # @param [String] stream Destination stream name
  # @param [Hash] message Message payload
  # @return the value returned by the logger's +tagged+ call
  def publish(stream, message)
    @logger.tagged('publisher') do
      @redis_pool.with do |redis|
        # The entry must be an explicit Hash literal: on Ruby 3+ a braceless
        # trailing hash is passed as keyword arguments, which an xadd that
        # declares its own keywords (approximate:, maxlen:, id:) rejects
        # with ArgumentError instead of treating it as the entry.
        entry = { payload: @serializer.serialize(message, stream) }
        redis.xadd(stream, entry)
      end
      @logger.debug { "published to stream #{stream}: #{message.inspect}" }
    end
  end
end
# Hide the Publisher constant from code outside the Jstreams namespace:
# referencing Jstreams::Publisher externally raises NameError.
private_constant :Publisher
end