lib/puma/server.rb

Summary

Maintainability
F
4 days
Test Coverage
# frozen_string_literal: true

require 'stringio'

require 'puma/thread_pool'
require 'puma/const'
require 'puma/events'
require 'puma/null_io'
require 'puma/reactor'
require 'puma/client'
require 'puma/binder'
require 'puma/util'
require 'puma/io_buffer'
require 'puma/request'

require 'socket'
require 'forwardable'

module Puma

  # The HTTP Server itself. Serves out a single Rack app.
  #
  # This class is used by the `Puma::Single` and `Puma::Cluster` classes
  # to generate one or more `Puma::Server` instances capable of handling requests.
  # Each Puma process will contain one `Puma::Server` instance.
  #
  # The `Puma::Server` instance pulls requests from the socket, adds them to a
  # `Puma::Reactor` where they get eventually passed to a `Puma::ThreadPool`.
  #
  # Each `Puma::Server` will have one reactor and one thread pool.
  class Server

    include Puma::Const
    include Request
    # Forwardable supplies def_delegators, used below to expose Binder methods.
    extend Forwardable

    # Set by #run when the server is started in the background; nil otherwise.
    attr_reader :thread
    # The events object handed to #initialize.
    attr_reader :events
    attr_reader :min_threads, :max_threads  # for #stats
    attr_reader :requests_count             # @version 5.0.0

    # @todo the following may be deprecated in the future
    attr_reader :auto_trim_time, :early_hints, :first_data_timeout,
      :leak_stack_on_error,
      :persistent_timeout, :reaping_time

    # Writers for the tunables above; kept only for backwards compatibility.
    # @deprecated v6.0.0
    attr_writer :auto_trim_time, :early_hints, :first_data_timeout,
      :leak_stack_on_error, :min_threads, :max_threads,
      :persistent_timeout, :reaping_time

    # The rack application being served.
    attr_accessor :app
    # The Binder that owns the listening sockets (see #inherit_binder).
    attr_accessor :binder

    # Listener management is forwarded straight to the current binder.
    def_delegators :@binder, :add_tcp_listener, :add_ssl_listener,
      :add_unix_listener, :connected_ports

    # Thread-local key under which #process_client stores the server handling
    # the current thread; read back by Server.current.
    ThreadLocalKey = :puma_server

    # Builds a server that will serve the rack app +app+.
    #
    # +events+ is notified when certain error events occur; see Puma::Events
    # for the list of methods it is expected to implement.
    #
    # Nothing is started here. Call Server#run, which (in its default
    # background mode) returns a thread that can be joined to wait for the
    # server to do its work.
    #
    # @note Several instance variables exist so they are available for testing,
    #   and take their default values via +fetch+.  Normally the values are set
    #   via `::Puma::Configuration.puma_default_options`.
    #
    def initialize(app, events=Events.stdio, options={})
      @app     = app
      @events  = events
      @options = options

      # Control pipe; created lazily by #run.
      @check  = nil
      @notify = nil
      @status = :stop

      @thread      = nil
      @thread_pool = nil

      # Thread pool housekeeping intervals.
      @auto_trim_time = 30
      @reaping_time   = 1

      # Tunables, each falling back to its default when absent from options.
      @early_hints        = options.fetch(:early_hints, nil)
      @first_data_timeout = options.fetch(:first_data_timeout, FIRST_DATA_TIMEOUT)
      @min_threads        = options.fetch(:min_threads, 0)
      @max_threads        = options.fetch(:max_threads, (Puma.mri? ? 5 : 16))
      @persistent_timeout = options.fetch(:persistent_timeout, PERSISTENT_TIMEOUT)
      @queue_requests     = options.fetch(:queue_requests, true)
      @max_fast_inline    = options.fetch(:max_fast_inline, MAX_FAST_INLINE)

      # Stack traces are exposed in error responses for the development and
      # test environments, and also when no environment was given at all.
      environment = @options[:environment]
      dev_or_test = !!(environment =~ /\A(development|test)\z/)
      @leak_stack_on_error = environment ? dev_or_test : true

      @binder = Binder.new(events)

      ENV['RACK_ENV'] ||= "development"

      @mode             = :http
      @precheck_closing = true
      @requests_count   = 0
    end

    # Replaces this server's binder with +bind+ and returns +bind+.
    def inherit_binder(bind)
      self.binder = bind
    end

    class << self
      # The server stored under ThreadLocalKey for the calling thread, if any.
      # @!attribute [r] current
      def current
        Thread.current[ThreadLocalKey]
      end

      private

      # :nodoc:
      # Truthy when the host OS name contains "linux" and Socket defines both
      # IPPROTO_TCP and TCP_CORK.
      # @version 5.0.0
      def tcp_cork_supported?
        linux_tcp_option?(:TCP_CORK)
      end

      # :nodoc:
      # Truthy when the host OS name contains "linux" and Socket defines both
      # IPPROTO_TCP and TCP_INFO.
      # @version 5.0.0
      def closed_socket_supported?
        linux_tcp_option?(:TCP_INFO)
      end

      # :nodoc:
      # Shared probe behind the two capability checks above; +option+ is the
      # name of the Socket constant that must be defined.
      def linux_tcp_option?(option)
        RbConfig::CONFIG['host_os'] =~ /linux/ &&
          Socket.const_defined?(:IPPROTO_TCP) &&
          Socket.const_defined?(option)
      end
    end

    # On Linux, use TCP_CORK to better control how the TCP stack
    # packetizes our stream. This improves both latency and throughput.
    # Where TCP_CORK is unavailable both methods are defined as no-ops.
    #
    if tcp_cork_supported?
      # Unpack format used by #closed_socket? to read the TCP state out of
      # the TCP_INFO socket option.
      UNPACK_TCP_STATE_FROM_TCP_INFO = "C".freeze

      # Turns TCP_CORK on for +socket+ when it is a TCPSocket.
      #
      # 6 == Socket::IPPROTO_TCP
      # 3 == TCP_CORK
      # 1/0 == turn on/off
      def cork_socket(socket)
        return unless socket.kind_of? TCPSocket

        socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_CORK, 1)
      rescue IOError, SystemCallError
        # Best effort only: a socket that cannot be corked is left as it is.
        Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
      end

      # Turns TCP_CORK off again for +socket+ when it is a TCPSocket.
      def uncork_socket(socket)
        return unless socket.kind_of? TCPSocket

        socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_CORK, 0)
      rescue IOError, SystemCallError
        # Best effort only, as in #cork_socket.
        Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
      end
    else
      # TCP_CORK is not available: nothing to do.
      def cork_socket(socket)
      end

      # TCP_CORK is not available: nothing to do.
      def uncork_socket(socket)
      end
    end

    if closed_socket_supported?
      # Unpack format that extracts the first byte (the TCP state) from the
      # TCP_INFO socket option. The constant is otherwise only defined when
      # TCP_CORK is supported, so define it here as well (guarded against
      # redefinition) to keep #closed_socket? working when TCP_INFO is
      # available without TCP_CORK.
      UNPACK_TCP_STATE_FROM_TCP_INFO = "C".freeze unless defined?(UNPACK_TCP_STATE_FROM_TCP_INFO)

      # Reports whether the peer of +socket+ has already closed, or is in the
      # middle of closing, the connection, based on the kernel's TCP state.
      #
      # Always false for sockets that are not TCPSockets, and always false
      # once a previous lookup has failed (@precheck_closing is switched off
      # on the first IOError/SystemCallError).
      def closed_socket?(socket)
        return false unless socket.kind_of? TCPSocket
        return false unless @precheck_closing

        begin
          tcp_info = socket.getsockopt(Socket::IPPROTO_TCP, Socket::TCP_INFO)
        rescue IOError, SystemCallError
          Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
          # Do not try again on later sockets.
          @precheck_closing = false
          false
        else
          state = tcp_info.unpack(UNPACK_TCP_STATE_FROM_TCP_INFO)[0]
          # TIME_WAIT: 6, CLOSE: 7, CLOSE_WAIT: 8, LAST_ACK: 9, CLOSING: 11
          (state >= 6 && state <= 9) || state == 11
        end
      end
    else
      # TCP_INFO is not available: never report a socket as closed.
      def closed_socket?(socket)
        false
      end
    end

    # The thread pool's +backlog+ value, or nil before the pool exists.
    # @!attribute [r] backlog
    def backlog
      pool = @thread_pool
      pool && pool.backlog
    end

    # The thread pool's +spawned+ value, or nil before the pool exists.
    # @!attribute [r] running
    def running
      pool = @thread_pool
      pool && pool.spawned
    end


    # How many requests the server is able to take on right now.
    #
    # For example, a value of 5 means there are 5 threads
    # sitting idle, ready to take a request. If one request
    # comes in, the value drops to 4 until that request has
    # finished processing.
    #
    # Returns nil before the thread pool exists.
    # @!attribute [r] pool_capacity
    def pool_capacity
      pool = @thread_pool
      pool && pool.pool_capacity
    end

    # Starts the server: builds the thread pool, starts the reactor when
    # requests are queued, creates the control pipe, and then enters the
    # accept loop (#handle_servers).
    #
    # If +background+ is true (the default) the accept loop runs in a new
    # thread named +thread_name+, and that thread is returned. Otherwise
    # the accept loop runs on the calling thread and its result is returned.
    #
    def run(background=true, thread_name: 'server')
      BasicSocket.do_not_reverse_lookup = true

      @events.fire :state, :booting

      @status = :run

      # Worker threads hand every client they pick up to #process_client.
      @thread_pool = ThreadPool.new(@min_threads, @max_threads, ::Puma::IOBuffer, &method(:process_client))

      @thread_pool.out_of_band_hook = @options[:out_of_band]
      @thread_pool.clean_thread_locals = @options[:clean_thread_locals]

      if @queue_requests
        # The reactor calls #reactor_wakeup for the clients it is watching.
        @reactor = Reactor.new(&method(:reactor_wakeup))
        @reactor.run
      end

      @thread_pool.auto_reap!(@reaping_time) if @reaping_time
      @thread_pool.auto_trim!(@auto_trim_time) if @auto_trim_time

      # Keep an existing control pipe if there already is one.
      @check, @notify = Puma::Util.pipe unless @notify

      @events.fire :state, :running

      return handle_servers unless background

      @thread = Thread.new do
        Puma.set_thread_name thread_name
        handle_servers
      end
    end

    # This method is called from the Reactor thread when a queued Client receives data,
    # times out, or when the Reactor is shutting down.
    #
    # It is responsible for ensuring that a request has been completely received
    # before it starts to be processed by the ThreadPool. This may be known as read buffering.
    # If read buffering is not done, and no other read buffering is performed (such as by an application server
    # such as nginx) then the application would be subject to a slow client attack.
    #
    # For a graphical representation of how the request buffer works see [architecture.md](https://github.com/puma/puma/blob/master/docs/architecture.md#connection-pipeline).
    #
    # The method checks to see if it has the full header and body with
    # the `Puma::Client#try_to_finish` method. If the full request has been sent,
    # then the request is passed to the ThreadPool (`@thread_pool << client`)
    # so that a "worker thread" can pick up the request and begin to execute application logic.
    # The Client is then removed from the reactor (return `true`).
    #
    # If a client object times out, a 408 response is written, its connection is closed,
    # and the object is removed from the reactor (return `true`).
    #
    # If the Reactor is shutting down, all Clients are either timed out or passed to the
    # ThreadPool, depending on their current state (#can_close?).
    #
    # Otherwise, if the full request is not ready then the client will remain in the reactor
    # (return `false`). When the client sends more data to the socket the `Puma::Client` object
    # will wake up and again be checked to see if it's ready to be passed to the thread pool.
    def reactor_wakeup(client)
      # Requests stop being queued once the server begins shutting down.
      shutting_down = !@queue_requests

      # Hand the client to the pool when its request is complete, or when we
      # are shutting down and the client cannot simply be closed.
      hand_off = client.try_to_finish || (shutting_down && !client.can_close?)

      if hand_off
        @thread_pool << client
      elsif shutting_down || client.timeout == 0
        client.timeout!
      end
    rescue StandardError => e
      # Any failure while reading: report it, drop the connection, and tell
      # the reactor to forget the client.
      client_error(e, client)
      client.close
      true
    end

    # The accept loop. Waits on the control pipe and the binder's listening
    # sockets, turns accepted connections into Client objects, and pushes them
    # onto the thread pool until a command on the control pipe changes @status
    # away from :run. It then shuts the reactor down, performs a graceful
    # shutdown for :stop and :restart, and finally closes the control pipe.
    def handle_servers
      begin
        # Snapshot the collaborators used inside the loop.
        check = @check
        sockets = [check] + @binder.ios
        pool = @thread_pool
        queue_requests = @queue_requests

        remote_addr_value = nil
        remote_addr_header = nil

        # Optionally override how the remote address of each client is set.
        case @options[:remote_address]
        when :value
          remote_addr_value = @options[:remote_address_value]
        when :header
          remote_addr_header = @options[:remote_address_header]
        end

        while @status == :run
          begin
            # Blocks until the control pipe or a listener is readable.
            ios = IO.select sockets
            ios.first.each do |sock|
              if sock == check
                # A command arrived; stop walking the ready sockets if it
                # changed @status (the while condition then ends the loop).
                break if handle_check
              else
                pool.wait_until_not_full
                pool.wait_for_less_busy_worker(
                  @options[:wait_for_less_busy_worker].to_f)

                io = begin
                  sock.accept_nonblock
                rescue IO::WaitReadable
                  # Nothing to accept on this listener after all.
                  next
                end
                client = Client.new io, @binder.env(sock)
                if remote_addr_value
                  client.peerip = remote_addr_value
                elsif remote_addr_header
                  client.remote_addr_header = remote_addr_header
                end
                pool << client
              end
            end
          rescue Object => e
            # Report the error and keep accepting.
            @events.unknown_error e, nil, "Listen loop"
          end
        end

        @events.fire :state, @status

        if queue_requests
          # Tells #reactor_wakeup that the server is shutting down.
          @queue_requests = false
          @reactor.shutdown
        end
        # :halt skips the graceful shutdown.
        graceful_shutdown if @status == :stop || @status == :restart
      rescue Exception => e
        @events.unknown_error e, nil, "Exception handling servers"
      ensure
        # Always tear the control pipe down so that a later #run creates a
        # fresh one.
        begin
          @check.close unless @check.closed?
        rescue Errno::EBADF, RuntimeError
          # RuntimeError is Ruby 2.2 issue, can't modify frozen IOError
          # Errno::EBADF is infrequently raised
        end
        @notify.close
        @notify = nil
        @check = nil
      end

      @events.fire :state, :done
    end

    # :nodoc:
    # Reads one command byte from the control pipe. A stop, halt or restart
    # command updates @status and yields true; anything else leaves @status
    # alone and yields false.
    def handle_check
      requested =
        case @check.read(1)
        when STOP_COMMAND    then :stop
        when HALT_COMMAND    then :halt
        when RESTART_COMMAND then :restart
        end

      return false unless requested

      @status = requested
      true
    end

    # Given a connection on +client+, handle the incoming requests,
    # or queue the connection in the Reactor if no request is available.
    #
    # This method is called from a ThreadPool worker thread.
    #
    # This method supports HTTP Keep-Alive so it may, depending on if the client
    # indicates that it supports keep alive, wait for another request before
    # returning.
    #
    # Return true if one or more requests were processed.
    def process_client(client, buffer)
      # Advertise this server into the thread
      Thread.current[ThreadLocalKey] = self

      clean_thread_locals = @options[:clean_thread_locals]
      # Cleared whenever the socket is handed to someone else (the reactor or
      # an async response); the ensure below closes the client otherwise.
      close_socket = true

      # Number of requests fully handled on this connection so far.
      requests = 0

      begin
        # When queueing, a client whose request is not complete yet goes to
        # the reactor rather than tying up this worker thread.
        if @queue_requests &&
          !client.eagerly_finish

          client.set_timeout(@first_data_timeout)
          if @reactor.add client
            close_socket = false
            return false
          end
        end

        # Finish reading the request on this thread.
        with_force_shutdown(client) do
          client.finish(@first_data_timeout)
        end

        while true
          @requests_count += 1
          case handle_request(client, buffer)
          when false
            # No further requests on this connection.
            break
          when :async
            # The socket must stay open after this method returns.
            close_socket = false
            break
          when true
            buffer.reset

            ThreadPool.clean_thread_locals if clean_thread_locals

            requests += 1

            # Only look for more data while the server is still running.
            check_for_more_data = @status == :run

            if requests >= @max_fast_inline
              # This will mean that reset will only try to use the data it already
              # has buffered and won't try to read more data. What this means is that
              # every client, independent of their request speed, gets treated like a slow
              # one once every max_fast_inline requests.
              check_for_more_data = false
            end

            next_request_ready = with_force_shutdown(client) do
              client.reset(check_for_more_data)
            end

            unless next_request_ready
              # Without a reactor there is nowhere to park the connection.
              break unless @queue_requests
              client.set_timeout @persistent_timeout
              if @reactor.add client
                close_socket = false
                break
              end
            end
          end
        end
        true
      rescue StandardError => e
        client_error(e, client)
        # The ensure tries to close +client+ down
        requests > 0
      ensure
        buffer.reset

        begin
          client.close if close_socket
        rescue IOError, SystemCallError
          Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
          # Already closed
        rescue StandardError => e
          @events.unknown_error e, nil, "Client"
        end
      end
    end

    # Runs the given block through the thread pool's +with_force_shutdown+.
    # If that raises ThreadPool::ForceShutdown, +client+ is timed out
    # instead of letting the exception propagate.
    def with_force_shutdown(client, &block)
      pool = @thread_pool
      pool.with_force_shutdown(&block)
    rescue ThreadPool::ForceShutdown
      client.timeout!
    end

    # :nocov:

    # Given the request +env+ from +client+ and the partial body +body+
    # plus a potential Content-Length value +cl+, finish reading
    # the body and return it.
    #
    # If the body is larger than MAX_BODY, a Tempfile object is used
    # for the body, otherwise a StringIO is used.
    # @deprecated 6.0.0
    #
    def read_body(env, client, body, cl)
      # Bytes still to be read beyond what +body+ already holds.
      remain = cl.to_i - body.bytesize

      return StringIO.new(body) unless remain > 0

      if remain > MAX_BODY
        # Use a Tempfile if there is a lot of data left
        stream = Tempfile.new(Const::PUMA_TMP_BASE)
        stream.binmode
      else
        # The body[0,0] trick is to get an empty string in the same
        # encoding as body.
        stream = StringIO.new body[0,0]
      end

      stream.write body

      # The first read is odd sized, so that every read after it can be a
      # full CHUNK_SIZE.
      read_size = remain % CHUNK_SIZE

      while true
        chunk = client.readpartial(read_size)

        # No chunk means a closed socket
        unless chunk
          stream.close
          return nil
        end

        remain -= stream.write(chunk)
        break unless remain > 0

        read_size = CHUNK_SIZE
      end

      stream.rewind
      stream
    end
    # :nocov:

    # Reports an error +e+ raised while doing I/O on +client+.
    #
    # ConnectionError and EOFError (exactly those classes) are ignored. For
    # anything else #lowlevel_error is invoked, an SSL error is forwarded to
    # the events object, a parser error answers 400, and any other error
    # answers 500.
    def client_error(e, client)
      error_class = e.class

      # Swallow, do not log
      return if error_class == ConnectionError || error_class == EOFError

      lowlevel_error(e, client.env)

      if e.is_a?(MiniSSL::SSLError)
        @events.ssl_error e, client.io
      elsif e.is_a?(HttpParserError)
        client.write_error(400)
        @events.parse_error e, client
      else
        client.write_error(500)
        @events.unknown_error e, nil, "Read"
      end
    end

    # A fallback rack response if +@app+ raises an exception.
    #
    # When the :lowlevel_error_handler option is set, the response is
    # whatever that handler returns; it is called with (e), (e, env) or
    # (e, env, status) depending on its arity. Otherwise a plain response
    # triplet is built, which includes the error message and backtrace only
    # when @leak_stack_on_error is set.
    #
    # @param e [Exception] the error being reported
    # @param env [Hash] the rack env of the failed request
    # @param status [Integer] the status used for the built-in responses
    # @return [Array] a rack response triplet (or the handler's return value)
    #
    def lowlevel_error(e, env, status=500)
      if handler = @options[:lowlevel_error_handler]
        if handler.arity == 1
          return handler.call(e)
        elsif handler.arity == 2
          return handler.call(e, env)
        else
          return handler.call(e, env, status)
        end
      end

      if @leak_stack_on_error
        # An exception that was never raised has a nil backtrace; Array()
        # turns that into an empty list instead of failing on nil.join.
        backtrace = Array(e.backtrace).join("\n")
        [status, {}, ["Puma caught this error: #{e.message} (#{e.class})\n#{backtrace}"]]
      else
        [status, {}, ["An unhandled lowlevel error occurred. The application logs may have details.\n"]]
      end
    end

    # Wait for all outstanding requests to finish.
    #
    # Steps, each driven by an option or by the current status:
    # * :shutdown_debug - dump the backtrace of every thread to $stdout
    # * :drain_on_shutdown - accept the connections that are already waiting
    #   on the listeners and hand them to the thread pool
    # * close the binder, unless the server is restarting
    # * shut the thread pool down, passing :force_shutdown_after as the
    #   timeout when that option is set
    #
    def graceful_shutdown
      if @options[:shutdown_debug]
        threads = Thread.list
        total = threads.size

        pid = Process.pid

        $stdout.syswrite "#{pid}: === Begin thread backtrace dump ===\n"

        threads.each_with_index do |t,i|
          $stdout.syswrite "#{pid}: Thread #{i+1}/#{total}: #{t.inspect}\n"
          # A thread that has exited reports a nil backtrace; Array() keeps
          # the dump going instead of failing on nil.join.
          $stdout.syswrite "#{pid}: #{Array(t.backtrace).join("\n#{pid}: ")}\n\n"
        end
        $stdout.syswrite "#{pid}: === End thread backtrace dump ===\n"
      end

      if @options[:drain_on_shutdown]
        count = 0

        while true
          # Zero timeout: only look at connections that are already waiting.
          ios = IO.select @binder.ios, nil, nil, 0
          break unless ios

          ios.first.each do |sock|
            begin
              if io = sock.accept_nonblock
                count += 1
                client = Client.new io, @binder.env(sock)
                @thread_pool << client
              end
            rescue SystemCallError
              # Deliberately ignored: draining is best effort.
            end
          end
        end

        @events.debug "Drained #{count} additional connections."
      end

      # The binder is kept open across a restart.
      if @status != :restart
        @binder.close
      end

      if @thread_pool
        if timeout = @options[:force_shutdown_after]
          @thread_pool.shutdown timeout.to_f
        else
          @thread_pool.shutdown
        end
      end
    end

    # Writes +message+ to the control pipe, tolerating the pipe having been
    # closed or cleared by the server thread while it shuts down.
    def notify_safely(message)
      @notify << message
    rescue IOError, NoMethodError, Errno::EPIPE
      # The server, in another thread, is shutting down
      current = Thread.current
      current.purge_interrupt_queue if current.respond_to? :purge_interrupt_queue
    rescue RuntimeError => e
      # Temporary workaround for https://bugs.ruby-lang.org/issues/13239
      # Only a RuntimeError that mentions IOError is treated like the
      # shutdown case above; anything else is passed on.
      raise e unless e.message.include?('IOError')

      current = Thread.current
      current.purge_interrupt_queue if current.respond_to? :purge_interrupt_queue
    end
    private :notify_safely

    # Sends the stop command to the acceptor thread, which then lets the
    # worker threads finish off the request queue before finally exiting.
    #
    # With +sync+ set, waits for the background server thread (if there is
    # one) to finish.
    def stop(sync=false)
      notify_safely(STOP_COMMAND)
      return unless @thread && sync

      @thread.join
    end

    # Sends the halt command to the acceptor thread. With +sync+ set, waits
    # for the background server thread (if there is one) to finish.
    def halt(sync=false)
      notify_safely(HALT_COMMAND)
      return unless @thread && sync

      @thread.join
    end

    # Sends the restart command to the acceptor thread. With +sync+ set,
    # waits for the background server thread (if there is one) to finish.
    def begin_restart(sync=false)
      notify_safely(RESTART_COMMAND)
      return unless @thread && sync

      @thread.join
    end

    # True once a stop or a restart has been requested (@status is :stop or
    # :restart); a halt does not count.
    def shutting_down?
      [:stop, :restart].include?(@status)
    end

    # Names of the reader methods whose values make up #stats, in the order
    # in which they are reported.
    # @version 5.0.0
    STAT_METHODS = %i[backlog running pool_capacity max_threads requests_count].freeze

    # Returns a hash of stats about the running server for reporting purposes.
    # Each entry of STAT_METHODS becomes a key; a nil or false value is
    # reported as 0.
    # @version 5.0.0
    # @!attribute [r] stats
    def stats
      STAT_METHODS.each_with_object({}) do |name, report|
        report[name] = send(name) || 0
      end
    end
  end
end