fluent/fluentd

View on GitHub
lib/fluent/counter/client.rb

Summary

Maintainability
A
1 hr
Test Coverage
#
# Fluentd
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

require 'cool.io'
require 'fluent/counter/base_socket'
require 'fluent/counter/error'
require 'timeout'

module Fluent
  module Counter
    # Client for the Fluentd counter server.
    #
    # Requests are sent through a Connection attached to an event loop. Every
    # request is paired with a Future, stored in @responses under the request
    # id, which `on_message` resolves when a response with the same id arrives.
    class Client
      DEFAULT_PORT = 24321
      DEFAULT_ADDR = '127.0.0.1'
      # Seconds `establish` waits for the server before giving up.
      DEFAULT_TIMEOUT = 5
      # Request ids run from 0 up to and including this value, then wrap to 0.
      ID_LIMIT_COUNT = 1 << 31

      # @param loop [Coolio::Loop, nil] event loop to use; a new one is created when nil
      # @param opt [Hash] optional settings: :port, :host, :log (defaults to the
      #   global $log) and :timeout (seconds, used by `establish`)
      def initialize(loop = nil, opt = {})
        @loop = loop || Coolio::Loop.new
        @port = opt[:port] || DEFAULT_PORT
        @host = opt[:host] || DEFAULT_ADDR
        @log = opt[:log] || $log
        @timeout = opt[:timeout] || DEFAULT_TIMEOUT
        @conn = Connection.connect(@host, @port, method(:on_message))
        @responses = {}
        @id = 0
        @id_mutex = Mutex.new
        @loop_mutex = Mutex.new
      end

      # Attaches the connection to the event loop.
      #
      # Returns self on success. Any StandardError is logged (or printed to
      # STDERR when no logger is available) instead of being raised; in that
      # case the return value is whatever the logging call returned.
      def start
        @loop.attach(@conn)
        # The rescue below already allows for a missing logger, so a nil @log
        # must not turn a successful attach into a reported failure.
        @log.debug("starting counter client: #{@host}:#{@port}") if @log
        self
      rescue => e
        if @log
          @log.error e
        else
          STDERR.puts e
        end
      end

      # Closes the connection to the counter server.
      def stop
        @conn.close
        @log.debug("calling stop in counter client: #{@host}:#{@port}") if @log
      end

      # Sends an `establish` request for +scope+ and remembers the first
      # element of the response data as the scope used by later requests.
      #
      # Returns that scope. Raises a RuntimeError when no response arrives
      # within the configured timeout; response errors are passed to
      # Fluent::Counter.raise_error.
      def establish(scope)
        scope = Timeout.timeout(@timeout) {
          response = send_request('establish', nil, [scope])
          Fluent::Counter.raise_error(response.errors.first) if response.errors?
          data = response.data
          data.first
        }
        @scope = scope
      rescue Timeout::Error
        raise "Can't establish the connection to counter server due to timeout"
      end

      # === Example
      # `init` receives various arguments.
      #
      # 1. init({ name: 'name' })
      # 2. init({ name: 'name',reset_interval: 20 }, options: {})
      # 3. init([{ name: 'name1',reset_interval: 20 }, { name: 'name2',reset_interval: 20 }])
      # 4. init([{ name: 'name1',reset_interval: 20 }, { name: 'name2',reset_interval: 20 }], options: {})
      # 5. init([{ name: 'name1',reset_interval: 20 }, { name: 'name2',reset_interval: 20 }]) { |res| ... }
      #
      # A single hash must be wrapped in braces: on Ruby 3 and later a bare
      # `init(name: 'name')` is parsed as keyword arguments and raises
      # ArgumentError.
      #
      # Without a block the Future for the request is returned. With a block, a
      # new thread waits for the result, yields it to the block, and that
      # Thread is returned.
      def init(params, options: {}, &block)
        exist_scope!
        params = [params] unless params.is_a?(Array)
        deliver(send_request('init', @scope, params, options), &block)
      end

      # Sends a `delete` request for the given params.
      # Return value and block handling are the same as for `init`.
      def delete(*params, options: {}, &block)
        exist_scope!
        deliver(send_request('delete', @scope, params, options), &block)
      end

      # === Example
      # `inc` receives various arguments.
      #
      # 1. inc({ name: 'name' })
      # 2. inc({ name: 'name',value: 20 }, options: {})
      # 3. inc([{ name: 'name1',value: 20 }, { name: 'name2',value: 20 }])
      # 4. inc([{ name: 'name1',value: 20 }, { name: 'name2',value: 20 }], options: {})
      #
      # As with `init`, wrap a single hash in braces on Ruby 3 and later.
      # Return value and block handling are the same as for `init`.
      def inc(params, options: {}, &block)
        exist_scope!
        params = [params] unless params.is_a?(Array)
        deliver(send_request('inc', @scope, params, options), &block)
      end

      # Sends a `get` request for the given params.
      # Return value and block handling are the same as for `init`.
      def get(*params, options: {}, &block)
        exist_scope!
        deliver(send_request('get', @scope, params, options), &block)
      end

      # Sends a `reset` request for the given params.
      # Return value and block handling are the same as for `init`.
      def reset(*params, options: {}, &block)
        exist_scope!
        deliver(send_request('reset', @scope, params, options), &block)
      end

      private

      # Shared tail of every request method: returns the future as-is when no
      # callback was given, otherwise resolves it on a new thread and hands the
      # result to the callback.
      def deliver(future, &callback)
        return future unless callback

        Thread.start do
          callback.call(future.get)
        end
      end

      # Raises unless `establish` has already stored a scope.
      def exist_scope!
        raise 'Call `establish` method to get a `scope` before calling this method' unless @scope
      end

      # Callback handed to the Connection: resolves and forgets the pending
      # future whose id matches the response. Responses with an unknown id are
      # reported through the logger, or STDERR when there is none.
      def on_message(data)
        if response = @responses.delete(data['id'])
          response.set(data)
        else
          message = "Receiving missing id data: #{data}"
          if @log
            @log.warn(message)
          else
            STDERR.puts message
          end
        end
      end

      # Registers a new Future under a fresh id, sends the request and returns
      # the future.
      def send_request(method, scope, params, opt = {})
        id = generate_id
        res = Future.new(@loop, @loop_mutex)
        @responses[id] = res # set a response value to this future object at `on_message`
        request = build_request(method, id, scope, params, opt)
        @log.debug(request) if @log
        @conn.send_data request
        res
      end

      # Builds the request hash; scope, params and options are only included
      # when they are truthy.
      def build_request(method, id, scope = nil, params = nil, options = nil)
        r = { id: id, method: method }
        r[:scope] = scope if scope
        r[:params] = params if params
        r[:options] = options if options
        r
      end

      # Returns the next request id. The counter is advanced under @id_mutex
      # and wraps back to 0 after ID_LIMIT_COUNT has been handed out.
      def generate_id
        @id_mutex.synchronize do
          current = @id
          @id = current >= ID_LIMIT_COUNT ? 0 : current + 1
          current
        end
      end
    end

    # Socket used by Client to talk to the counter server.
    #
    # Data handed to `send_data` before the socket reports `on_connect` is
    # packed and accumulated in @buffer, then written out once connected.
    # `pack`, `packed_write` and `write` are not defined here; presumably they
    # come from BaseSocket or its ancestors -- confirm there.
    class Connection < Fluent::Counter::BaseSocket
      # io         - passed straight to the superclass constructor.
      # on_message - callable invoked with each message passed to `on_message`.
      def initialize(io, on_message)
        super(io)
        @on_message = on_message
        @buffer = ''
        @connection = false
      end

      # Writes the data immediately when connected, otherwise queues its packed
      # form until `on_connect` fires.
      def send_data(data)
        return packed_write(data) if @connection

        @buffer += pack(data)
      end

      # Marks the socket as connected and flushes whatever was queued so far.
      def on_connect
        @connection = true
        write(@buffer)
        @buffer = ''
      end

      # Marks the socket as disconnected; later `send_data` calls queue again.
      def on_close
        @connection = false
      end

      # Forwards a received message to the callable given at construction.
      def on_message(data)
        @on_message.call(data)
      end
    end

    # Placeholder for a response that has not arrived yet.
    #
    # `set` stores the response; the reader methods block (by driving the event
    # loop under the shared mutex) until that has happened.
    class Future
      # Wraps a response hash, exposing its 'data' and 'errors' entries.
      class Result
        attr_reader :data, :errors

        def initialize(result)
          @data = result['data']
          @errors = result['errors']
        end

        # True when the response carried no errors (nil or empty).
        def success?
          return true if @errors.nil?

          @errors.empty?
        end

        # Inverse of `success?`.
        def error?
          !success?
        end
      end

      # loop  - object responding to `run_once`, driven while waiting.
      # mutex - lock held around each `run_once` call.
      def initialize(loop, mutex)
        @loop = loop
        @mutex = mutex
        @result = nil
        @set = false
      end

      # Stores the response and marks this future as resolved.
      def set(v)
        @result = Result.new(v)
        @set = true
      end

      # Errors of the resolved result; blocks until resolved.
      def errors
        get.errors
      end

      # Falsy when there are no errors, true otherwise; blocks until resolved.
      def errors?
        found = errors
        return found unless found

        !found.empty?
      end

      # Data of the resolved result; blocks until resolved.
      def data
        get.data
      end

      # Returns the Result, waiting for `set` to be called if necessary.
      def get
        join unless @result
        @result
      end

      # Like `get`, but hands the first error to Fluent::Counter.raise_error
      # when the result carries errors.
      def wait
        get.tap do |outcome|
          Fluent::Counter.raise_error(outcome.errors.first) if outcome.error?
        end
      end

      private

      # Runs the event loop in very short slices until `set` has been called,
      # releasing the lock between slices so it is held as briefly as possible.
      def join
        @mutex.synchronize { @loop.run_once(0.0001) } until @set
      end
    end
  end
end