lib/redis_counters/unique_values_lists/blocking.rb
# coding: utf-8
require 'redis_counters/unique_values_lists/base'
module RedisCounters
module UniqueValuesLists
# Список уникальных значений, на основе механизма оптимистических блокировок.
#
# смотри Optimistic locking using check-and-set:
# http://redis.io/topics/transactions
#
# Особенности:
# * Значения сохраняет в партициях;
# * Ведет список партиций;
# * Полностью транзакционен.
class Blocking < Base
PARTITIONS_LIST_POSTFIX = :partitions
# Public: Проверяет существует ли заданное значение.
#
# params - Hash - параметры кластера и значения.
#
# Returns Boolean.
#
def has_value?(params)
set_params(params)
reset_partitions_cache
value_already_exists?
end
# Public: Нетранзакционно удаляет данные конкретной конечной партиции.
#
# params - Hash - хеш параметров, определяющий кластер и партицию.
# write_session - Redis - соединение с Redis, в рамках которого
# будет производится удаление (опционально).
# По умолчанию - основное соединение счетчика.
#
# Если передан блок, то вызывает блок, после удаления всех данных, в транзакции.
#
# Returns Nothing.
#
def delete_partition_direct!(params = {}, write_session = redis)
super(params, write_session)
# удаляем партицию из списка
return unless use_partitions?
cluster = ::RedisCounters::Cluster.new(self, params).params
partition = ::RedisCounters::Partition.new(self, params).params(:only_leaf => true)
partition = partition.flatten.join(key_delimiter)
write_session.lrem(partitions_list_key(cluster), 0, partition)
end
protected
def key(partition = partition_params, cluster = cluster_params)
return super if use_partitions?
raise 'Array required' if partition && !partition.is_a?(Array)
raise 'Array required' if cluster && !cluster.is_a?(Array)
[counter_name, cluster, partition].flatten.compact.join(key_delimiter)
end
def process_value
loop do
reset_partitions_cache
watch_partitions_list
watch_all_partitions
if value_already_exists?
redis.unwatch
return false
end
result = transaction do
add_value
add_partition
yield redis if block_given?
end
return true if result.present?
end
end
def reset_partitions_cache
@partitions = nil
end
def watch_partitions_list
return unless use_partitions?
redis.watch(partitions_list_key)
end
def watch_all_partitions
all_partitions.each do |partition|
redis.watch(key(partition))
end
end
def value_already_exists?
all_partitions.reverse.any? do |partition|
redis.sismember(key(partition), value)
end
end
def add_value
redis.sadd(key, value)
end
def all_partitions(cluster = cluster_params)
return @partitions unless @partitions.nil?
return (@partitions = [nil]) unless use_partitions?
@partitions = redis.lrange(partitions_list_key(cluster), 0, -1)
@partitions = @partitions.map do |partition|
partition.split(key_delimiter, -1)
end.delete_if(&:empty?)
end
def add_partition
return unless use_partitions?
return unless new_partition?
redis.rpush(partitions_list_key, current_partition)
end
def partitions_list_key(cluster = cluster_params)
raise 'Array required' if cluster && !cluster.is_a?(Array)
[counter_name, cluster, PARTITIONS_LIST_POSTFIX].flatten.join(key_delimiter)
end
def current_partition
partition_params.flatten.join(key_delimiter)
end
def new_partition?
!all_partitions.include?(current_partition.split(key_delimiter))
end
# Protected: Возвращает массив листовых партиций в виде ключей.
#
# Если кластер не указан и нет кластеризации в счетчике, то возвращает все партиции.
# Если партиция не указана, возвращает все партиции кластера (все партиции, если нет кластеризации).
#
# params - Hash - хеш параметров, определяющий кластер и партицию.
#
# Returns Array of Hash.
#
def partitions_keys(params = {})
reset_partitions_cache
cluster = ::RedisCounters::Cluster.new(self, params).params
partition = ::RedisCounters::Partition.new(self, params).params
partitions_keys = all_partitions(cluster).map { |part| key(part, cluster) }
fuzzy_pattern = key(partition, cluster)
partitions_keys.select { |part| part.start_with?(fuzzy_pattern) }
end
end
end
end