secynic/ipwhois
ipwhois/whois.py

# Copyright (c) 2013-2020 Philip Hane
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
#    this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright notice,
#    this list of conditions and the following disclaimer in the documentation
#    and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

import sys
import re
import copy
from datetime import datetime
import logging
from .utils import unique_everseen
from . import (BlacklistError, WhoisLookupError, NetError)

if sys.version_info >= (3, 3):  # pragma: no cover
    from ipaddress import (ip_address,
                           ip_network,
                           summarize_address_range,
                           collapse_addresses)
else:  # pragma: no cover
    from ipaddr import (IPAddress as ip_address,
                        IPNetwork as ip_network,
                        summarize_address_range,
                        collapse_address_list as collapse_addresses)

log = logging.getLogger(__name__)

# Legacy base whois output dictionary.
BASE_NET = {
    'cidr': None,
    'name': None,
    'handle': None,
    'range': None,
    'description': None,
    'country': None,
    'state': None,
    'city': None,
    'address': None,
    'postal_code': None,
    'emails': None,
    'created': None,
    'updated': None
}

RIR_WHOIS = {
    'arin': {
        'server': 'whois.arin.net',
        'fields': {
            'name': r'(NetName):[^\S\n]+(?P<val>.+?)\n',
            'handle': r'(NetHandle):[^\S\n]+(?P<val>.+?)\n',
            'description': r'(OrgName|CustName):[^\S\n]+(?P<val>.+?)'
                    '(?=(\n\\S):?)',
            'country': r'(Country):[^\S\n]+(?P<val>.+?)\n',
            'state': r'(StateProv):[^\S\n]+(?P<val>.+?)\n',
            'city': r'(City):[^\S\n]+(?P<val>.+?)\n',
            'address': r'(Address):[^\S\n]+(?P<val>.+?)(?=(\n\S):?)',
            'postal_code': r'(PostalCode):[^\S\n]+(?P<val>.+?)\n',
            'emails': (
                r'.+?:.*?[^\S\n]+(?P<val>[\w\-\.]+?@[\w\-\.]+\.[\w\-]+)('
                '[^\\S\n]+.*?)*?\n'
            ),
            'created': r'(RegDate):[^\S\n]+(?P<val>.+?)\n',
            'updated': r'(Updated):[^\S\n]+(?P<val>.+?)\n',
        },
        'dt_format': '%Y-%m-%d'
    },
    'ripencc': {
        'server': 'whois.ripe.net',
        'fields': {
            'name': r'(netname):[^\S\n]+(?P<val>.+?)\n',
            'handle': r'(nic-hdl):[^\S\n]+(?P<val>.+?)\n',
            'description': r'(descr):[^\S\n]+(?P<val>.+?)(?=(\n\S):?)',
            'country': r'(country):[^\S\n]+(?P<val>.+?)\n',
            'address': r'(address):[^\S\n]+(?P<val>.+?)(?=(\n\S):?)',
            'emails': (
                r'.+?:.*?[^\S\n]+(?P<val>[\w\-\.]+?@[\w\-\.]+\.[\w\-]+)('
                '[^\\S\n]+.*?)*?\n'
            ),
            'created': (
                r'(created):[^\S\n]+(?P<val>[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]'
                '{2}:[0-9]{2}:[0-9]{2}Z).*?\n'
            ),
            'updated': (
                r'(last-modified):[^\S\n]+(?P<val>[0-9]{4}-[0-9]{2}-[0-9]{2}T'
                '[0-9]{2}:[0-9]{2}:[0-9]{2}Z).*?\n'
            )
        },
        'dt_format': '%Y-%m-%dT%H:%M:%SZ'
    },
    'apnic': {
        'server': 'whois.apnic.net',
        'fields': {
            'name': r'(netname):[^\S\n]+(?P<val>.+?)\n',
            'handle': r'(nic-hdl):[^\S\n]+(?P<val>.+?)\n',
            'description': r'(descr):[^\S\n]+(?P<val>.+?)(?=(\n\S):?)',
            'country': r'(country):[^\S\n]+(?P<val>.+?)\n',
            'address': r'(address):[^\S\n]+(?P<val>.+?)(?=(\n\S):?)',
            'emails': (
                r'.+?:.*?[^\S\n]+(?P<val>[\w\-\.]+?@[\w\-\.]+\.[\w\-]+)('
                '[^\\S\n]+.*?)*?\n'
            ),
            'updated': r'(changed):[^\S\n]+.*(?P<val>[0-9]{8}).*?\n'
        },
        'dt_format': '%Y%m%d'
    },
    'lacnic': {
        'server': 'whois.lacnic.net',
        'fields': {
            'handle': r'(nic-hdl):[^\S\n]+(?P<val>.+?)\n',
            'description': r'(owner):[^\S\n]+(?P<val>.+?)(?=(\n\S):?)',
            'country': r'(country):[^\S\n]+(?P<val>.+?)\n',
            'emails': (
                r'.+?:.*?[^\S\n]+(?P<val>[\w\-\.]+?@[\w\-\.]+\.[\w\-]+)('
                '[^\\S\n]+.*?)*?\n'
            ),
            'created': r'(created):[^\S\n]+(?P<val>[0-9]{8}).*?\n',
            'updated': r'(changed):[^\S\n]+(?P<val>[0-9]{8}).*?\n'
        },
        'dt_format': '%Y%m%d'
    },
    'afrinic': {
        'server': 'whois.afrinic.net',
        'fields': {
            'name': r'(netname):[^\S\n]+(?P<val>.+?)\n',
            'handle': r'(nic-hdl):[^\S\n]+(?P<val>.+?)\n',
            'description': r'(descr):[^\S\n]+(?P<val>.+?)(?=(\n\S):?)',
            'country': r'(country):[^\S\n]+(?P<val>.+?)\n',
            'address': r'(address):[^\S\n]+(?P<val>.+?)(?=(\n\S):?)',
            'emails': (
                r'.+?:.*?[^\S\n]+(?P<val>[\w\-\.]+?@[\w\-\.]+\.[\w\-]+)('
                '[^\\S\n]+.*?)*?\n'
            ),
        }
    }
}
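
# Illustrative sketch (not part of the upstream module): each field pattern in
# RIR_WHOIS is compiled with re.DOTALL in Whois.parse_fields() and run against
# the raw server response, with the named group 'val' carrying the extracted
# value. Against an assumed one-line ARIN fragment:
#
#     >>> import re
#     >>> sample = 'NetName:        EXAMPLE-NET\n'
#     >>> re.search(RIR_WHOIS['arin']['fields']['name'], sample).group('val')
#     'EXAMPLE-NET'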

RWHOIS = {
    'fields': {
        'cidr': r'(network:IP-Network):(?P<val>.+?)\n',
        'name': r'(network:ID):(?P<val>.+?)\n',
        'description': (
            r'(network:(Org-Name|Organization(;I)?)):(?P<val>.+?)\n'
        ),
        'country': r'(network:(Country|Country-Code)):(?P<val>.+?)\n',
        'state': r'(network:State):(?P<val>.+?)\n',
        'city': r'(network:City):(?P<val>.+?)\n',
        'address': r'(network:Street-Address):(?P<val>.+?)\n',
        'postal_code': r'(network:Postal-Code):(?P<val>.+?)\n',
        'emails': (
            r'.+?:.*?[^\S\n]+(?P<val>[\w\-\.]+?@[\w\-\.]+\.[\w\-]+)('
            '[^\\S\n]+.*?)*?\n'
        ),
        'created': r'(network:Created):(?P<val>.+?)\n',
        'updated': r'(network:Updated):(?P<val>.+?)\n'
    }
}

ASN_REFERRALS = {
    'whois://whois.ripe.net': 'ripencc',
    'whois://whois.apnic.net': 'apnic',
    'whois://whois.lacnic.net': 'lacnic',
    'whois://whois.afrinic.net': 'afrinic',
}


class Whois:
    """
    The class for parsing whois data.

    Args:
        net (:obj:`ipwhois.net.Net`): The network object.

    Raises:
        NetError: The parameter provided is not an instance of
            ipwhois.net.Net
        IPDefinedError: The address provided is defined (does not need to be
            resolved).
    """

    def __init__(self, net):

        from .net import Net

        # ipwhois.net.Net validation
        if isinstance(net, Net):

            self._net = net

        else:

            raise NetError('The provided net parameter is not an instance of '
                           'ipwhois.net.Net')

    def parse_fields(self, response, fields_dict, net_start=None,
                     net_end=None, dt_format=None, field_list=None):
        """
        The function for parsing whois fields from a data input.

        Args:
            response (:obj:`str`): The response from the whois/rwhois server.
            fields_dict (:obj:`dict`): The mapping of fields to regex search
                values (required).
            net_start (:obj:`int`): The starting point of the network (if
                parsing multiple networks). Defaults to None.
            net_end (:obj:`int`): The ending point of the network (if parsing
                multiple networks). Defaults to None.
            dt_format (:obj:`str`): The format of datetime fields if known.
                Defaults to None.
            field_list (:obj:`list` of :obj:`str`): If provided, fields to
                parse. Defaults to:

                ::

                    ['name', 'handle', 'description', 'country', 'state',
                    'city', 'address', 'postal_code', 'emails', 'created',
                    'updated']

        Returns:
            dict: A dictionary of fields provided in fields_dict, mapping to
                the results of the regex searches.
        """

        ret = {}

        if not field_list:

            field_list = ['name', 'handle', 'description', 'country', 'state',
                          'city', 'address', 'postal_code', 'emails',
                          'created', 'updated']

        generate = ((field, pattern) for (field, pattern) in
                    fields_dict.items() if field in field_list)

        for field, pattern in generate:

            pattern = re.compile(
                str(pattern),
                re.DOTALL
            )

            if net_start is not None:

                match = pattern.finditer(response, net_end, net_start)

            elif net_end is not None:

                match = pattern.finditer(response, net_end)

            else:

                match = pattern.finditer(response)

            values = []
            sub_section_end = None
            for m in match:

                if sub_section_end:

                    if field not in (
                        'emails',
                    ) and (sub_section_end != (m.start() - 1)):

                        break

                try:

                    values.append(m.group('val').strip())

                except IndexError:

                    pass

                sub_section_end = m.end()

            if len(values) > 0:

                value = None
                try:

                    if field == 'country':

                        value = values[0].upper()

                    elif field in ['created', 'updated'] and dt_format:

                        value = datetime.strptime(
                            values[0],
                            str(dt_format)).isoformat('T')

                    elif field in ['emails']:

                        value = list(unique_everseen(values))

                    else:

                        values = unique_everseen(values)
                        value = '\n'.join(values).strip()

                except ValueError as e:

                    log.debug('Whois field parsing failed for {0}: {1}'.format(
                        field, e))
                    pass

                ret[field] = value

        return ret
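
    # A minimal sketch of calling parse_fields() directly (lookup() normally
    # drives it); the Net address and the captured response file here are
    # assumptions, not library fixtures:
    #
    #     >>> from ipwhois.net import Net
    #     >>> w = Whois(Net('74.125.225.229'))
    #     >>> raw = open('arin_response.txt').read()  # hypothetical capture
    #     >>> w.parse_fields(raw, RIR_WHOIS['arin']['fields'],
    #     ...                dt_format=RIR_WHOIS['arin']['dt_format'])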

    def get_nets_arin(self, response):
        """
        The function for parsing network blocks from ARIN whois data.

        Args:
            response (:obj:`str`): The response from the ARIN whois server.

        Returns:
            list of dict: Mapping of networks with start and end positions.

            ::

                [{
                    'cidr' (str) - The network routing block
                    'start' (int) - The starting point of the network
                    'end' (int) - The ending point of the network
                }]
        """

        nets = []

        # Find the first NetRange value.
        pattern = re.compile(
            r'^NetRange:[^\S\n]+(.+)$',
            re.MULTILINE
        )
        temp = pattern.search(response)
        net_range = None
        net_range_start = None
        if temp is not None:
            net_range = temp.group(1).strip()
            net_range_start = temp.start()

        # Iterate through all of the networks found, storing the CIDR value
        # and the start and end positions.
        for match in re.finditer(
            r'^CIDR:[^\S\n]+(.+?,[^\S\n].+|.+)$',
            response,
            re.MULTILINE
        ):

            try:

                net = copy.deepcopy(BASE_NET)

                if len(nets) > 0:
                    temp = pattern.search(response, match.start())
                    net_range = None
                    net_range_start = None
                    if temp is not None:
                        net_range = temp.group(1).strip()
                        net_range_start = temp.start()

                if net_range is not None:
                    if net_range_start < match.start() or len(nets) > 0:

                        try:

                            net['range'] = '{0} - {1}'.format(
                                ip_network(net_range)[0].__str__(),
                                ip_network(net_range)[-1].__str__()
                            ) if '/' in net_range else net_range

                        except ValueError:  # pragma: no cover

                            net['range'] = net_range

                net['cidr'] = ', '.join(
                    [ip_network(c.strip()).__str__()
                     for c in match.group(1).split(', ')]
                )
                net['start'] = match.start()
                net['end'] = match.end()
                nets.append(net)

            except ValueError:

                pass

        return nets
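
    # For illustration (assumed sample text, not a library fixture): a response
    # containing 'NetRange:   74.125.0.0 - 74.125.255.255' followed by
    # 'CIDR:       74.125.0.0/16' yields one dict with
    # 'range' == '74.125.0.0 - 74.125.255.255', 'cidr' == '74.125.0.0/16', and
    # 'start'/'end' set to the offsets of the CIDR line within the response.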

    def get_nets_lacnic(self, response):
        """
        The function for parsing network blocks from LACNIC whois data.

        Args:
            response (:obj:`str`): The response from the LACNIC whois server.

        Returns:
            list of dict: Mapping of networks with start and end positions.

            ::

                [{
                    'cidr' (str) - The network routing block
                    'start' (int) - The starting point of the network
                    'end' (int) - The ending point of the network
                }]
        """

        nets = []

        # Iterate through all of the networks found, storing the CIDR value
        # and the start and end positions.
        for match in re.finditer(
            r'^(inetnum|inet6num|route):[^\S\n]+(.+?,[^\S\n].+|.+)$',
            response,
            re.MULTILINE
        ):

            try:

                net = copy.deepcopy(BASE_NET)
                net_range = match.group(2).strip()

                try:

                    net['range'] = '{0} - {1}'.format(
                        ip_network(net_range)[0].__str__(),
                        ip_network(net_range)[-1].__str__()
                    ) if '/' in net_range else net_range

                except ValueError:  # pragma: no cover

                    net['range'] = net_range

                temp = []
                for addr in net_range.split(', '):

                    count = addr.count('.')
                    if count != 0 and count < 4:

                        addr_split = addr.strip().split('/')
                        for i in range(count + 1, 4):
                            addr_split[0] += '.0'

                        addr = '/'.join(addr_split)

                    temp.append(ip_network(addr.strip()).__str__())

                net['cidr'] = ', '.join(temp)
                net['start'] = match.start()
                net['end'] = match.end()
                nets.append(net)

            except ValueError:

                pass

        return nets
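
    # Illustration of the dotted-quad padding above (assumed LACNIC-style
    # input): 'inetnum: 187.32/16' contains a single dot, so '.0' is appended
    # twice before the prefix, and ip_network('187.32.0.0/16') produces a
    # 'cidr' value of '187.32.0.0/16'.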

    def get_nets_other(self, response):
        """
        The function for parsing network blocks from generic whois data.

        Args:
            response (:obj:`str`): The response from the whois/rwhois server.

        Returns:
            list of dict: Mapping of networks with start and end positions.

            ::

                [{
                    'cidr' (str) - The network routing block
                    'start' (int) - The starting point of the network
                    'end' (int) - The ending point of the network
                }]
        """

        nets = []

        # Iterate through all of the networks found, storing the CIDR value
        # and the start and end positions.
        for match in re.finditer(
            r'^(inetnum|inet6num|route):[^\S\n]+((.+?)[^\S\n]-[^\S\n](.+)|'
            '.+)$',
            response,
            re.MULTILINE
        ):

            try:

                net = copy.deepcopy(BASE_NET)
                net_range = match.group(2).strip()

                try:

                    net['range'] = '{0} - {1}'.format(
                        ip_network(net_range)[0].__str__(),
                        ip_network(net_range)[-1].__str__()
                    ) if '/' in net_range else net_range

                except ValueError:  # pragma: no cover

                    net['range'] = net_range

                if match.group(3) and match.group(4):

                    addrs = []
                    addrs.extend(summarize_address_range(
                        ip_address(match.group(3).strip()),
                        ip_address(match.group(4).strip())))

                    cidr = ', '.join(
                        [i.__str__() for i in collapse_addresses(addrs)]
                    )

                else:

                    cidr = ip_network(net_range).__str__()

                net['cidr'] = cidr
                net['start'] = match.start()
                net['end'] = match.end()
                nets.append(net)

            except (ValueError, TypeError):

                pass

        return nets
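
    # Sketch of the range-to-CIDR conversion above, using only the standard
    # library (the addresses are assumed examples):
    #
    #     >>> from ipaddress import (ip_address, summarize_address_range,
    #     ...                        collapse_addresses)
    #     >>> list(collapse_addresses(summarize_address_range(
    #     ...     ip_address('192.0.2.0'), ip_address('192.0.2.255'))))
    #     [IPv4Network('192.0.2.0/24')]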

    def lookup(self, inc_raw=False, retry_count=3, response=None,
               get_referral=False, extra_blacklist=None,
               ignore_referral_errors=False, asn_data=None,
               field_list=None, is_offline=False):
        """
        The function for retrieving and parsing whois information for an IP
        address via port 43/tcp (WHOIS).

        Args:
            inc_raw (:obj:`bool`, optional): Whether to include the raw
                results in the returned dictionary. Defaults to False.
            retry_count (:obj:`int`): The number of times to retry in case
                socket errors, timeouts, connection resets, etc. are
                encountered. Defaults to 3.
            response (:obj:`str`): Optional response object; this bypasses the
                WHOIS lookup. Required when is_offline=True.
            get_referral (:obj:`bool`): Whether to retrieve referral whois
                information, if available. Defaults to False.
            extra_blacklist (:obj:`list`): Blacklisted whois servers in
                addition to the global BLACKLIST. Defaults to None.
            ignore_referral_errors (:obj:`bool`): Whether to ignore and
                continue when an exception is encountered on referral whois
                lookups. Defaults to False.
            asn_data (:obj:`dict`): Result from
                :obj:`ipwhois.asn.IPASN.lookup` (required).
            field_list (:obj:`list` of :obj:`str`): If provided, fields to
                parse. Defaults to:

                ::

                    ['name', 'handle', 'description', 'country', 'state',
                    'city', 'address', 'postal_code', 'emails', 'created',
                    'updated']

            is_offline (:obj:`bool`): Whether to perform lookups offline. If
                True, response and asn_data must be provided. Primarily used
                for testing. Defaults to False.

        Returns:
            dict: The IP whois lookup results

            ::

                {
                    'query' (str) - The IP address
                    'asn' (str) - The Autonomous System Number
                    'asn_date' (str) - The ASN Allocation date
                    'asn_registry' (str) - The assigned ASN registry
                    'asn_cidr' (str) - The assigned ASN CIDR
                    'asn_country_code' (str) - The assigned ASN country code
                    'asn_description' (str) - The ASN description
                    'nets' (list) - Dictionaries containing network
                        information which consists of the fields listed in the
                        ipwhois.whois.RIR_WHOIS dictionary.
                    'raw' (str) - Raw whois results if the inc_raw parameter
                        is True.
                    'referral' (dict) - Referral whois information if
                        get_referral is True and the server is not blacklisted.
                        Consists of fields listed in the ipwhois.whois.RWHOIS
                        dictionary.
                    'raw_referral' (str) - Raw referral whois results if the
                        inc_raw parameter is True.
                }
        """

        # Create the return dictionary.
        results = {
            'query': self._net.address_str,
            'nets': [],
            'raw': None,
            'referral': None,
            'raw_referral': None
        }

        # The referral server and port. Only used if get_referral is True.
        referral_server = None
        referral_port = 0

        # Only fetch the response if we haven't already.
        if response is None or (not is_offline and
                                asn_data['asn_registry'] != 'arin'):

            log.debug('Response not given, perform WHOIS lookup for {0}'
                      .format(self._net.address_str))

            # Retrieve the whois data.
            response = self._net.get_whois(
                asn_registry=asn_data['asn_registry'], retry_count=retry_count,
                extra_blacklist=extra_blacklist
            )

            if get_referral:

                # Search for a referral server.
                for match in re.finditer(
                    r'^ReferralServer:[^\S\n]+(.+:[0-9]+)$',
                    response,
                    re.MULTILINE
                ):

                    try:

                        temp = match.group(1)
                        if 'rwhois://' not in temp:  # pragma: no cover
                            raise ValueError

                        temp = temp.replace('rwhois://', '').split(':')

                        if int(temp[1]) > 65535:  # pragma: no cover
                            raise ValueError

                        referral_server = temp[0]
                        referral_port = int(temp[1])

                    except (ValueError, KeyError):  # pragma: no cover

                        continue

                    break

        # Retrieve the referral whois data.
        if get_referral and referral_server:

            log.debug('Perform referral WHOIS lookup')

            response_ref = None

            try:

                response_ref = self._net.get_whois(
                    asn_registry='', retry_count=retry_count,
                    server=referral_server, port=referral_port,
                    extra_blacklist=extra_blacklist
                )

            except (BlacklistError, WhoisLookupError):

                if ignore_referral_errors:

                    pass

                else:

                    raise

            if response_ref:

                log.debug('Parsing referral WHOIS data')

                if inc_raw:

                    results['raw_referral'] = response_ref

                temp_rnet = self.parse_fields(
                    response_ref,
                    RWHOIS['fields'],
                    field_list=field_list
                )

                # Add the networks to the return dictionary.
                results['referral'] = temp_rnet

        # If inc_raw parameter is True, add the response to return dictionary.
        if inc_raw:

            results['raw'] = response

        nets = []

        if asn_data['asn_registry'] == 'arin':

            nets_response = self.get_nets_arin(response)

        elif asn_data['asn_registry'] == 'lacnic':

            nets_response = self.get_nets_lacnic(response)

        else:

            nets_response = self.get_nets_other(response)

        nets.extend(nets_response)

        # Iterate through all of the network sections and parse out the
        # appropriate fields for each.
        log.debug('Parsing WHOIS data')
        for index, net in enumerate(nets):

            section_end = None
            if index + 1 < len(nets):

                section_end = nets[index + 1]['start']

            try:

                dt_format = RIR_WHOIS[asn_data['asn_registry']]['dt_format']

            except KeyError:

                dt_format = None

            temp_net = self.parse_fields(
                response,
                RIR_WHOIS[asn_data['asn_registry']]['fields'],
                section_end,
                net['end'],
                dt_format,
                field_list
            )

            # Merge the net dictionaries.
            net.update(temp_net)

            # The start and end values are no longer needed.
            del net['start'], net['end']

        # Add the networks to the return dictionary.
        results['nets'] = nets

        return results
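

# Example usage (a sketch, not part of the upstream module): Whois is normally
# driven through ipwhois.IPWhois.lookup_whois(), but it can be exercised
# directly by pairing it with an ASN lookup for the same address. Run as
# 'python -m ipwhois.whois' so the relative imports resolve; the address is an
# assumed example and the calls hit live whois servers.
if __name__ == '__main__':  # pragma: no cover

    from pprint import pprint

    from .asn import IPASN
    from .net import Net

    example_net = Net('74.125.225.229')
    example_asn = IPASN(example_net).lookup()
    pprint(Whois(example_net).lookup(asn_data=example_asn, inc_raw=False))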