actionpack/lib/action_dispatch/middleware/host_authorization.rb
# frozen_string_literal: true
# :markup: markdown
module ActionDispatch
# # Action Dispatch HostAuthorization
#
# This middleware guards from DNS rebinding attacks by explicitly permitting the
# hosts a request can be sent to, and is passed the options set in
# `config.host_authorization`.
#
# Requests can opt-out of Host Authorization with `exclude`:
#
# config.host_authorization = { exclude: ->(request) { request.path =~ /healthcheck/ } }
#
# When a request comes to an unauthorized host, the `response_app` application
# will be executed and rendered. If no `response_app` is given, a default one
# will run. The default response app logs blocked host info with level 'error'
# and responds with `403 Forbidden`. The body of the response contains debug
# info if `config.consider_all_requests_local` is set to true, otherwise the
# body is empty.
class HostAuthorization
ALLOWED_HOSTS_IN_DEVELOPMENT = [".localhost", ".test", IPAddr.new("0.0.0.0/0"), IPAddr.new("::/0")]
PORT_REGEX = /(?::\d+)/ # :nodoc:
SUBDOMAIN_REGEX = /(?:[a-z0-9-]+\.)/i # :nodoc:
IPV4_HOSTNAME = /(?<host>\d+\.\d+\.\d+\.\d+)#{PORT_REGEX}?/ # :nodoc:
IPV6_HOSTNAME = /(?<host>[a-f0-9]*:[a-f0-9.:]+)/i # :nodoc:
IPV6_HOSTNAME_WITH_PORT = /\[#{IPV6_HOSTNAME}\]#{PORT_REGEX}/i # :nodoc:
VALID_IP_HOSTNAME = Regexp.union( # :nodoc:
/\A#{IPV4_HOSTNAME}\z/,
/\A#{IPV6_HOSTNAME}\z/,
/\A#{IPV6_HOSTNAME_WITH_PORT}\z/,
)
class Permissions # :nodoc:
def initialize(hosts)
@hosts = sanitize_hosts(hosts)
end
def empty?
@hosts.empty?
end
def allows?(host)
@hosts.any? do |allowed|
if allowed.is_a?(IPAddr)
begin
allowed === extract_hostname(host)
rescue
# IPAddr#=== raises an error if you give it a hostname instead of IP. Treat
# similar errors as blocked access.
false
end
else
allowed === host
end
end
end
private
def sanitize_hosts(hosts)
Array(hosts).map do |host|
case host
when Regexp then sanitize_regexp(host)
when String then sanitize_string(host)
else host
end
end
end
def sanitize_regexp(host)
/\A#{host}#{PORT_REGEX}?\z/
end
def sanitize_string(host)
if host.start_with?(".")
/\A#{SUBDOMAIN_REGEX}?#{Regexp.escape(host[1..-1])}#{PORT_REGEX}?\z/i
else
/\A#{Regexp.escape host}#{PORT_REGEX}?\z/i
end
end
def extract_hostname(host)
host.slice(VALID_IP_HOSTNAME, "host") || host
end
end
class DefaultResponseApp # :nodoc:
RESPONSE_STATUS = 403
def call(env)
request = Request.new(env)
format = request.xhr? ? "text/plain" : "text/html"
log_error(request)
response(format, response_body(request))
end
private
def response_body(request)
return "" unless request.get_header("action_dispatch.show_detailed_exceptions")
template = DebugView.new(hosts: request.env["action_dispatch.blocked_hosts"])
template.render(template: "rescues/blocked_host", layout: "rescues/layout")
end
def response(format, body)
[RESPONSE_STATUS,
{ Rack::CONTENT_TYPE => "#{format}; charset=#{Response.default_charset}",
Rack::CONTENT_LENGTH => body.bytesize.to_s },
[body]]
end
def log_error(request)
logger = available_logger(request)
return unless logger
logger.error("[#{self.class.name}] Blocked hosts: #{request.env["action_dispatch.blocked_hosts"].join(", ")}")
end
def available_logger(request)
request.logger || ActionView::Base.logger
end
end
def initialize(app, hosts, exclude: nil, response_app: nil)
@app = app
@permissions = Permissions.new(hosts)
@exclude = exclude
@response_app = response_app || DefaultResponseApp.new
end
def call(env)
return @app.call(env) if @permissions.empty?
request = Request.new(env)
hosts = blocked_hosts(request)
if hosts.empty? || excluded?(request)
mark_as_authorized(request)
@app.call(env)
else
env["action_dispatch.blocked_hosts"] = hosts
@response_app.call(env)
end
end
private
def blocked_hosts(request)
hosts = []
origin_host = request.get_header("HTTP_HOST")
hosts << origin_host unless @permissions.allows?(origin_host)
forwarded_host = request.x_forwarded_host&.split(/,\s?/)&.last
hosts << forwarded_host unless forwarded_host.blank? || @permissions.allows?(forwarded_host)
hosts
end
def excluded?(request)
@exclude && @exclude.call(request)
end
def mark_as_authorized(request)
request.set_header("action_dispatch.authorized_host", request.host)
end
end
end