mixan946/em-sqs-dispatcher

View on GitHub
lib/em-sqs/queue.rb

Summary

Maintainability
A
0 mins
Test Coverage
module EM::SQS
  class Queue

    # Exceptions
    class RequestSizeExceeded < StandardError;end

    # Constans
    SEND_LIMIT_SIZE = 256.kilobytes.freeze
    WAIT_TIME_SECONDS = 20 # Long poll timeout 
    POOL_SIZE = 10

    attr_reader :url, :name

    def initialize(name)
      @url ||= SqsWorker::SQS_CLIENT.get_queue_url(queue_name: name)["queue_url"]
    end

    def send_message(attributes)
      message = attributes.is_a?(Message) ? attributes : Message.new(attributes)

      if message.bytesize > SEND_LIMIT_SIZE
        raise RequestSizeExceeded.new("Send request size exceeded! Limit size is 256KB.")
      end

      SqsWorker::SQS_CLIENT.send_message({queue_url: @url}.merge(message.to_hash))
    end

    def send_message_batch(array)

      all_messages = if array.first.is_a?(Message) 
        array 
      else 
        array.map{|el| Message.new(el) }
      end

      all_messages.each_slice(10).with_index do |messages|
        SqsWorker::SQS_CLIENT.send_message_batch queue_url: @url, 
          entries: messages.map{|m| m.to_hash(with_id: true)}
      end

    end

    def receive_messages(*attributes)
      request = {
        queue_url: @url, 
        wait_time_seconds: WAIT_TIME_SECONDS,
        max_number_of_messages: POOL_SIZE
      }
      request[:message_attribute_names] = attributes if attributes.present?
      messages = SqsWorker::SQS_CLIENT.receive_message(request).messages
      if messages.present?
        messages.map do |struct|  
          ReceivedMessage.new struct
        end
      end
    end

    class Message

      attr_reader :attributes, :name, :bytesize

      # Only String messages implemented now
      def initialize(attributes)
        @name, attributes = attributes.to_a.flatten
        @id = SecureRandom.uuid
        @attributes = attributes.map do |key, value|
          [
            key, 
            {
              string_value: value, 
              data_type: "String"
            }
          ]
        end.to_h
        @bytesize = to_hash.to_json.bytesize
      end

      def to_hash(attrs)
        attrs[:with_id] ||= false
        res = {
          message_body: name,
          delay_seconds: 1,
          message_attributes: attributes
        }
        with_id ? res.merge(id: @id) : res
      end

    end

    class ReceivedMessage

      attr_reader :struct

      def initialize(struct)
        @struct = struct
      end

      def body
        @struct.body
      end

      def [](key)
        @struct.message_attributes[key.to_s].try(:string_value)
      end

    end

  end
end