requests_ecp/ecp.py
# -*- coding: utf-8 -*-
# Copyright (C) Cardiff University (2020-2022)
#
# This file is part of requests_ecp.
#
# requests_ecp is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# requests_ecp is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with requests_ecp. If not, see <http://www.gnu.org/licenses/>.
"""ECP AuthN implementation for Python requests.
"""
__author__ = "Duncan Macleod <duncan.macleod@ligo.org>"
from lxml import etree
from requests import (
HTTPError,
Request,
Session,
)
# -- utilities --------------
def _get_xml_attribute(xdata, path):
"""Parse an attribute from an XML document
"""
namespaces = {
'ecp': 'urn:oasis:names:tc:SAML:2.0:profiles:SSO:ecp',
'S': 'http://schemas.xmlsoap.org/soap/envelope/',
'paos': 'urn:liberty:paos:2003-08'
}
return xdata.xpath(path, namespaces=namespaces)[0]
def _send(
connection,
method,
url,
**kwargs,
):
"""Format and send a request.
"""
request_kw = {k: kwargs.pop(k) for k in (
"auth",
"cookies",
"data",
"files",
"headers",
"json",
) if k in kwargs}
# if given a Session use it
if isinstance(connection, Session):
response = connection.request(
method.lower(),
url,
allow_redirects=False,
**request_kw,
)
# otherwise manually prepare the request
else:
request = Request(
method=method,
url=url,
**request_kw,
).prepare()
response = connection.send(request, **kwargs)
response.raise_for_status()
return response
def _report_soap_fault(
connection,
url,
message=(
"responseConsumerURL from SP and assertionConsumerServiceURL "
"from IdP do not match"
),
**kwargs,
):
"""Report a problem with the SOAP configuration of SP/IdP pair.
"""
return _send(
connection,
"POST",
url,
data=f"""
<S:Envelope xmlns:S="http://schemas.xmlsoap.org/soap/envelope/">
<S:Body>
<S:Fault>
<faultcode>S:Server</faultcode>
<faultstring>{message}</faultstring>
</S:Fault>
</S:Body>
</S:Envelope>""".strip(), # noqa
headers={'Content-Type': 'application/vnd.paos+xml'},
)
# -- ECP worker -------------
def authenticate(
connection,
auth,
endpoint,
url,
**kwargs,
):
"""Perform an ECP authorisation round-trip.
A good description of this is round-trip is here:
https://medium.com/@winma.15/soap-vs-paos-bindings-in-saml-9ce12a052a0f
This function isn't really meant to be called outside of
the `HTTPECPAuth` response handler, and is not well tested in any
other usage.
Parameters
----------
connection : `requests.Session`, `requests.adapters.HTTPAdapter`
The thing to use to send the request, must support a `send` method.
auth : `requests.auth.AuthBase`
The authentication object to use when communicating with the
ECP Identity Provider.
endpoint : `str`
The URL of the Identity Provider ECP endpoint.
url : `str`
The URL of the resource on the Service Provider to request.
kwargs
Other keyword arguments are passed directly to
:meth:`requests.Session.request` or `http.client.HTTPConnection`.
Returns
-------
responses : `tuple` of `requests.Response`
A `tuple` of three (3) `requests.Response` objects are returned
in the order in which they were requested. The final response
_should_ include a ``302 Found`` redirect back to the original
requested resource.
"""
# -- step 1: initiate ECP request -----------
# request resource via ECP
resp1 = _send(
connection,
method="GET",
url=url,
headers={
'Accept': 'text/html; application/vnd.paos+xml',
'PAOS': 'ver="urn:liberty:paos:2003-08";'
'"urn:oasis:names:tc:SAML:2.0:profiles:SSO:ecp"',
},
**kwargs,
)
# the response from the SP _should be_ an `<AuthnRequest>` message
# to be relayed to the IdP.
try:
spetree = etree.XML(resp1.content)
finally:
resp1.raw.release_conn()
# pick out the relay state element from the SP so that it can
# be included later in the response to the SP
relaystate = _get_xml_attribute(
spetree,
"//ecp:RelayState",
)
# pick out the responseConsumerURL to validate against the
# AssertionConsumerServiceURL we receive later from the IdP
rcurl = _get_xml_attribute(
spetree,
"/S:Envelope/S:Header/paos:Request/@responseConsumerURL",
)
# -- step 2: authenticate with endpoint -----
# remove the PAOS header to create a SOAP package for the IdP
idpbody = spetree
idpbody.remove(idpbody[0])
# forward <AuthnRequest> to Identity Provider using SOAP
resp2 = _send(
connection,
method="POST",
url=endpoint,
auth=auth,
data=etree.tostring(idpbody),
headers={"Content-Type": "text/xml; charset=utf-8"},
**kwargs,
)
try:
idptree = etree.XML(resp2.content)
except etree.XMLSyntaxError:
raise RuntimeError(
"Failed to parse response from {}, you most "
"likely incorrectly entered your passphrase".format(
endpoint,
),
)
finally:
resp2.raw.release_conn()
acsurl = _get_xml_attribute(
idptree,
"/S:Envelope/S:Header/ecp:Response/@AssertionConsumerServiceURL",
)
# validate URLs between SP and IdP
if acsurl != rcurl:
try:
_report_soap_fault(connection, rcurl)
except HTTPError:
pass # don't care, just doing a service
# -- step 3: post back to the SP ------------
# replace the IdP's <Response> with the `<RelayState>` we
# received originally...
actree = idptree
actree[0][0] = relaystate
# and post back to the SP's ECP endpoint
resp3 = _send(
connection,
method="POST",
url=acsurl,
data=etree.tostring(actree),
headers={'Content-Type': 'application/vnd.paos+xml'},
**kwargs,
)
# The result of this _should be_ a final redirect back to the
# resource we requested originally.
# return the response history:
return resp1, resp2, resp3