sensu/sensu-transport

View on GitHub
lib/sensu/transport/redis.rb

Summary

Maintainability
A
3 hrs
Test Coverage
require "sensu/redis"

require File.join(File.dirname(__FILE__), "base")

module Sensu
  module Transport
    class Redis < Base
      # The Redis keyspace to use for the transport.
      REDIS_KEYSPACE = "transport"

      def initialize
        @options = {}
        @connections = {}
        super
      end

      # Redis transport connection setup. This method sets `@options`,
      # creates a named Redis connection "redis", and sets the deferred
      # status to `:succeeded` via `succeed()`.
      #
      # @param options [Hash, String]
      def connect(options={})
        @options = options || {}
        redis_connection("redis") do |connection|
          connection.callback do
            succeed
          end
        end
      end

      # Reconnect to the Redis transport. The Redis connections used
      # by the transport have auto-reconnect disabled; if a single
      # connection is unhealthy, all connections are closed, the
      # transport is reset, and new connections are made. If the
      # transport is not already reconnecting to Redis, the
      # `@before_reconnect` transport callback is called.
      #
      # @param force [Boolean] the reconnect.
      def reconnect(force=false)
        @before_reconnect.call unless @reconnecting
        unless @reconnecting && !force
          @reconnecting = true
          close
          reset
          connect(@options)
        end
      end

      # Indicates if ALL Redis connections are connected.
      #
      # @return [TrueClass, FalseClass]
      def connected?
        !@connections.empty? && @connections.values.all? do |connection|
          connection.connected?
        end
      end

      # Close ALL Redis connections.
      def close
        @connections.each_value do |connection|
          connection.close
        end
      end

      # Publish a message to the Redis transport. The transport pipe
      # type determines the method of sending messages to consumers
      # using Redis, either using PubSub or a list. The appropriate
      # publish method is call for the pipe type given. The Redis
      # transport ignores publish options.
      #
      # @param type [Symbol] the transport pipe type, possible values
      #   are: :direct and :fanout.
      # @param pipe [String] the transport pipe name.
      # @param message [String] the message to be published to the transport.
      # @param options [Hash] IGNORED by this transport.
      # @yield [info] passes publish info to an optional callback/block.
      # @yieldparam info [Hash] contains publish information, which
      #   may contain an error object.
      def publish(type, pipe, message, options={}, &callback)
        case type.to_sym
        when :fanout
          pubsub_publish(pipe, message, &callback)
        when :direct
          list_publish(pipe, message, &callback)
        end
      end

      # Subscribe to a Redis transport pipe. The transport pipe
      # type determines the method of consuming messages from Redis,
      # either using PubSub or a list. The appropriate subscribe
      # method is call for the pipe type given. The Redis transport
      # ignores subscribe options and the funnel name.
      #
      # @param type [Symbol] the transport pipe type, possible values
      #   are: :direct and :fanout.
      # @param pipe [String] the transport pipe name.
      # @param funnel [String] IGNORED by this transport.
      # @param options [Hash] IGNORED by this transport.
      # @yield [info, message] passes message info and content to
      #   the consumer callback/block.
      # @yieldparam info [Hash] contains message information.
      # @yieldparam message [String] message.
      def subscribe(type, pipe, funnel=nil, options={}, &callback)
        case type.to_sym
        when :fanout
          pubsub_subscribe(pipe, &callback)
        when :direct
          list_subscribe(pipe, &callback)
        end
      end

      # Unsubscribe from all transport pipes. This method iterates
      # through the current named Redis connections, unsubscribing the
      # "pubsub" connection from Redis channels, and closing/deleting
      # BLPOP connections.
      #
      # @yield [info] passes info to an optional callback/block.
      # @yieldparam info [Hash] empty hash.
      def unsubscribe
        @connections.each do |name, connection|
          case name
          when "pubsub"
            connection.unsubscribe
          when /^#{REDIS_KEYSPACE}/
            connection.close
            @connections.delete(name)
          end
        end
        super
      end

      # Redis transport pipe/funnel stats, such as message and
      # consumer counts. This method is currently unable to determine
      # the consumer count for a Redis list.
      #
      # @param funnel [String] the transport funnel to get stats for.
      # @param options [Hash] IGNORED by this transport.
      # @yield [info] passes list stats to the callback/block.
      # @yieldparam info [Hash] contains list stats.
      def stats(funnel, options={})
        redis_connection("redis") do |connection|
          connection.llen(funnel) do |messages|
            info = {
              :messages => messages,
              :consumers => 0
            }
            yield(info)
          end
        end
      end

      private

      # Reset instance variables, called when reconnecting.
      def reset
        @connections = {}
      end

      # Monitor current Redis connections, the connection "pool". A
      # timer is used to check on the connections, every `3` seconds.
      # If one or more connections is not connected, a forced
      # `reconnect()` is triggered. If all connections are connected
      # after reconnecting, the transport `@after_reconnect`
      # callback is called. If a connection monitor (timer) already
      # exists, it is canceled.
      def monitor_connections
        @connection_monitor.cancel if @connection_monitor
        @connection_monitor = EM::PeriodicTimer.new(3) do
          if !connected?
            reconnect(true)
          elsif @reconnecting
            @after_reconnect.call
            @reconnecting = false
          end
        end
      end

      # Return or setup a named Redis connection. This method creates
      # a Redis connection object using the provided Redis transport
      # options. Redis auto-reconnect is disabled as the connection
      # "pool" is monitored as a whole. The transport `@on_error`
      # callback is called when Redis errors are encountered. This
      # method creates/replaces the connection monitor after setting
      # up the connection and before adding it to the pool.
      #
      # @param name [String] the Redis connection name.
      # @yield [Object] passes the named connection object to the
      #   callback/block.
      def redis_connection(name)
        if @connections[name]
          yield(@connections[name])
        else
          Sensu::Redis.connect(@options) do |connection|
            connection.auto_reconnect = false
            connection.reconnect_on_error = false
            connection.on_error do |error|
              @on_error.call(error)
            end
            monitor_connections
            @connections[name] = connection
            yield(connection)
          end
        end
      end

      # Create a Redis key within the defined Redis keyspace. This
      # method is used to create keys that are unlikely to collide.
      # The Redis connection database number is included in the Redis
      # key as pubsub is not scoped to the selected database.
      #
      # @param type [String]
      # @param name [String]
      # @return [String]
      def redis_key(type, name)
        db = @options.is_a?(Hash) ? (@options[:db] || 0) : 0
        [REDIS_KEYSPACE, db, type, name].join(":")
      end

      # Publish a message to a Redis channel (PubSub). The
      # `redis_key()` method is used to create a Redis channel key,
      # using the transport pipe name. The publish callback info
      # includes the current subscriber count for the Redis channel.
      #
      # http://redis.io/topics/pubsub
      #
      # @param pipe [String] the transport pipe name.
      # @param message [String] the message to be published to the transport.
      # @yield [info] passes publish info to an optional callback/block.
      # @yieldparam info [Hash] contains publish information.
      # @yieldparam subscribers [String] current subscriber count.
      def pubsub_publish(pipe, message)
        channel = redis_key("channel", pipe)
        redis_connection("redis") do |connection|
          connection.publish(channel, message) do |subscribers|
            info = {:subscribers => subscribers}
            yield(info) if block_given?
          end
        end
      end

      # Subscribe to a Redis channel (PubSub). The `redis_key()`
      # method is used to create a Redis channel key, using the
      # transport pipe name. The named Redis connection "pubsub" is
      # used for the Redis SUBSCRIBE command set, as the Redis context
      # is limited and enforced for the connection. The subscribe
      # callback is called whenever a message is published to the
      # Redis channel. Channel messages with the type "subscribe" and
      # "unsubscribe" are ignored, only messages with type "message"
      # are passsed to the provided consumer/method callback/block.
      #
      # http://redis.io/topics/pubsub
      #
      # @param pipe [String] the transport pipe name.
      # @yield [info, message] passes message info and content to
      #   the consumer/method callback/block.
      # @yieldparam info [Hash] contains the channel name.
      # @yieldparam message [String] message content.
      def pubsub_subscribe(pipe)
        channel = redis_key("channel", pipe)
        redis_connection("pubsub") do |connection|
          connection.subscribe(channel) do |type, channel, message|
            case type
            when "subscribe"
              @logger.debug("subscribed to redis channel: #{channel}") if @logger
            when "unsubscribe"
              @logger.debug("unsubscribed from redis channel: #{channel}") if @logger
            when "message"
              info = {:channel => channel}
              yield(info, message)
            end
          end
        end
      end

      # Push (publish) a message onto a Redis list. The `redis_key()`
      # method is used to create a Redis list key, using the transport
      # pipe name. The publish callback info includes the current list
      # size (queued).
      #
      # @param pipe [String] the transport pipe name.
      # @param message [String] the message to be published to the transport.
      # @yield [info] passes publish info to an optional callback/block.
      # @yieldparam info [Hash] contains publish information.
      # @yieldparam queued [String] current list size.
      def list_publish(pipe, message)
        list = redis_key("list", pipe)
        redis_connection("redis") do |connection|
          connection.rpush(list, message) do |queued|
            info = {:queued => queued}
            yield(info) if block_given?
          end
        end
      end

      # Shift a message off of a Redis list and schedule another shift
      # on the next tick of the event loop (reactor). Redis BLPOP is a
      # connection blocking Redis command, this method creates a named
      # Redis connection for each list. Multiple Redis connections for
      # BLPOP commands is far more efficient than timer or next tick
      # polling with LPOP.
      #
      # @param list [String]
      # @yield [info, message] passes message info and content to
      #   the consumer/method callback/block.
      # @yieldparam info [Hash] an empty hash.
      # @yieldparam message [String] message content.
      def list_blpop(list, &callback)
        redis_connection(list) do |connection|
          connection.blpop(list, 0) do |_, message|
            EM::next_tick { list_blpop(list, &callback) }
            callback.call({}, message)
          end
        end
      end

      # Subscribe to a Redis list, shifting message off as they become
      # available. The `redis_key()` method is used to create a Redis
      # list key, using the transport pipe name. The `list_blpop()`
      # method is used to do the actual work.
      #
      # @param pipe [String] the transport pipe name.
      # @yield [info, message] passes message info and content to
      #   the consumer/method callback/block.
      # @yieldparam info [Hash] an empty hash.
      # @yieldparam message [String] message content.
      def list_subscribe(pipe, &callback)
        list = redis_key("list", pipe)
        list_blpop(list, &callback)
      end
    end
  end
end