fluent/fluentd

View on GitHub
lib/fluent/plugin_helper/thread.rb

Summary

Maintainability
B
4 hrs
Test Coverage
#
# Fluentd
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

require 'fluent/clock'

module Fluent
  module PluginHelper
    # Mixin that lets the including class create named background threads and
    # keeps them in @_threads (a Hash keyed by each thread's object_id) so the
    # stop / after_shutdown / close / terminate hooks below can wake, join and
    # kill them.
    #
    # Requirements visible from this module:
    # * the including class must provide `log` (used for warnings), and
    # * its ancestors must respond to initialize / stop / after_shutdown /
    #   close / terminate, because each of those methods calls `super`.
    #
    # Per-thread state is kept in thread-local keys on the created thread:
    #   :_fluentd_plugin_helper_thread_title   - the Symbol passed to thread_create
    #   :_fluentd_plugin_helper_thread_started - true once the thread body began
    #   :_fluentd_plugin_helper_thread_running - true while the callback should run
    module Thread
      THREAD_DEFAULT_WAIT_SECONDS = 1
      THREAD_SHUTDOWN_HARD_TIMEOUT_IN_TESTS = 100 # second

      # stop     : mark callback thread as stopped
      # shutdown : [-]
      # close    : collect stopped threads
      # terminate: kill all threads

      attr_reader :_threads # for test driver

      # Checker for code running inside a thread_create callback.
      # Returns the current thread's running flag, or false when it is unset.
      def thread_current_running?
        ::Thread.current[:_fluentd_plugin_helper_thread_running] || false
      end

      # Blocks, polling every 0.1 second, until every registered thread has
      # set its "started" flag. Returns immediately when no thread is registered.
      def thread_wait_until_start
        until @_threads_mutex.synchronize{ @_threads.values.all?{|t| t[:_fluentd_plugin_helper_thread_started] } }
          sleep 0.1
        end
      end

      # Blocks, polling every 0.1 second, until no registered thread has its
      # "running" flag set, or until THREAD_SHUTDOWN_HARD_TIMEOUT_IN_TESTS
      # seconds have passed. Any registered thread that is still alive
      # afterwards is killed and its running flag is cleared.
      def thread_wait_until_stop
        timeout_at = Fluent::Clock.now + THREAD_SHUTDOWN_HARD_TIMEOUT_IN_TESTS
        until @_threads_mutex.synchronize{ @_threads.values.all?{|t| !t[:_fluentd_plugin_helper_thread_running] } }
          break if Fluent::Clock.now > timeout_at
          sleep 0.1
        end
        # Iterate over a snapshot taken under the mutex: threads remove
        # themselves from @_threads when they finish.
        @_threads_mutex.synchronize{ @_threads.values }.each do |t|
          if t.alive?
            puts "going to kill the thread still running: #{t[:_fluentd_plugin_helper_thread_title]}"
            t.kill rescue nil
            t[:_fluentd_plugin_helper_thread_running] = false
          end
        end
      end

      # Ruby 2.2.3 or earlier (and all 2.1.x) cause bug about Threading ("Stack consistency error")
      #  by passing splatted argument to `yield`
      # https://bugs.ruby-lang.org/issues/11027
      # We can enable to pass arguments after expire of Ruby 2.1 (& older 2.2.x)
      # def thread_create(title, *args)
      #   Thread.new(*args) do |*t_args|
      #     yield *t_args
      #
      # Creates a thread that runs the given block, registers it in @_threads
      # and returns the ::Thread object.
      #
      # title - Symbol stored as the thread's title (and used as its name when
      #         the runtime supports Thread#name=).
      #
      # Raises ArgumentError when title is not a Symbol or no block is given.
      #
      # The created thread has abort_on_exception enabled. When the block
      # finishes (normally, by exception, or by being killed) the thread
      # removes itself from @_threads and clears its running flag.
      def thread_create(title)
        raise ArgumentError, "BUG: title must be a symbol" unless title.is_a? Symbol
        raise ArgumentError, "BUG: callback not specified" unless block_given?
        # Gate mutex: held by the creating thread until registration is done.
        m = Mutex.new
        m.lock
        thread = ::Thread.new do
          m.lock # run thread after that thread is successfully set into @_threads
          m.unlock
          thread_exit = false
          ::Thread.current[:_fluentd_plugin_helper_thread_title] = title
          ::Thread.current[:_fluentd_plugin_helper_thread_started] = true
          ::Thread.current[:_fluentd_plugin_helper_thread_running] = true
          begin
            yield
            thread_exit = true
          rescue Exception => e
            # Every exception is logged and then re-raised unchanged.
            log.warn "thread exited by unexpected error", plugin: self.class, title: title, error: e
            thread_exit = true
            raise
          ensure
            @_threads_mutex.synchronize do
              @_threads.delete(::Thread.current.object_id)
            end
            ::Thread.current[:_fluentd_plugin_helper_thread_running] = false
            # thread_exit stays false only when the block neither returned nor
            # raised into the rescue above (e.g. the thread was killed).
            if ::Thread.current.alive? && !thread_exit
              log.warn "thread doesn't exit correctly (killed or other reason)", plugin: self.class, title: title, thread: ::Thread.current, error: $!
            end
          end
        end
        thread.abort_on_exception = true
        thread.name = title.to_s if thread.respond_to?(:name=)
        @_threads_mutex.synchronize do
          @_threads[thread.object_id] = thread
        end
        m.unlock
        thread
      end

      # Returns true when a registered thread carries the given title.
      # The title is set by the thread itself, so this is false until the
      # new thread has begun executing.
      def thread_exist?(title)
        @_threads.values.any?{|thread| title == thread[:_fluentd_plugin_helper_thread_title] }
      end

      # Returns the "started" flag of the first registered thread with the
      # given title, or nil when no such thread is registered.
      def thread_started?(title)
        t = @_threads.values.find{|thread| title == thread[:_fluentd_plugin_helper_thread_title] }
        t && t[:_fluentd_plugin_helper_thread_started]
      end

      # Returns the "running" flag of the first registered thread with the
      # given title, or nil when no such thread is registered.
      def thread_running?(title)
        t = @_threads.values.find{|thread| title == thread[:_fluentd_plugin_helper_thread_title] }
        t && t[:_fluentd_plugin_helper_thread_running]
      end

      # Sets up the thread registry, the mutex guarding it, and the join
      # timeout used by #close.
      def initialize
        super
        @_threads_mutex = Mutex.new
        @_threads = {}
        @_thread_wait_seconds = THREAD_DEFAULT_WAIT_SECONDS
      end

      # Clears the running flag of every registered thread and wakes the ones
      # that are currently sleeping so they can observe the flag.
      def stop
        super
        wakeup_threads = []
        @_threads_mutex.synchronize do
          @_threads.each_value do |thread|
            thread[:_fluentd_plugin_helper_thread_running] = false
            wakeup_threads << thread if thread.alive? && thread.status == "sleep"
          end
        end
        # Wake the collected threads after the registry mutex is released.
        wakeup_threads.each do |thread|
          if thread.alive?
            thread.wakeup
          end
        end
      end

      # Wakes every registered thread that is currently sleeping
      # (without touching the running flags).
      def after_shutdown
        super
        wakeup_threads = []
        @_threads_mutex.synchronize do
          @_threads.each_value do |thread|
            wakeup_threads << thread if thread.alive? && thread.status == "sleep"
          end
        end
        wakeup_threads.each do |thread|
          thread.wakeup if thread.alive?
        end
      end

      # Joins each registered thread for up to @_thread_wait_seconds and
      # drops the entries of threads that finished (or already unregistered
      # themselves). Threads that did not finish in time stay registered.
      def close
        @_threads_mutex.synchronize{ @_threads.keys }.each do |obj_id|
          thread = @_threads[obj_id]
          if !thread || thread.join(@_thread_wait_seconds)
            @_threads_mutex.synchronize{ @_threads.delete(obj_id) }
          end
        end

        super
      end

      # Kills every thread that is still registered, waits for each of them to
      # finish, and empties the registry.
      def terminate
        super
        @_threads_mutex.synchronize{ @_threads.keys }.each do |obj_id|
          thread = @_threads[obj_id]
          # The entry may be gone already: a finishing thread deletes its own
          # entry in the ensure block of thread_create.
          next unless thread
          log.warn "killing existing thread", thread: thread
          thread.kill
        end
        @_threads_mutex.synchronize{ @_threads.keys }.each do |obj_id|
          thread = @_threads[obj_id]
          # A killed thread can unregister itself between the key snapshot and
          # this lookup, so guard against nil before joining.
          thread.join if thread
          @_threads_mutex.synchronize{ @_threads.delete(obj_id) }
        end
        @_thread_wait_seconds = nil
      end
    end
  end
end