cotag/spider-gazelle

View on GitHub
lib/spider-gazelle/spider/http1.rb

Summary

Maintainability
D
2 days
Test Coverage
# frozen_string_literal: true

require 'http-parser'     # C based, fast, http parser
require 'spider-gazelle/gazelle/request'


module SpiderGazelle
    class Spider
        class Http1
            # Event sink handed to the http-parser instance built in #initialize.
            # Every callback forwards to the connection currently assigned through
            # #connection=, so that attribute has to be set before any data is parsed.
            class Callbacks
                attr_accessor :connection
                attr_reader   :parser

                def initialize
                    @parser = ::HttpParser::Parser.new(self)
                    @logger = Logger.instance
                end

                # A new message has started: let the connection create its request
                def on_message_begin(parser)
                    @connection.start_parsing
                end

                # Append this piece of the URL to the request being parsed
                def on_url(parser, url)
                    @connection.parsing.url << url
                end

                # A frozen header name was completed by on_header_value, so this data
                # starts a new name; otherwise it extends the name being collected.
                def on_header_field(parser, header)
                    request = @connection.parsing
                    if request.header.frozen?
                        request.header = header
                    else
                        request.header << header
                    end
                end

                # The first value piece for a header finalises its name as an
                # upper-cased, underscored, HTTP_ prefixed (and frozen) env key.
                # Later pieces find the name frozen and extend the stored value.
                def on_header_value(parser, value)
                    request = @connection.parsing
                    env = request.env
                    name = request.header

                    return env[name] << value if name.frozen?

                    name.upcase!
                    name.tr!('-', '_')
                    name.prepend('HTTP_')
                    name.freeze

                    existing = env[name]
                    if existing
                        # Header seen before: join the values with a comma
                        existing << ', ' << value
                    else
                        env[name] = String.new(value)
                    end
                end

                def on_headers_complete(parser)
                    @connection.headers_complete
                end

                # Append this piece of the body to the request being parsed
                def on_body(parser, data)
                    @connection.parsing.body << data
                end

                def on_message_complete(parser)
                    @connection.finished_parsing
                end
            end


            # Pairs the raw socket with the request env. An instance is passed to
            # request.hijacked.resolve in send_next_response after the connection
            # has been unlinked from that socket.
            Hijack = Struct.new :socket, :env


            # Stores the collaborators and builds the per-connection parser state.
            #
            # return_method - called with this instance at the end of on_close
            # callbacks     - the HTTP parser callbacks object for this thread
            # thread        - used for next_tick, schedule and file access
            # logger        - receives the info / warn messages of this connection
            # gazelles      - each request is scheduled on `gazelles.next`
            def initialize(return_method, callbacks, thread, logger, gazelles)
                @return_method = return_method
                @callbacks     = callbacks
                @thread        = thread
                @logger        = logger
                @gazelles      = gazelles

                # Parser state belonging to this instance only
                @state = ::HttpParser::Parser.new_instance { |inst| inst.type = :request }

                # Requests waiting to be processed and responses waiting to be sent
                @requests = []
                @responses = []
            end


            # The request currently being assembled by the parser callbacks
            # (assigned in start_parsing, cleared again in finished_parsing)
            attr_reader   :parsing

            # No-op block installed as the socket's progress handler in on_close
            DUMMY_PROGRESS = proc {}

            # Binds this instance to a socket and remembers the details that
            # start_parsing later passes to each new request.
            # The scheme is 'https' when tls is truthy, 'http' otherwise.
            def load(socket, port, app, tls)
                @socket, @port, @app = socket, port, app

                @remote_ip = socket.peername[0]
                @scheme =
                    if tls
                        'https'
                    else
                        'http'
                    end

                set_on_close(socket)
            end

            # Registers a finally handler on the socket. The handler runs on_close
            # only while that socket is still the one this instance is bound to.
            def set_on_close(socket)
                socket.finally do
                    on_close if socket == @socket
                end
            end

            # Detaches this instance from its socket, clears the connection state
            # and finally hands the instance to the return method.
            def on_close
                socket = @socket

                # Swap in the no-op progress callback (prevent funny business)
                socket.progress(&DUMMY_PROGRESS)
                socket.storage = nil

                reset
                @return_method.call(self)
            end
            alias_method :unlink, :on_close

            # Clears the per-connection state. @port and @scheme are left as they
            # are. A request that is still being processed has its deferred
            # rejected with :socket_closed before the reference is dropped.
            def reset
                in_flight = @processing

                @app = @socket = @remote_ip = nil

                in_flight.defer.reject(:socket_closed) if in_flight
                @processing = @transmitting = nil

                [@requests, @responses].each(&:clear)
                @state.reset!
            end

            # Feeds data through the parser using this connection's parser state.
            # A truthy result from the parser triggers parsing_error.
            def parse(data)
                # Pointing the callbacks at ourselves works as we only
                # ever call this from a single thread
                @callbacks.connection = self

                failed = @callbacks.parser.parse(@state, data)
                parsing_error if failed
            end

            # ----------------
            # Parser Callbacks
            # ----------------
            # Creates the request object that the following parser callbacks fill in
            def start_parsing
                @parsing = Gazelle::Request.new(
                    @thread, @app, @port, @remote_ip, @scheme, @socket
                )
            end

            # Stores the parsed HTTP method, as a string, under REQUEST_METHOD
            def headers_complete
                env = @parsing.env
                env['REQUEST_METHOD'] = @state.http_method.to_s
            end

            # Detaches the completed request from the parser, copies the parser's
            # keep-alive / upgrade flags onto it and defers the follow-up work to
            # the next tick of the thread.
            def finished_parsing
                completed, @parsing = @parsing, nil

                completed.keep_alive = @state.keep_alive?
                completed.upgrade = @state.upgrade?

                @thread.next_tick do
                    after_parsing(completed)
                end
            end

            # Queues a fully parsed request and starts processing when idle.
            # Reading from the socket stops when the request is not keep-alive.
            def after_parsing(request)
                keep_open = request.keep_alive
                @socket.stop_read unless keep_open

                # Process the async request in the same way as Mizuno
                # See: http://polycrystal.org/2012/04/15/asynchronous_responses_in_rack.html
                # Calling this proc resolves the request's deferred on our thread.
                async_callback = proc do |data|
                    @thread.schedule do
                        request.defer.resolve([request, data])
                    end
                end
                request.env['async.callback'] = async_callback

                @requests.push(request)

                return if @processing
                process_next
            end

            # ------------------
            # Request Processing
            # ------------------
            EMPTY_RESPONSE = [''].freeze

            # Takes the next queued request (if any), registers the handler that
            # queues its response, and schedules its execution on the next gazelle.
            # @processing ends up nil when the request queue is empty.
            def process_next
                request = @processing = @requests.shift
                return unless request

                # Once resolved: queue the response, start sending when the
                # transmitter is idle and move on to the following request
                request.then do |response|
                    @responses << response
                    send_next_response unless @transmitting
                    process_next
                end

                @gazelles.next.schedule { process_on_gazelle(request) }
            end

            # Executes the request and resolves its deferred with the result.
            # A StandardError raised by execute! is logged and replaced with an
            # empty 500 response on a connection that will not be kept alive.
            # Anything else that escapes this method is logged and shuts the
            # reactor down.
            def process_on_gazelle(request)
                result =
                    begin
                        request.execute!
                    rescue StandardError => e
                        Logger.instance.print_error e, 'framework error'
                        request.keep_alive = false
                        [500, {}, EMPTY_RESPONSE]
                    end

                if !request.is_async || request.hijacked
                    # Complete the current request
                    request.defer.resolve([request, result])
                elsif result.nil? && !request.defer.resolved?
                    # TODO:: setup timeout for async response
                end
            rescue Exception => error
                Logger.instance.print_error error, 'critical error'
                Reactor.instance.shutdown
            end

            # ----------------
            # Response Sending
            # ----------------
            # Sends the next queued response, if there is one.
            #
            # The [request, result] pair at the head of @responses is taken and
            # the request is recorded in @transmitting (nil when the queue is
            # empty). One of three things then happens:
            #
            # * hijacked request - the socket is unlinked from this instance and
            #   handed to the hijack deferred wrapped in a Hijack struct
            # * closed socket    - the body is closed and nothing is written
            # * otherwise        - the response is written; bodies responding to
            #   to_path are streamed from the file, everything else goes through
            #   write_response (or headers only for HEAD requests)
            def send_next_response
                request, result = @responses.shift
                @transmitting = request
                return unless request

                if request.hijacked
                    # Unlink the management of the socket
                    # Then forward the raw socket to the upgrade handler
                    socket = @socket
                    unlink
                    request.hijacked.resolve Hijack.new(socket, request.env)

                elsif @socket.closed
                    body = result[2]
                    body.close if body.respond_to?(:close)
                else
                    status, headers, body = result
                    send_body = request.env['REQUEST_METHOD'] != 'HEAD'

                    # If a file, stream the body in a non-blocking fashion
                    if body.respond_to? :to_path
                        begin
                            file = @thread.file body.to_path, File::RDONLY, wait: true

                            # Must be assigned before the handlers below are defined:
                            # a block only closes over locals that already exist at
                            # that point in the source, otherwise the name is looked
                            # up as a method when the handler runs.
                            data_written = false

                            file.catch do |err|
                                @logger.warn "Error reading file: #{err}"

                                if data_written
                                    # Headers are already on the wire, all we can do is give up
                                    file.close
                                    @socket.shutdown
                                else
                                    send_internal_error
                                end
                            end

                            # Request has completed - send the next one
                            file.finally do
                                send_next_response
                            end

                            # Send the body in parallel without blocking the next request in dev
                            # Also if this is a head request we still want the body closed
                            body.close if body.respond_to?(:close)

                            statprom = file.stat wait: false
                            statprom.then do |stats|
                                #etag = ::Digest::MD5.hexdigest "#{stats[:st_mtim][:tv_sec]}#{body.to_path}"
                                #if etag == request.env[HTTP_ETAG]
                                #    header = NOT_MODIFIED_304.dup
                                #    add_header(header, ETAG, etag)
                                #    header << "\r\n"
                                #    @socket.write header
                                #    return
                                #end
                                #headers[ETAG] ||= etag

                                # Without a known length the file is sent chunked
                                if headers['Content-Length']
                                    type = :raw
                                else
                                    type = :http
                                    headers['Transfer-Encoding'] = 'chunked'
                                end

                                data_written = true
                                write_headers request.keep_alive, status, headers

                                if send_body
                                    # File is open and available for reading
                                    promise = file.send_file(@socket, using: type)
                                    promise.then do
                                        file.close
                                        @socket.shutdown if request.keep_alive == false
                                    end
                                    promise.catch do |err|
                                        @logger.warn "Error sending file: #{err}"
                                        @socket.close
                                        file.close
                                    end
                                else
                                    file.close
                                    @socket.shutdown unless request.keep_alive
                                end
                            end
                            statprom.catch do |err|
                                @logger.warn "Error reading file stats: #{err}"
                                file.close
                                send_internal_error
                            end
                        rescue => err
                            @logger.warn "Error reading file: #{err}"
                            send_internal_error
                        end
                    else
                        # Optimize the response: a body with zero or one part
                        # has a length that is known up front
                        begin
                            if body.size < 2
                                headers['Content-Length'] = body.size == 1 ? body[0].bytesize.to_s : '0'
                            end
                        rescue # just in case (body may not support size / indexing)
                        end

                        keep_alive = request.keep_alive

                        if send_body
                            write_response request, status, headers, body
                        else
                            # HEAD request: headers only, but the body is still closed
                            body.close if body.respond_to?(:close)
                            write_headers keep_alive, status, headers
                            @socket.shutdown if keep_alive == false
                        end

                        send_next_response
                    end
                end
            end

            # Writes the headers and the body of a non-file response.
            #
            # When the headers carry a Content-Length the body parts are written
            # as they are; otherwise the response uses chunked transfer encoding.
            # The socket is shut down afterwards when keep_alive is exactly false.
            #
            # request - only its keep_alive flag is read here
            # headers - mutated: Content-Length is stringified, or
            #           Transfer-Encoding is set to 'chunked'
            # body    - must respond to each; it is closed (when it responds to
            #           close) even if writing raises part way through
            def write_response(request, status, headers, body)
                keep_alive = request.keep_alive

                if headers['Content-Length']
                    headers['Content-Length'] = headers['Content-Length'].to_s
                    write_headers keep_alive, status, headers

                    # Stream the response (pass directly into @socket.write)
                    body.each { |data| @socket.write(data) }
                else
                    headers['Transfer-Encoding'] = 'chunked'
                    write_headers keep_alive, status, headers

                    # Stream the response
                    body.each do |part|
                        size = part.bytesize

                        # A zero length chunk is the end-of-body marker, so an
                        # empty part must never be written as a chunk
                        next if size.zero?

                        chunk = size.to_s(16) << "\r\n" << part << "\r\n"
                        @socket.write chunk
                    end

                    @socket.write "0\r\n\r\n"
                end

                @socket.shutdown if keep_alive == false
            ensure
                body.close if body.respond_to?(:close)
            end

            # Appends a single "key: value\r\n" line to the header buffer (in place)
            def add_header(header, key, value)
                header << key << ': ' << value << "\r\n"
            end

            # Builds the status line plus header section and writes it to the socket.
            #
            # A 'Connection: close' header is added when keep_alive is exactly false.
            # Keys starting with 'rack' are not sent, and a value containing
            # newlines produces one header line per line of the value.
            def write_headers(keep_alive, status, headers)
                headers['Connection'] = 'close' if keep_alive == false

                head = String.new("HTTP/1.1 #{status} #{fetch_code(status)}\r\n")

                headers.each do |name, content|
                    next if name.start_with?('rack')

                    content.to_s.split("\n").each do |line|
                        add_header(head, name, line)
                    end
                end

                head << "\r\n"
                @socket.write(head)
            end

            # Lookup table used by fetch_code to turn a status code into the
            # reason phrase of the status line (taken from Rack)
            HTTP_STATUS_CODES = Rack::Utils::HTTP_STATUS_CODES
            # Fallback passed to fetch as its block: the phrase used for any
            # status that is missing from the table
            HTTP_STATUS_DEFAULT = proc { 'CUSTOM' }
            # Returns the reason phrase for a status, or the default phrase
            # when the status is not in HTTP_STATUS_CODES
            def fetch_code(status)
                HTTP_STATUS_CODES.fetch(status) { |code| HTTP_STATUS_DEFAULT.call(code) }
            end


            # ----------------
            # Error Management
            # ----------------
            # Unrecoverable failure: ask the reactor to shut down (kills the process)
            def critical_error
                reactor = Reactor.instance
                reactor.shutdown
            end

            # Handles input the parser rejected: reading stops right away and
            # the error response goes out after the request that is last in line
            # (queued, else currently processing) has finished. With nothing
            # outstanding it is sent immediately.
            def parsing_error
                @socket.stop_read

                blocker = @requests.last || @processing
                return send_parsing_error unless blocker

                blocker.finally { send_parsing_error }
            end

            # Logs the failure, then answers with an empty 400 response
            # and shuts the socket down
            def send_parsing_error
                @logger.info('Parsing error!')

                socket = @socket
                socket.stop_read
                socket.write(
                    "HTTP/1.1 400 Bad Request\r\n" \
                    "Connection: close\r\n" \
                    "Content-Length: 0\r\n\r\n"
                )
                socket.shutdown
            end

            # Logs the failure, then answers with an empty 500 response
            # and shuts the socket down
            def send_internal_error
                @logger.info('Internal error')

                socket = @socket
                socket.stop_read
                socket.write(
                    "HTTP/1.1 500 Internal Server Error\r\n" \
                    "Connection: close\r\n" \
                    "Content-Length: 0\r\n\r\n"
                )
                socket.shutdown
            end
        end
    end
end