lib/fluent/plugin_helper/http_server.rb
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
begin
# Preferred path: load the `async` library and the default server implementation.
require 'async'
require 'fluent/plugin_helper/http_server/server'
rescue LoadError => _
# One of the requires above could not be loaded. Load the compat server and
# expose it under the HttpServer::Server constant, which is the name the
# helper methods below instantiate.
require 'fluent/plugin_helper/http_server/compat/server'
Fluent::PluginHelper::HttpServer::Server = Fluent::PluginHelper::HttpServer::Compat::Server
end
require 'fluent/plugin_helper/thread'
require 'fluent/plugin_helper/server' # For Server::ServerTransportParams
require 'fluent/plugin_helper/http_server/ssl_context_builder'
module Fluent
module PluginHelper
module HttpServer
include Fluent::PluginHelper::Thread
include Fluent::Configurable
# stop : stop http server and mark callback thread as stopped
# shutdown : [-]
# close : collect stopped threads
# terminate: kill thread
def self.included(mod)
  # Whatever includes this helper also gets
  # Fluent::PluginHelper::Server::ServerTransportParams mixed in.
  transport_params = Fluent::PluginHelper::Server::ServerTransportParams
  mod.include(transport_params)
end
# Forwards all arguments up the initializer chain via +super+, then sets up
# this helper's own state.
def initialize(*)
super
# No server has been created yet; #stop checks this before calling Server#stop.
@_http_server = nil
end
# Old name of #http_server_create_http_server. Logs a deprecation warning on
# +logger+ and forwards every argument, including the block, unchanged.
#
# @deprecated Use #http_server_create_http_server instead.
def create_http_server(title, addr:, port:, logger:, default_app: nil, proto: nil, tls_opts: nil, &block)
  logger.warn('this method is deprecated. Use #http_server_create_http_server instead')

  forwarded = {
    addr: addr,
    port: port,
    logger: logger,
    default_app: default_app,
    proto: proto,
    tls_opts: tls_opts
  }
  http_server_create_http_server(title, **forwarded, &block)
end
# Creates an HTTP server, starts it in a thread created with +thread_create+,
# and waits on the start-up queue before returning. When TLS is selected,
# either by +proto+ or by the transport configuration, the work is delegated
# to #http_server_create_https_server.
#
# @param title [Symbol] the thread name. this value should be unique.
# @param addr [String] Listen address
# @param port [String] Listen port
# @param logger [Logger] logger used in this server
# @param default_app [Object] This method must have #call.
# @param proto [Symbol] :tls or :tcp
# @param tls_opts [Hash] options for TLS.
# @yield [serv] the server instance being built
# @raise [ArgumentError] when no block is given
def http_server_create_http_server(title, addr:, port:, logger:, default_app: nil, proto: nil, tls_opts: nil, &block)
  raise ArgumentError, 'BUG: callback not specified' unless block_given?

  tls_requested = proto == :tls || (@transport_config && @transport_config.protocol == :tls)
  if tls_requested
    return http_server_create_https_server(title, addr: addr, port: port, logger: logger, default_app: default_app, tls_opts: tls_opts, &block)
  end

  @_http_server = HttpServer::Server.new(addr: addr, port: port, logger: logger, default_app: default_app) do |server|
    block.call(server)
  end

  # The queue is handed to Server#start inside the new thread; this call
  # returns once a value can be popped from it.
  _block_until_http_server_start do |ready|
    thread_create(title) { @_http_server.start(ready) }
  end
end
# Creates an HTTPS server, starts it in a thread created with
# +thread_create+, and waits on the start-up queue before returning. The TLS
# context is built from the transport configuration, with +tls_opts+ applied
# on top of it when given.
#
# @param title [Symbol] the thread name. this value should be unique.
# @param addr [String] Listen address
# @param port [String] Listen port
# @param logger [Logger] logger used in this server
# @param default_app [Object] This method must have #call.
# @param tls_opts [Hash] options for TLS.
# @yield [serv] the server instance being built
# @raise [ArgumentError] when no block is given
def http_server_create_https_server(title, addr:, port:, logger:, default_app: nil, tls_opts: nil)
  # Fail fast, consistently with #http_server_create_http_server. Without
  # this guard a missing block only surfaces as a LocalJumpError at whatever
  # point the server invokes the callback.
  raise ArgumentError, 'BUG: callback not specified' unless block_given?

  topt =
    if tls_opts
      _http_server_overwrite_config(@transport_config, tls_opts)
    else
      @transport_config
    end

  # NOTE(review): the context builder is given the global $log, not the
  # +logger+ argument -- confirm this is intended.
  ctx = Fluent::PluginHelper::HttpServer::SSLContextBuilder.new($log).build(topt)

  @_http_server = HttpServer::Server.new(addr: addr, port: port, logger: logger, default_app: default_app, tls_context: ctx) do |serv|
    yield(serv)
  end

  # The queue is handed to Server#start inside the new thread; this call
  # returns once a value can be popped from it.
  _block_until_http_server_start do |notify|
    thread_create(title) do
      @_http_server.start(notify)
    end
  end
end
# Stops the HTTP server created by this helper, if there is one, and then
# continues the stop chain through +super+.
def stop
  server = @_http_server
  server.stop if server
  super
end
private
# Returns a +dup+ of +config+ in which each server transport parameter found
# in +opts+ has been assigned the value from +opts+. Keys of +opts+ that are
# not listed in SERVER_TRANSPORT_PARAMS are ignored.
#
# @param config [#dup] transport configuration to start from
# @param opts [Hash] overrides keyed by parameter name. Both String and
#   Symbol keys are accepted; if a parameter is present under both, the
#   String key wins (String keys were the only form honoured previously).
# @return the modified copy of +config+
def _http_server_overwrite_config(config, opts)
  conf = config.dup
  Fluent::PluginHelper::Server::SERVER_TRANSPORT_PARAMS.each do |param|
    name = param.to_s
    # Use key? rather than a truthiness check so explicit nil/false overrides are kept.
    key = [name, name.to_sym].find { |candidate| opts.key?(candidate) }
    conf[name] = opts[key] if key
  end
  conf
end
# Blocks until the server is ready to listen: a fresh queue is yielded to the
# caller's block, and this method returns the first value popped from it.
def _block_until_http_server_start
  Queue.new.tap { |ready| yield(ready) }.pop
end
end
end
end