edgewall/trac

View on GitHub
trac/web/api.py

Summary

Maintainability
F
4 days
Test Coverage
# -*- coding: utf-8 -*-
#
# Copyright (C) 2005-2023 Edgewall Software
# Copyright (C) 2005-2006 Christopher Lenz <cmlenz@gmx.de>
# All rights reserved.
#
# This software is licensed as described in the file COPYING, which
# you should have received as part of this distribution. The terms
# are also available at https://trac.edgewall.org/wiki/TracLicense.
#
# This software consists of voluntary contributions made by many
# individuals. For the exact contribution history, see the revision
# history and logs, available at https://trac.edgewall.org/log/.
#
# Author: Christopher Lenz <cmlenz@gmx.de>

from abc import ABCMeta
from http.cookies import CookieError, BaseCookie, SimpleCookie
from http.server import BaseHTTPRequestHandler
from datetime import datetime
import base64
import hashlib
import io
import mimetypes
import os
import re
import sys
import tempfile
import urllib.parse

try:
    import multipart
except ImportError:
    multipart = None
    import cgi
else:
    cgi = None

from trac.core import Interface, TracBaseError, TracError
from trac.util import as_bool, as_int, get_last_traceback, lazy, \
                      normalize_filename
from trac.util.datefmt import http_date, localtz, to_datetime, utc
from trac.util.html import Fragment, tag
from trac.util.text import empty, exception_to_unicode, to_unicode
from trac.util.translation import _, N_, tag_
from trac.web.href import Href
from trac.web.wsgi import _FileWrapper, is_client_disconnect_exception


class IAuthenticator(Interface):
    """Extension point interface for components that can provide the name
    of the remote user."""

    def authenticate(req):
        """Return the name of the remote user, or `None` if the identity of the
        user is unknown."""


class IRequestHandler(Interface):
    """Decide which `trac.core.Component` handles which `Request`, and how.

    The boolean property `is_valid_default_handler` determines whether the
    `IRequestFilter` can be used as a `default_handler` and defaults to
    `True`. To be suitable as a `default_handler`, an `IRequestFilter` must
    return an HTML document and `data` dictionary for rendering the document,
    and must not require that `match_request` be called prior to
    `process_request`.

    The boolean property `jquery_noconflict` determines whether jQuery's
    `noConflict` mode will be activated by the handler, and defaults to
    `False`.
    """

    def match_request(req):
        """Return whether the handler wants to process the given request."""

    def process_request(req):
        """Process the request.

        Return a `(template_name, data)` pair, where `data` is a
        dictionary of substitutions for the Jinja2 template (the
        template context, in Jinja2 terms).

        Optionally, the return value can also be a `(template_name,
        data, metadata)` triple, where `metadata` is a `dict` with
        hints for the template engine or the web front-end.

        Keys supported are:

          - `'content_type'`: the mimetype used for content delivery;
            "text/html" is assumed if the key is not present or the
            `metadata` was not specified

          - `'text'`: a boolean value indicating whether the Jinja2
            auto-escaping feature should be deactivated
            (``text=True``) or not (``text=False``); defaults to
            ``False``, suitable for generating HTML or XML content

          - `'fragment'`: a boolean value indicating whether the
            generated content will be used as part of another page
            (``fragment=True``) or as a stand-alone page
            (``fragment=False``), the default

          - `'domain'`: a string value indicating the translation
            domain to which the translated strings in the template
            belong to

        Note that if template processing should not occur, this method
        can simply send the response itself (see `Request` methods)
        and not return anything, as the `Request` methods raise a
        `RequestDone` exception.

        :Since 1.0: Clearsilver templates are no longer supported.

        :Since 1.1.2: the rendering `method` (xml, xhtml or text) may be
           returned as a fourth parameter in the tuple, but if not specified
           it will be inferred from the `content_type` when rendering the
           template.

        :Since 1.3.2: returns a pair, or a tuple in which the third
           element is a `dict` instead of a string like in the old
           API.  Note that the old API (`(template, data,
           content_type)` where `content_type` is a string or `None`)
           is still supported. When used, this means that `template`
           is a legacy Genshi template.

        :Since 1.5.1: Genshi templates are no longer supported. The
            third argument, `metadata`, will always be a `dict` or `None`
            when specified.
        """


