fluent/fluentd

View on GitHub
lib/fluent/counter/server.rb

Summary

Maintainability
A
25 mins
Test Coverage
#
# Fluentd
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

require 'cool.io'
require 'fluent/counter/base_socket'
require 'fluent/counter/validator'
require 'fluent/counter/store'
require 'fluent/counter/mutex_hash'

module Fluent
  module Counter
    class Server
      DEFAULT_ADDR = '127.0.0.1'
      DEFAULT_PORT = 24321

      def initialize(name, opt = {})
        raise 'Counter server name is invalid' unless Validator::VALID_NAME.match?(name)
        @name = name
        @opt = opt
        @addr = @opt[:addr] || DEFAULT_ADDR
        @port = @opt[:port] || DEFAULT_PORT
        @loop = @opt[:loop] || Coolio::Loop.new
        @log =  @opt[:log] || $log

        @store = Fluent::Counter::Store.new(opt)
        @mutex_hash = MutexHash.new(@store)

        @server = Coolio::TCPServer.new(@addr, @port, Handler, method(:on_message))
        @thread = nil
        @running = false
      end

      def start
        @server.attach(@loop)
        @thread = Thread.new do
          @running = true
          @loop.run(0.5)
          @running = false
        end
        @log.debug("starting counter server #{@addr}:#{@port}")
        @mutex_hash.start
        self
      end

      def stop
        # This `sleep` for a test to wait for a `@loop` to begin to run
        sleep 0.1
        @server.close
        @loop.stop if @running
        @mutex_hash.stop
        @thread.join if @thread
        @log.debug("calling stop in counter server #{@addr}:#{@port}")
      end

      def on_message(data)
        errors = Validator.request(data)
        unless errors.empty?
          return { 'id' => data['id'], 'data' => [], 'errors' => errors }
        end

        result = safe_run do
          send(data['method'], data['params'], data['scope'], data['options'])
        end
        result.merge('id' => data['id'])
      rescue => e
        @log.error e.to_s
      end

      private

      def establish(params, _scope, _options)
        validator = Fluent::Counter::ArrayValidator.new(:empty, :scope)
        valid_params, errors = validator.call(params)
        res = Response.new(errors)

        if scope = valid_params.first
          new_scope = "#{@name}\t#{scope}"
          res.push_data new_scope
          @log.debug("Establish new key: #{new_scope}")
        end

        res.to_hash
      end

      def init(params, scope, options)
        validator = Fluent::Counter::HashValidator.new(:empty, :name, :reset_interval)
        valid_params, errors = validator.call(params)
        res = Response.new(errors)
        key_hash = valid_params.reduce({}) do |acc, vp|
          acc.merge(Store.gen_key(scope, vp['name']) => vp)
        end

        do_init = lambda do |store, key|
          begin
            param = key_hash[key]
            v = store.init(key, param, ignore: options['ignore'])
            @log.debug("Create new key: #{param['name']}")
            res.push_data v
          rescue => e
            res.push_error e
          end
        end

        if options['random']
          @mutex_hash.synchronize_keys(*(key_hash.keys), &do_init)
        else
          @mutex_hash.synchronize(*(key_hash.keys), &do_init)
        end

        res.to_hash
      end

      def delete(params, scope, options)
        validator = Fluent::Counter::ArrayValidator.new(:empty, :key)
        valid_params, errors = validator.call(params)
        res = Response.new(errors)
        keys = valid_params.map { |vp| Store.gen_key(scope, vp) }

        do_delete = lambda do |store, key|
          begin
            v = store.delete(key)
            @log.debug("delete a key: #{key}")
            res.push_data v
          rescue => e
            res.push_error e
          end
        end

        if options['random']
          @mutex_hash.synchronize_keys(*keys, &do_delete)
        else
          @mutex_hash.synchronize(*keys, &do_delete)
        end

        res.to_hash
      end

      def inc(params, scope, options)
        validate_param = [:empty, :name, :value]
        validate_param << :reset_interval if options['force']
        validator = Fluent::Counter::HashValidator.new(*validate_param)
        valid_params, errors = validator.call(params)
        res = Response.new(errors)
        key_hash = valid_params.reduce({}) do |acc, vp|
          acc.merge(Store.gen_key(scope, vp['name']) => vp)
        end

        do_inc = lambda do |store, key|
          begin
            param = key_hash[key]
            v = store.inc(key, param, force: options['force'])
            @log.debug("Increment #{key} by #{param['value']}")
            res.push_data v
          rescue => e
            res.push_error e
          end
        end

        if options['random']
          @mutex_hash.synchronize_keys(*(key_hash.keys), &do_inc)
        else
          @mutex_hash.synchronize(*(key_hash.keys), &do_inc)
        end

        res.to_hash
      end

      def reset(params, scope, options)
        validator = Fluent::Counter::ArrayValidator.new(:empty, :key)
        valid_params, errors = validator.call(params)
        res = Response.new(errors)
        keys = valid_params.map { |vp| Store.gen_key(scope, vp) }

        do_reset = lambda do |store, key|
          begin
            v = store.reset(key)
            @log.debug("Reset #{key}'s' counter value")
            res.push_data v
          rescue => e
            res.push_error e
          end
        end

        if options['random']
          @mutex_hash.synchronize_keys(*keys, &do_reset)
        else
          @mutex_hash.synchronize(*keys, &do_reset)
        end

        res.to_hash
      end

      def get(params, scope, _options)
        validator = Fluent::Counter::ArrayValidator.new(:empty, :key)
        valid_params, errors = validator.call(params)
        res = Response.new(errors)

        keys = valid_params.map { |vp| Store.gen_key(scope, vp) }
        keys.each do |key|
          begin
            v = @store.get(key, raise_error: true)
            @log.debug("Get counter value: #{key}")
            res.push_data v
          rescue => e
            res.push_error e
          end
        end
        res.to_hash
      end

      def safe_run
        yield
      rescue => e
        {
          'errors' => [InternalServerError.new(e).to_hash],
          'data' => []
        }
      end

      class Response
        def initialize(errors = [], data = [])
          @errors = errors
          @data = data
        end

        def push_error(error)
          @errors << error
        end

        def push_data(data)
          @data << data
        end

        def to_hash
          if @errors.empty?
            { 'data' => @data }
          else
            errors = @errors.map do |e|
              error = e.respond_to?(:to_hash) ? e : InternalServerError.new(e.to_s)
              error.to_hash
            end
            { 'data' => @data, 'errors' => errors }
          end
        end
      end
    end

    class Handler < Fluent::Counter::BaseSocket
      def initialize(io, on_message)
        super(io)
        @on_message = on_message
      end

      def on_message(data)
        res = @on_message.call(data)
        packed_write res if res
      end
    end
  end
end