tomorrowhq/redis-stream

View on GitHub
lib/redis_stream/consumer.rb

Summary

Maintainability
A
0 mins
Test Coverage
module RedisStream
  class Consumer
    include Enumerable

    attr_reader :name, :group, :stream

    def initialize(name:, group:, stream:)
      @name = name
      @group = group
      @stream = stream
    end

    def each(&block)
      while
        result = read_next
        break if result.empty?

        message = result[stream.name].first
        id, content = message
        block.call(load(content))
        Redis.current.xack(stream.name, name, id)
      end
    end

    private

    def read_next
      Redis.current.xreadgroup(group.name, name, stream.name, '>', count: 1)
    end

    def load(message_content)
      Marshal.load(message_content["value"])
    end
  end
end