def is_valid_default_handler(handler):
    """Returns `True` if the `handler` is a valid default handler, as
    described in the `IRequestHandler` interface documentation.
    """
    return handler and getattr(handler, 'is_valid_default_handler', True)


class IRequestFilter(Interface):
    """Enable components to interfere with the processing done by the
    main handler, either before and/or after it enters in action.
    """

    def pre_process_request(req, handler):
        """Called after initial handler selection, and can be used to change
        the selected handler or redirect request.

        Always returns the request handler, even if unchanged.
        """

    def post_process_request(req, template, data, metadata):
        """Do any post-processing the request might need

        This typically means adding values to the template `data`
        dictionary, or changing the Jinja2 template.

        `data` and `metadata` may be updated in place.

        Always returns a tuple of ``(template, data)`` or ``(template,
        data, metadata)``, even if unchanged.

        Note that `template`, `data`, `metadata` will be `None` if:
         - called when processing an error page
         - the default request handler did not return any result

        :Since 1.0: Clearsilver templates are no longer supported.

        :Since 1.1.2: the rendering `method` will be passed if it is returned
           by the request handler, otherwise `method` will be `None`. For
           backward compatibility, the parameter is optional in the
           implementation's signature.

        :Since 1.3.2: Genshi templates are still supported, and if
           `process_request` uses the old API (`(template, data,
           content_type)`), the `metadata` parameter passed to
           `post_process_request` will actually be the `content_type`
           value (`String` or `None`).

        :Since 1.5.1.: Genshi templates are no longer supported. `metadata`
            is always a `dict` or `None`.

        """


class TracNotImplementedError(TracError, NotImplementedError):
    """Raised when a `NotImplementedError` is trapped.

    This exception is for internal use and should not be raised by
    plugins. Plugins should raise `NotImplementedError`.

    :since: 1.0.11
    """

    title = N_("Not Implemented Error")


HTTP_STATUS = {int(code): reason.title()
               for code, (reason, description)
               in BaseHTTPRequestHandler.responses.items()}


class HTTPException(TracBaseError, metaclass=ABCMeta):

    code = None

    def __init__(self, detail, *args):
        """Factory for HTTPException classes."""
        if isinstance(detail, TracBaseError):
            self.detail = detail.message
            self.reason = detail.title
        else:
            self.detail = detail
        if args:
            self.detail = self.detail % args
        arg = '%s %s (%s)' % (self.code, self.reason, to_unicode(self.detail))
        super().__init__(arg)

    @property
    def message(self):
        # The message is based on the e.detail, which can be an Exception
        # object, but not a TracError one: when creating HTTPException,
        # a TracError.message is directly assigned to e.detail
        if isinstance(self.detail, Exception): # not a TracBaseError
            message = exception_to_unicode(self.detail)
        elif isinstance(self.detail, Fragment): # TracBaseError markup
            message = self.detail
        else:
            message = to_unicode(self.detail)
        return message

    @property
    def title(self):
        try:
            # We first try to get localized error messages here, but we
            # should ignore secondary errors if the main error was also
            # due to i18n issues
            title = _("Error")
            if self.reason:
                if title.lower() in self.reason.lower():
                    title = self.reason
                else:
                    title = _("Error: %(message)s", message=self.reason)
        except Exception:
            title = "Error"
        return title

    @classmethod
    def subclass(cls, name, code):
        """Create a new Exception class representing a HTTP status code."""
        reason = HTTP_STATUS.get(code, 'Unknown')
        new_class = type(name, (HTTPException,), {
            '__doc__': 'Exception for HTTP %d %s' % (code, reason)
        })
        new_class.code = code
        new_class.reason = reason
        return new_class

_HTTPException_subclass_names = []
for code in [code for code in HTTP_STATUS if code >= 400]:
    exc_name = HTTP_STATUS[code].replace(' ', '').replace('-', '')
    if exc_name.lower().startswith('http'):
        exc_name = exc_name[4:]
    exc_name = 'HTTP' + exc_name
    setattr(sys.modules[__name__], exc_name,
            HTTPException.subclass(exc_name, code))
    _HTTPException_subclass_names.append(exc_name)
del code, exc_name


