abak-press/redis_counters

View on GitHub
lib/redis_counters/unique_values_lists/blocking.rb

Summary

Maintainability
A
35 mins
Test Coverage
# coding: utf-8
require 'redis_counters/unique_values_lists/base'

module RedisCounters
  module UniqueValuesLists

    # Список уникальных значений, на основе механизма оптимистических блокировок.
    #
    # смотри Optimistic locking using check-and-set:
    # http://redis.io/topics/transactions
    #
    # Особенности:
    #   * Значения сохраняет в партициях;
    #   * Ведет список партиций;
    #   * Полностью транзакционен.

    class Blocking < Base
      PARTITIONS_LIST_POSTFIX = :partitions

      # Public: Проверяет существует ли заданное значение.
      #
      # params - Hash - параметры кластера и значения.
      #
      # Returns Boolean.
      #
      def has_value?(params)
        set_params(params)
        reset_partitions_cache
        value_already_exists?
      end

      # Public: Нетранзакционно удаляет данные конкретной конечной партиции.
      #
      # params        - Hash - хеш параметров, определяющий кластер и партицию.
      # write_session - Redis - соединение с Redis, в рамках которого
      #                 будет производится удаление (опционально).
      #                 По умолчанию - основное соединение счетчика.
      #
      # Если передан блок, то вызывает блок, после удаления всех данных, в транзакции.
      #
      # Returns Nothing.
      #
      def delete_partition_direct!(params = {}, write_session = redis)
        super(params, write_session)

        # удаляем партицию из списка
        return unless use_partitions?

        cluster = ::RedisCounters::Cluster.new(self, params).params
        partition = ::RedisCounters::Partition.new(self, params).params(:only_leaf => true)

        partition = partition.flatten.join(key_delimiter)
        write_session.lrem(partitions_list_key(cluster), 0, partition)
      end

      protected

      def key(partition = partition_params, cluster = cluster_params)
        return super if use_partitions?

        raise 'Array required' if partition && !partition.is_a?(Array)
        raise 'Array required' if cluster && !cluster.is_a?(Array)

        [counter_name, cluster, partition].flatten.compact.join(key_delimiter)
      end

      def process_value
        loop do
          reset_partitions_cache

          watch_partitions_list
          watch_all_partitions

          if value_already_exists?
            redis.unwatch
            return false
          end

          result = transaction do
            add_value
            add_partition
            yield redis if block_given?
          end

          return true if result.present?
        end
      end

      def reset_partitions_cache
        @partitions = nil
      end

      def watch_partitions_list
        return unless use_partitions?
        redis.watch(partitions_list_key)
      end

      def watch_all_partitions
        all_partitions.each do |partition|
          redis.watch(key(partition))
        end
      end

      def value_already_exists?
        all_partitions.reverse.any? do |partition|
          redis.sismember(key(partition), value)
        end
      end

      def add_value
        redis.sadd(key, value)
      end

      def all_partitions(cluster = cluster_params)
        return @partitions unless @partitions.nil?
        return (@partitions = [nil]) unless use_partitions?

        @partitions = redis.lrange(partitions_list_key(cluster), 0, -1)
        @partitions = @partitions.map do |partition|
          partition.split(key_delimiter, -1)
        end.delete_if(&:empty?)
      end

      def add_partition
        return unless use_partitions?
        return unless new_partition?
        redis.rpush(partitions_list_key, current_partition)
      end

      def partitions_list_key(cluster = cluster_params)
        raise 'Array required' if cluster && !cluster.is_a?(Array)

        [counter_name, cluster, PARTITIONS_LIST_POSTFIX].flatten.join(key_delimiter)
      end

      def current_partition
        partition_params.flatten.join(key_delimiter)
      end

      def new_partition?
        !all_partitions.include?(current_partition.split(key_delimiter))
      end


      # Protected: Возвращает массив листовых партиций в виде ключей.
      #
      # Если кластер не указан и нет кластеризации в счетчике, то возвращает все партиции.
      # Если партиция не указана, возвращает все партиции кластера (все партиции, если нет кластеризации).
      #
      # params  - Hash - хеш параметров, определяющий кластер и партицию.
      #
      # Returns Array of Hash.
      #
      def partitions_keys(params = {})
        reset_partitions_cache

        cluster = ::RedisCounters::Cluster.new(self, params).params
        partition = ::RedisCounters::Partition.new(self, params).params

        partitions_keys = all_partitions(cluster).map { |part| key(part, cluster) }

        fuzzy_pattern = key(partition, cluster)
        partitions_keys.select { |part| part.start_with?(fuzzy_pattern) }
      end
    end
  end
end