lib/fluent/plugin_helper/service_discovery/manager.rb
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
require 'fluent/plugin/service_discovery'
require 'fluent/plugin_helper/service_discovery/round_robin_balancer'
module Fluent
module PluginHelper
module ServiceDiscovery
class Manager
def initialize(log:, load_balancer: nil, custom_build_method: nil)
@log = log
@load_balancer = load_balancer || RoundRobinBalancer.new
@custom_build_method = custom_build_method
@discoveries = []
@services = {}
@queue = Queue.new
@static_config = true
end
def configure(configs, parent: nil)
configs.each do |config|
type, conf = if config.has_key?(:conf) # for compatibility with initial API
[config[:type], config[:conf]]
else
[config['@type'], config]
end
sd = Fluent::Plugin.new_sd(type, parent: parent)
sd.configure(conf)
sd.services.each do |s|
@services[s.discovery_id] = build_service(s)
end
@discoveries << sd
if @static_config && type.to_sym != :static
@static_config = false
end
end
rebalance
end
def static_config?
@static_config
end
def start
@discoveries.each do |d|
d.start(@queue)
end
end
%i[after_start stop before_shutdown shutdown after_shutdown close terminate].each do |mth|
define_method(mth) do
@discoveries.each do |d|
d.__send__(mth)
end
end
end
def run_once
# Don't care race in this loop intentionally
s = @queue.size
if s == 0
return
end
s.times do
msg = @queue.pop
unless msg.is_a?(Fluent::Plugin::ServiceDiscovery::DiscoveryMessage)
@log.warn("BUG: #{msg}")
next
end
begin
handle_message(msg)
rescue => e
@log.error(e)
end
end
rebalance
end
def rebalance
@load_balancer.rebalance(services)
end
def select_service(&block)
@load_balancer.select_service(&block)
end
def services
@services.values
end
private
def handle_message(msg)
service = msg.service
case msg.type
when Fluent::Plugin::ServiceDiscovery::SERVICE_IN
if (n = build_service(service))
@log.info("Service in: name=#{service.name} #{service.host}:#{service.port}")
@services[service.discovery_id] = n
else
raise "failed to build service in name=#{service.name} #{service.host}:#{service.port}"
end
when Fluent::Plugin::ServiceDiscovery::SERVICE_OUT
s = @services.delete(service.discovery_id)
if s
@log.info("Service out: name=#{service.name} #{service.host}:#{service.port}")
else
@log.warn("Not found service: name=#{service.name} #{service.host}:#{service.port}")
end
else
@log.error("BUG: unknow message type: #{msg.type}")
end
end
def build_service(n)
@custom_build_method ? @custom_build_method.call(n) : n
end
end
end
end
end