class _RequestArgs(dict):
    """Dictionary subclass that provides convenient access to request
    parameters that may contain multiple values."""

    def get(self, name, default=None):
        """Return the first value for the specified parameter, or `default`
        if the parameter was not provided.

        :since 1.3.2: aliases `getfirst`. Use `getlist` if the value is
                      expected to be a list.
        """
        return self.getfirst(name, default)

    def as_int(self, name, default=None, min=None, max=None):
        """Return the value as an integer. Return `default` if
        if an exception is raised while converting the value to an
        integer.

        :param name: the name of the request parameter
        :keyword default: the value to return if the parameter is not
                          specified or an exception occurs converting
                          the value to an integer.
        :keyword min: lower bound to which the value is limited
        :keyword max: upper bound to which the value is limited

        :since: 1.2
        """
        if name not in self:
            return default
        return as_int(self.get(name), default, min, max)

    def as_bool(self, name, default=None):
        """Return the value as a boolean. Return `default` if
        if an exception is raised while converting the value to a
        boolean.

        :param name: the name of the request parameter
        :keyword default: the value to return if the parameter is not
                          specified or an exception occurs converting
                          the value to a boolean.

        :since: 1.2
        """
        if name not in self:
            return default
        return as_bool(self.get(name), default)

    def getbool(self, name, default=None):
        """Return the value as a boolean. Raise an `HTTPBadRequest`
        exception if an exception occurs while converting the value to
        a boolean.

        :param name: the name of the request parameter
        :keyword default: the value to return if the parameter is not
                          specified.

        :since: 1.2
        """
        if name not in self:
            return default
        value = self[name]
        if isinstance(value, list):
            raise HTTPBadRequest(tag_("Invalid value for request argument "
                                      "%(name)s.", name=tag.em(name)))
        value = as_bool(value, None)
        if value is None:
            raise HTTPBadRequest(tag_("Invalid value for request argument "
                                      "%(name)s.", name=tag.em(name)))
        return value

    def getint(self, name, default=None, min=None, max=None):
        """Return the value as an integer. Raise an `HTTPBadRequest`
        exception if an exception occurs while converting the value
        to an integer.

        :param name: the name of the request parameter
        :keyword default: the value to return if the parameter is not
                          specified
        :keyword min: lower bound to which the value is limited
        :keyword max: upper bound to which the value is limited

        :since: 1.2
        """
        if name not in self:
            return default
        value = as_int(self[name], None, min, max)
        if value is None:
            raise HTTPBadRequest(tag_("Invalid value for request argument "
                                      "%(name)s.", name=tag.em(name)))
        return value

    def getfirst(self, name, default=None):
        """Return the first value for the specified parameter, or `default`
        if the parameter was not provided.

        :since 1.3.2: `get` aliases `getfirst` and should be used instead.
        """
        if name not in self:
            return default
        val = self[name]
        if isinstance(val, list):
            val = val[0]
        return val

    def getlist(self, name):
        """Return a list of values for the specified parameter, even if only
        one value was provided.
        """
        if name not in self:
            return []
        val = self[name]
        if not isinstance(val, list):
            val = [val]
        return val

    def getfile(self, name):
        """Return a tuple of the filename, file object and file size.

        :param name: the name of the request parameter

        :since: 1.3.3
        """
        upload = self.getfirst(name)
        return self._getfile(upload)

    def getfilelist(self, name):
        """Return a list of tuples containing the filename, file object
        and file size.

        :param name: the name of the request parameter

        :since: 1.3.3
        """
        return [self._getfile(upload) for upload in self.getlist(name)]

    def require(self, name):
        """Raise an `HTTPBadRequest` exception if the parameter is
        not in the request.

        :param name: the name of the request parameter

        :since: 1.2
        """
        if name not in self:
            raise HTTPBadRequest(
                tag_("Missing request argument. The %(name)s argument "
                     "must be included in the request.", name=tag.em(name)))

    def _getfile(self, upload):
        filename = fileobj = size = None
        if hasattr(upload, 'filename'):
            filename = normalize_filename(upload.filename)
        if hasattr(upload, 'file'):
            fileobj = upload.file
            if hasattr(fileobj, 'fileno'):
                try:
                    size = os.fstat(fileobj.fileno())[6]
                except io.UnsupportedOperation:
                    size = None
            if size is None:
                fileobj.seek(0, 2)  # seek to end of file
                size = fileobj.tell()
                fileobj.seek(0)

        return filename, fileobj, size


def parse_arg_list(query_string):
    """Parse a query string into a list of `(name, value)` tuples.

    :Since 1.1.2: a leading `?` is stripped from `query_string`."""
    args = []
    if not query_string:
        return args
    unquote_plus = urllib.parse.unquote_plus
    query_string = query_string.lstrip('?')
    for arg in query_string.split('&'):
        nv = arg.split('=', 1)
        if len(nv) == 2:
            (name, value) = nv
        else:
            (name, value) = (nv[0], empty)
        name = unquote_plus(name, encoding='utf-8', errors='strict')
        value = unquote_plus(value, encoding='utf-8', errors='strict')
        args.append((name, value))
    return args


def arg_list_to_args(arg_list):
    """Convert a list of `(name, value)` tuples into into a `_RequestArgs`."""
    args = _RequestArgs()
    for name, value in arg_list:
        if name in args:
            if isinstance(args[name], list):
                args[name].append(value)
            else:
                args[name] = [args[name], value]
        else:
            args[name] = value
    return args


if hasattr(str, 'isascii'):
    _isascii = lambda value: value.isascii()
else:
    _is_non_ascii_re = re.compile(r'[^\x00-\x7f]')
    _isascii = lambda value: not _is_non_ascii_re.search(value)


def wsgi_string_decode(value):
    """Convert from a WSGI "bytes-as-unicode" string to an unicode string.
    """
    if not isinstance(value, str):
        raise TypeError('Must a str instance rather than %s' % type(value))
    if not _isascii(value):
        value = value.encode('iso-8859-1').decode('utf-8')
    return value


def wsgi_string_encode(value):
    """Convert from an unicode string to a WSGI "bytes-as-unicode" string.
    """
    if not isinstance(value, str):
        raise TypeError('Must a str instance rather than %s' % type(value))
    if not _isascii(value):
        value = value.encode('utf-8').decode('iso-8859-1')
    return value


def _raise_if_null_bytes(value):
    if value and '\x00' in value:
        raise HTTPBadRequest(_("Invalid request arguments."))


if multipart:
    def parse_header(header):
        return multipart.parse_options_header(header)

    def parse_form_data(environ):
        if environ['REQUEST_METHOD'] != 'POST':
            query_string = environ.get('QUERY_STRING', '')
            for name, value in parse_arg_list(query_string):
                _raise_if_null_bytes(name)
                _raise_if_null_bytes(value)
                yield name, value
            return

        ctype = environ.get('CONTENT_TYPE')
        if ctype:
            ctype, options = parse_header(ctype)

        if ctype == 'application/x-www-form-urlencoded':
            length = int(environ.get('CONTENT_LENGTH', -1))
            data = environ['wsgi.input'].read(length)
            pairs = urllib.parse.parse_qsl(
                str(data, 'utf-8'), keep_blank_values=True,
                strict_parsing=True, encoding='utf-8', errors='strict')
            for name, value in pairs:
                _raise_if_null_bytes(name)
                _raise_if_null_bytes(value)
                yield name, value
            return

        if ctype == 'multipart/form-data':
            forms, files = multipart.parse_form_data(environ, charset='utf-8',
                                                     strict=True)
            try:
                for name in forms:
                    _raise_if_null_bytes(name)
                    for value in forms.getall(name):
                        _raise_if_null_bytes(value)
                        yield name, value
                for name in files:
                    _raise_if_null_bytes(name)
                    for item in files.getall(name):
                        _raise_if_null_bytes(item.filename)
                        yield name, item
            except:
                for name in files:
                    for item in files.getall(name):
                        item.close()
                raise
            return

else:
    parse_header = cgi.parse_header

    class _FieldStorage(cgi.FieldStorage):
        """Our own version of cgi.FieldStorage, with tweaks."""

        def make_file(self):
            # We always use binary mode even if filename parameter is missing
            return tempfile.TemporaryFile('wb+')

        def read_multi(self, *args, **kwargs):
            try:
                super().read_multi(*args, **kwargs)
            except ValueError:
                # Most likely "Invalid boundary in multipart form",
                # possibly an upload of a .mht file? See #9880.
                self.read_single()

    def parse_form_data(environ):
        environ = environ.copy()
        fp = environ['wsgi.input']

        # Avoid letting cgi.FieldStorage consume the input stream when the
        # request does not contain form data
        ctype = environ.get('CONTENT_TYPE')
        if ctype:
            ctype, options = parse_header(ctype)
        if ctype not in ('application/x-www-form-urlencoded',
                         'multipart/form-data'):
            fp = io.BytesIO()

        # Python 2.6 introduced a backwards incompatible change for
        # FieldStorage where QUERY_STRING is no longer ignored for POST
        # requests. We'll keep the pre 2.6 behaviour for now...
        if environ['REQUEST_METHOD'] == 'POST':
            environ.pop('QUERY_STRING', '')
        fs = _FieldStorage(fp, environ=environ, keep_blank_values=True,
                           errors='strict')
        for value in fs.list or ():
            name = value.name
            _raise_if_null_bytes(name)
            if value.filename:
                _raise_if_null_bytes(value.filename)
            else:
                value = value.value
                _raise_if_null_bytes(value)
            yield name, value


class RequestDone(TracBaseError):
    """Marker exception that indicates whether request processing has completed
    and a response was sent.
    """
    iterable = None

    def __init__(self, iterable=None):
        self.iterable = iterable


class Cookie(SimpleCookie):
    def load(self, rawdata, ignore_parse_errors=False):
        if ignore_parse_errors:
            self.bad_cookies = []
            self._BaseCookie__set = self._loose_set
        SimpleCookie.load(self, rawdata)
        if ignore_parse_errors:
            self._BaseCookie__set = self._strict_set
            for key in self.bad_cookies:
                del self[key]

    _strict_set = BaseCookie._BaseCookie__set

    def _loose_set(self, key, real_value, coded_value):
        # If a key appears multiple times, the first occurrence has the
        # narrowest scope, keep that
        if key in self:
            return
        try:
            self._strict_set(key, real_value, coded_value)
        except CookieError:
            self.bad_cookies.append(key)
            dict.__setitem__(self, key, None)


class Request(object):
    """Represents a HTTP request/response pair.

    This class provides a convenience API over WSGI.
    """

    _disallowed_control_codes_re = re.compile(r'[\x00-\x08\x0a-\x1f\x7f]')
    _reserved_headers = {'content-type', 'content-length', 'location',
                         'etag', 'pragma', 'cache-control', 'expires'}
    # RFC7230 3.2 Header Fields
    _valid_header_re = re.compile(r"[-0-9A-Za-z!#$%&'*+.^_`|~]+\Z")

    def __init__(self, environ, start_response):
        """Create the request wrapper.

        :param environ: The WSGI environment dict
        :param start_response: The WSGI callback for starting the response
        :param callbacks: A dictionary of functions that are used to lazily
            evaluate attribute lookups
        """
        self.environ = environ
        self._start_response = start_response
        self._write = None
        self._status = '200 OK'
        self._response = None
        self._content_type = None

        self._outheaders = []
        self.outcookie = Cookie()

        self.callbacks = {
            'arg_list': Request._parse_arg_list,
            'args': lambda req: arg_list_to_args(req.arg_list),
            'languages': Request._parse_languages,
            'incookie': Request._parse_cookies,
            '_inheaders': Request._parse_headers,
            'locale': lambda req: None,  # prevent AttributeError
        }
        self.redirect_listeners = []

        self.base_url = self.environ.get('trac.base_url')
        if not self.base_url:
            self.base_url = self._reconstruct_url()
        self.href = Href(self.base_path)
        self.abs_href = Href(self.base_url)

    def __getattr__(self, name):
        """Performs lazy attribute lookup by delegating to the functions in the
        callbacks dictionary."""
        if name in self.callbacks:
            value = self.callbacks[name](self)
            setattr(self, name, value)
            return value
        raise AttributeError(name)

    def __repr__(self):
        uri = self.environ.get('PATH_INFO', '')
        qs = self.environ.get('QUERY_STRING', '')
        if qs:
            uri += '?' + qs
        return '<%s "%s %r">' % (self.__class__.__name__, self.method, uri)

    # Public API

    @lazy
    def is_authenticated(self):
        """Returns `True` if `authname` is not `anonymous`.

        :since: 1.3.2
        """
        return self.authname and self.authname != 'anonymous'

    @lazy
    def is_xhr(self):
        """Returns `True` if the request is an `XMLHttpRequest`.

        :since: 1.1.6
        """
        return self.get_header('X-Requested-With') == 'XMLHttpRequest'

    @property
    def method(self):
        """The HTTP method of the request"""
        return self.environ['REQUEST_METHOD']

    @property
    def path_info(self):
        """Path inside the application"""
        path_info = self.environ.get('PATH_INFO', '')
        try:
            return wsgi_string_decode(path_info)
        except UnicodeError:
            raise HTTPNotFound(_("Invalid URL encoding (was %(path_info)r)",
                                 path_info=path_info))

    @property
    def query_string(self):
        """Query part of the request"""
        return wsgi_string_decode(self.environ.get('QUERY_STRING', ''))

    @property
    def remote_addr(self):
        """IP address of the remote user"""
        return self.environ.get('REMOTE_ADDR')

    @property
    def remote_user(self):
        """ Name of the remote user.

        Will be `None` if the user has not logged in using HTTP authentication.
        """
        user = self.environ.get('REMOTE_USER')
        if user is not None:
            return wsgi_string_decode(user)

    @property
    def request_path(self):
        return self.href(self.path_info)

    @property
    def response_started(self):
        return self._write is not None

    @property
    def scheme(self):
        """The scheme of the request URL"""
        return self.environ['wsgi.url_scheme']

    @property
    def base_path(self):
        """The root path of the application"""
        return wsgi_string_decode(self.environ.get('SCRIPT_NAME', ''))

    @property
    def server_name(self):
        """Name of the server"""
        return self.environ['SERVER_NAME']

    @property
    def server_port(self):
        """Port number the server is bound to"""
        return int(self.environ['SERVER_PORT'])

    def add_redirect_listener(self, listener):
        """Add a callable to be called prior to executing a redirect.

        The callable is passed the arguments to the `redirect()` call.
        """
        self.redirect_listeners.append(listener)

    def get_header(self, name):
        """Return the value of the specified HTTP header, or `None` if there's
        no such header in the request.
        """
        name = name.lower()
        for key, value in self._inheaders:
            if key == name:
                return value
        return None

    def send_response(self, code=200):
        """Set the status code of the response."""
        self._status = '%s %s' % (code, HTTP_STATUS.get(code, 'Unknown'))

    def send_header(self, name, value):
        """Send the response header with the specified name and value.

        `value` must either be a `str` string or can be converted to one
        (e.g. numbers, ...)
        """
        if name.lower() == 'content-type':
            self._content_type = value.split(';', 1)[0]
        self._outheaders.append((name, str(value)))

    def end_headers(self, exc_info=None):
        """Must be called after all headers have been sent and before the
        actual content is written.
        """
        if self.method == 'POST' and self._content_type == 'text/html':
            # Disable XSS protection (#12926)
            self.send_header('X-XSS-Protection', 0)
        self._send_configurable_headers()
        self._send_cookie_headers()
        self._write = self._start_response(self._status, self._outheaders,
                                           exc_info)

    def check_modified(self, datetime, extra=''):
        """Check the request "If-None-Match" header against an entity tag.

        The entity tag is generated from the specified last modified time
        (`datetime`), optionally appending an `extra` string to
        indicate variants of the requested resource.

        That `extra` parameter can also be a list, in which case the MD5 sum
        of the list content will be used.

        If the generated tag matches the "If-None-Match" header of the request,
        this method sends a "304 Not Modified" response to the client.
        Otherwise, it adds the entity tag as an "ETag" header to the response
        so that consecutive requests can be cached.
        """

        # In <RFC9110 8.8.3. ETag>, the value enclosed with double quotes
        # allows %x21 / %x23-7E / %x80-FF bytes (except SPACE, <">, DEL).
        # However, WSGI requires latin-1 encoding in the headers. We use sha1
        # and urlsafe-base64 encoded for the value.
        def digest_base64(iterable):
            m = hashlib.sha1()
            for item in iterable:
                m.update(item.encode('utf-8'))
            digest = m.digest()
            encoded = base64.urlsafe_b64encode(digest).rstrip(b'=')
            return str(encoded, 'ascii')

        if isinstance(extra, list):
            extra = digest_base64(map(repr, extra))
        authname = digest_base64([self.authname])
        ts = to_datetime(datetime, utc).isoformat().replace('+00:00', 'Z')
        etag = 'W/"%s/%s/%s"' % (authname, ts, extra)
        inm = self.get_header('If-None-Match')
        if not inm or inm != etag:
            self.send_header('ETag', etag)
        else:
            self.send_response(304)
            self.send_header('Content-Length', 0)
            self.end_headers()
            raise RequestDone

    def redirect(self, url, permanent=False):
        """Send a redirect to the client, forwarding to the specified URL.

        The `url` may be relative or absolute, relative URLs will be translated
        appropriately.
        """
        for listener in self.redirect_listeners:
            listener(self, url, permanent)

        if permanent:
            status = 301 # 'Moved Permanently'
        elif self.method == 'POST':
            status = 303 # 'See Other' -- safe to use in response to a POST
        else:
            status = 302 # 'Found' -- normal temporary redirect

        self.send_response(status)
        if not url.startswith(('http://', 'https://')):
            # Make sure the URL is absolute
            scheme, host = urllib.parse.urlparse(self.base_url)[:2]
            url = urllib.parse.urlunparse((scheme, host, url, None, None,
                                           None))

        self.send_header('Location', url)
        self.send_header('Content-Type', 'text/plain')
        self.send_header('Content-Length', 0)
        self.send_header('Pragma', 'no-cache')
        self.send_header('Cache-Control', 'no-cache')
        self.send_header('Expires', 'Fri, 01 Jan 1999 00:00:00 GMT')
        self.end_headers()
        raise RequestDone

    def send(self, content, content_type='text/html', status=200):
        self._send(content, content_type, status)

    def send_error(self, exc_info, content, content_type='text/html',
                   status=500):
        self._outheaders = []
        self._send(content, content_type, status, exc_info)

    def send_no_content(self):
        self.send_response(204)
        self.send_header('Content-Length', 0)
        self.send_header('Content-Type', 'text/plain')
        self.end_headers()
        raise RequestDone

    def send_file(self, path, mimetype=None):
        """Send a local file to the browser.

        This method includes the "Last-Modified", "Content-Type" and
        "Content-Length" headers in the response, corresponding to the file
        attributes. It also checks the last modification time of the local file
        against the "If-Modified-Since" provided by the user agent, and sends a
        "304 Not Modified" response if it matches.
        """
        if not os.path.isfile(path):
            raise HTTPNotFound(_("File %(path)s not found", path=path))

        stat = os.stat(path)
        mtime = datetime.fromtimestamp(stat.st_mtime, localtz)
        last_modified = http_date(mtime)
        if last_modified == self.get_header('If-Modified-Since'):
            self.send_response(304)
            self.send_header('Content-Length', 0)
            self.end_headers()
            raise RequestDone

        if not mimetype:
            mimetype = mimetypes.guess_type(path)[0] or \
                       'application/octet-stream'

        self.send_response(200)
        self.send_header('Content-Type', mimetype)
        self.send_header('Content-Length', stat.st_size)
        self.send_header('Last-Modified', last_modified)
        use_xsendfile = getattr(self, 'use_xsendfile', False)
        if use_xsendfile:
            xsendfile_header = getattr(self, 'xsendfile_header', None)
            if xsendfile_header:
                self.send_header(xsendfile_header, os.path.abspath(path))
            else:
                use_xsendfile = False
        self.end_headers()

        if not use_xsendfile and self.method != 'HEAD':
            fileobj = open(path, 'rb')
            file_wrapper = self.environ.get('wsgi.file_wrapper', _FileWrapper)
            self._response = file_wrapper(fileobj, 4096)
        raise RequestDone

    def read(self, size=None):
        """Read the specified number of bytes from the request body."""
        fileobj = self.environ['wsgi.input']
        if size is None:
            size = self.get_header('Content-Length')
            if size is None:
                size = -1
            else:
                size = int(size)
        data = fileobj.read(size)
        return data

    CHUNK_SIZE = 4096

    def write(self, data):
        """Write the given data to the response body.

        *data* **must** be a `bytes` string or an iterable instance
        which iterates `bytes` strings, encoded with the charset which
        has been specified in the ``'Content-Type'`` header or UTF-8
        otherwise.

        Note that when the ``'Content-Length'`` header is specified,
        its value either corresponds to the length of *data*, or, if
        there are multiple calls to `write`, to the cumulative length
        of the *data* arguments.
        """
        if not self._write:
            self.end_headers()
        try:
            chunk_size = self.CHUNK_SIZE
            bufsize = 0
            buf = []
            buf_append = buf.append
            if isinstance(data, bytes):
                data = [data]
            for chunk in data:
                if isinstance(chunk, str):
                    raise ValueError("Can't send str content")
                if not chunk:
                    continue
                bufsize += len(chunk)
                buf_append(chunk)
                if bufsize >= chunk_size:
                    self._write(b''.join(buf))
                    bufsize = 0
                    buf[:] = ()
            if bufsize > 0:
                self._write(b''.join(buf))
        except IOError as e:
            if is_client_disconnect_exception(e):
                raise RequestDone
            raise

    @classmethod
    def is_valid_header(cls, name, value=None):
        """Check whether the field name, and optionally the value, make
        a valid HTTP header.
        """
        valid_name = name and name.lower() not in cls._reserved_headers and \
                     bool(cls._valid_header_re.match(name))
        valid_value = not cls._disallowed_control_codes_re.search(value) \
                      if value else True
        return valid_name & valid_value

    # Internal methods

    def _send(self, content, content_type='text/html', status=200,
              exc_info=None):
        self.send_response(status)
        self.send_header('Cache-Control', 'must-revalidate')
        self.send_header('Expires', 'Fri, 01 Jan 1999 00:00:00 GMT')
        self.send_header('Content-Type', content_type + ';charset=utf-8')
        if isinstance(content, str):
            self.send_header('Content-Length', len(content))
        self.end_headers(exc_info)

        if self.method != 'HEAD':
            self.write(content)
        raise RequestDone

    def _parse_arg_list(self):
        """Parse the supplied request parameters into a list of
        `(name, value)` tuples.
        """
        try:
            return list(parse_form_data(self.environ))
        except UnicodeDecodeError as e:
            raise HTTPBadRequest(_("Invalid encoding in form data: %(msg)s",
                                   msg=exception_to_unicode(e)))
        except IOError as e:
            if is_client_disconnect_exception(e):
                raise HTTPBadRequest(
                    _("Exception caught while reading request: %(msg)s",
                      msg=exception_to_unicode(e)))
            raise

    def _parse_cookies(self):
        cookies = Cookie()
        header = self.get_header('Cookie')
        if header:
            cookies.load(header, ignore_parse_errors=True)
        return cookies

    def _parse_headers(self):
        headers = [(name[5:].replace('_', '-').lower(), value)
                   for name, value in self.environ.items()
                   if name.startswith('HTTP_')]
        if 'CONTENT_LENGTH' in self.environ:
            headers.append(('content-length', self.environ['CONTENT_LENGTH']))
        if 'CONTENT_TYPE' in self.environ:
            headers.append(('content-type', self.environ['CONTENT_TYPE']))
        return headers

    def _parse_languages(self):
        """The list of languages preferred by the remote user, taken from the
        ``Accept-Language`` header.
        """
        header = self.get_header('Accept-Language') or 'en-us'
        langs = []
        for i, lang in enumerate(header.split(',')):
            code, params = parse_header(lang)
            q = 1
            if 'q' in params:
                try:
                    q = float(params['q'])
                except ValueError:
                    q = 0
            langs.append((-q, i, code))
        langs.sort()
        return [code for q, i, code in langs]

    def _reconstruct_url(self):
        """Reconstruct the absolute base URL of the application."""
        host = self.get_header('Host')
        if not host:
            # Missing host header, so reconstruct the host from the
            # server name and port
            default_port = {'http': 80, 'https': 443}
            if self.server_port and self.server_port != \
                    default_port[self.scheme]:
                host = '%s:%d' % (self.server_name, self.server_port)
            else:
                host = self.server_name
        return urllib.parse.urlunparse((self.scheme, host, self.base_path,
                                        None, None, None))

    def _send_configurable_headers(self):
        sent_headers = [name.lower() for name, val in self._outheaders]
        for name, val in getattr(self, 'configurable_headers', []):
            if name.lower() not in sent_headers:
                self.send_header(name, val)

    def _send_cookie_headers(self):
        for name in list(self.outcookie):
            path = self.outcookie[name].get('path')
            if path:
                path = path.replace(' ', '%20') \
                           .replace(';', '%3B') \
                           .replace(',', '%3C')
            self.outcookie[name]['path'] = path

        cookies = to_unicode(self.outcookie.output(header=''))
        for cookie in cookies.splitlines():
            self._outheaders.append(('Set-Cookie', cookie.strip()))


__no_apidoc__ = _HTTPException_subclass_names