ipwhois/experimental.py
# Copyright (c) 2017-2019 Philip Hane
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
#    this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright notice,
#    this list of conditions and the following disclaimer in the documentation
#    and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

import socket
import logging
import time
from collections import namedtuple

from .exceptions import (ASNLookupError, HTTPLookupError, HTTPRateLimitError,
                         ASNRegistryError)
from .asn import IPASN
from .net import (CYMRU_WHOIS, Net)
from .rdap import RDAP
from .utils import unique_everseen

log = logging.getLogger(__name__)


def get_bulk_asn_whois(addresses=None, retry_count=3, timeout=120):
    """
    The function for retrieving ASN information for multiple IP addresses from
    Cymru via port 43/tcp (WHOIS).

    Args:
        addresses (:obj:`list` of :obj:`str`): IP addresses to lookup.
        retry_count (:obj:`int`): The number of times to retry in case of
            socket errors, timeouts, connection resets, etc. Defaults to 3.
        timeout (:obj:`int`): The default timeout for socket connections in
            seconds. Defaults to 120.

    Returns:
        str: The raw ASN bulk data, newline separated.

    Raises:
        ValueError: addresses argument must be a list of IPv4/v6 address
            strings.
        ASNLookupError: The ASN bulk lookup failed.
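
    Example (a minimal sketch, not a doctest; the address below is a
    placeholder and a live connection to the Cymru whois server is
    required)::

        >>> from ipwhois.experimental import get_bulk_asn_whois
        >>> raw = get_bulk_asn_whois(addresses=['74.125.225.229'])
        >>> for line in raw.splitlines():
        ...     print(line)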
    """

    if not isinstance(addresses, list):

        raise ValueError('addresses argument must be a list of IPv4/v6 '
                         'address strings.')

    try:

        # Create the connection for the Cymru whois query.
        conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        conn.settimeout(timeout)
        log.debug('ASN bulk query initiated.')
        conn.connect((CYMRU_WHOIS, 43))

        # Query the Cymru whois server, and store the results.
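        # The request wraps the newline-separated address list in Cymru's
        # bulk 'begin'/'end' markers; the leading flags request the verbose
        # field set.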
        conn.sendall((
            ' -r -a -c -p -f begin\n{0}\nend'.format(
                '\n'.join(addresses))
        ).encode())

        data = ''
        while True:

            d = conn.recv(4096).decode()
            data += d

            if not d:

                break

        conn.close()

        return str(data)

    except (socket.timeout, socket.error) as e:  # pragma: no cover

        log.debug('ASN bulk query socket error: {0}'.format(e))
        if retry_count > 0:

            log.debug('ASN bulk query retrying (count: {0})'.format(
                str(retry_count)))
            return get_bulk_asn_whois(addresses, retry_count - 1, timeout)

        else:

            raise ASNLookupError('ASN bulk lookup failed.')

    except Exception:  # pragma: no cover

        raise ASNLookupError('ASN bulk lookup failed.')


def bulk_lookup_rdap(addresses=None, inc_raw=False, retry_count=3, depth=0,
                     excluded_entities=None, rate_limit_timeout=60,
                     socket_timeout=10, asn_timeout=240, proxy_openers=None):
    """
    The function for bulk retrieving and parsing whois information for a list
    of IP addresses via HTTP (RDAP). This bulk lookup method uses bulk
    ASN Whois lookups first to retrieve the ASN for each IP. It then optimizes
    RDAP queries to achieve the fastest overall time, accounting for the
    rate limits imposed by the RIRs.

    Args:
        addresses (:obj:`list` of :obj:`str`): IP addresses to lookup.
        inc_raw (:obj:`bool`, optional): Whether to include the raw whois
            results in the returned dictionary. Defaults to False.
        retry_count (:obj:`int`): The number of times to retry in case socket
            errors, timeouts, connection resets, etc. are encountered.
            Defaults to 3.
        depth (:obj:`int`): How many levels deep to run queries when additional
            referenced objects are found. Defaults to 0.
        excluded_entities (:obj:`list` of :obj:`str`): Entity handles for
            which lookups should not be performed. Defaults to None.
        rate_limit_timeout (:obj:`int`): The number of seconds to wait before
            retrying when a rate limit notice is returned via rdap+json.
            Defaults to 60.
        socket_timeout (:obj:`int`): The default timeout for socket
            connections in seconds. Defaults to 10.
        asn_timeout (:obj:`int`): The default timeout for bulk ASN lookups in
            seconds. Defaults to 240.
        proxy_openers (:obj:`list` of :obj:`OpenerDirector`): Proxy openers
            for single/rotating proxy support. Defaults to None.

    Returns:
        namedtuple:

        :results (dict): IP address keys with the values as dictionaries
            returned by IPWhois.lookup_rdap().
        :stats (dict): Stats for the lookups:

        ::

            {
                'ip_input_total' (int) - The total number of addresses
                    originally provided for lookup via the addresses argument.
                'ip_unique_total' (int) - The total number of unique addresses
                    found in the addresses argument.
                'ip_lookup_total' (int) - The total number of addresses that
                    lookups were attempted for, excluding any that failed ASN
                    registry checks.
                'ip_failed_total' (int) - The total number of addresses that
                    lookups failed for. Excludes any that failed initially, but
                    succeeded after further retries.
                'lacnic' (dict) -
                {
                    'failed' (list) - The addresses that failed to lookup.
                        Excludes any that failed initially, but succeeded after
                        further retries.
                    'rate_limited' (list) - The addresses that encountered
                        rate-limiting. Unless an address is also in 'failed',
                        it eventually succeeded.
                    'total' (int) - The total number of addresses belonging to
                        this RIR that lookups were attempted for.
                }
                'ripencc' (dict) - Same as 'lacnic' above.
                'apnic' (dict) - Same as 'lacnic' above.
                'afrinic' (dict) - Same as 'lacnic' above.
                'arin' (dict) - Same as 'lacnic' above.
                'unallocated_addresses' (list) - The addresses that are
                    unallocated or failed the ASN lookup. These can be
                    addresses not allocated to one of the 5 RIRs (other).
                    No RDAP lookup was attempted for these.
            }

    Raises:
        ValueError: addresses argument must be a list of IPv4/v6 address
            strings.
        ASNLookupError: The ASN bulk lookup failed; cannot proceed with the
            bulk RDAP lookup.
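
    Example (a minimal sketch, not a doctest; the addresses below are
    placeholders and live RDAP connectivity is required)::

        >>> from ipwhois.experimental import bulk_lookup_rdap
        >>> results, stats = bulk_lookup_rdap(
        ...     addresses=['74.125.225.229', '196.25.1.1'])
        >>> print(stats['ip_lookup_total'], stats['ip_failed_total'])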
    """

    if not isinstance(addresses, list):

        raise ValueError('addresses argument must be a list of IPv4/v6 '
                         'address strings.')

    # Initialize the dicts/lists
    results = {}
    failed_lookups_dict = {}
    rated_lookups = []
    stats = {
        'ip_input_total': len(addresses),
        'ip_unique_total': 0,
        'ip_lookup_total': 0,
        'ip_failed_total': 0,
        'lacnic': {'failed': [], 'rate_limited': [], 'total': 0},
        'ripencc': {'failed': [], 'rate_limited': [], 'total': 0},
        'apnic': {'failed': [], 'rate_limited': [], 'total': 0},
        'afrinic': {'failed': [], 'rate_limited': [], 'total': 0},
        'arin': {'failed': [], 'rate_limited': [], 'total': 0},
        'unallocated_addresses': []
    }
    asn_parsed_results = {}

    if proxy_openers is None:

        proxy_openers = [None]

    proxy_openers_copy = iter(proxy_openers)

    # Make sure addresses is unique
    unique_ip_list = list(unique_everseen(addresses))

    # Get the unique count to return
    stats['ip_unique_total'] = len(unique_ip_list)

    # This is needed for iteration order
    rir_keys_ordered = ['lacnic', 'ripencc', 'apnic', 'afrinic', 'arin']

    # First, query the ASN data for all IPs. This can raise ASNLookupError,
    # which is intentionally not caught here
    bulk_asn = get_bulk_asn_whois(unique_ip_list, timeout=asn_timeout)

    # The ASN results come back as one string; split it into lines and
    # remove the first line (the Cymru bulk mode header)
    asn_result_list = bulk_asn.split('\n')
    del asn_result_list[0]

    # We need to instantiate IPASN, which currently needs a Net object,
    # IP doesn't matter here
    net = Net('1.2.3.4')
    ipasn = IPASN(net)

    # Iterate each IP ASN result, and add valid RIR results to
    # asn_parsed_results for RDAP lookups
    for asn_result in asn_result_list:
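        # Each data line is pipe-delimited, roughly:
        # 'AS | IP | BGP Prefix | CC | Registry | Allocated | AS Name'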

        temp = asn_result.split('|')

        # Not a valid entry, move on to next
        if len(temp) == 1:

            continue

        ip = temp[1].strip()

        # Skip duplicates; the bulk ASN lookup can return duplicate entries
        # (an issue on the Cymru end)
        if ip in asn_parsed_results:  # pragma: no cover

            continue

        try:

            asn_parsed = ipasn.parse_fields_whois(asn_result)

        except ASNRegistryError:  # pragma: no cover

            continue

        # Add valid IP ASN result to asn_parsed_results for RDAP lookup
        asn_parsed_results[ip] = asn_parsed
        stats[asn_parsed['asn_registry']]['total'] += 1

    # Record the addresses that are unallocated or failed the ASN lookup
    stats['unallocated_addresses'] = list(k for k in addresses if k not in
                                          asn_parsed_results)

    # Set the total lookup count after unique IP and ASN result filtering
    stats['ip_lookup_total'] = len(asn_parsed_results)

    # Track the number of LACNIC queries left. This is tracked so the
    # LACNIC priority logic (9 queries/min) cannot loop forever
    lacnic_total_left = stats['lacnic']['total']

    # Set the start time, this value is updated when the rate limit is reset
    old_time = time.time()

    # Rate limit tracking dict for all RIRs
    rate_tracker = {
        'lacnic': {'time': old_time, 'count': 0},
        'ripencc': {'time': old_time, 'count': 0},
        'apnic': {'time': old_time, 'count': 0},
        'afrinic': {'time': old_time, 'count': 0},
        'arin': {'time': old_time, 'count': 0}
    }

    # Iterate all of the IPs to perform RDAP lookups until none are left
    while len(asn_parsed_results) > 0:

        # Sequentially run through each RIR to minimize lookups in a row to
        # the same RIR.
        for rir in rir_keys_ordered:

            # If LACNIC IPs remain and the LACNIC rate limit hasn't been
            # reached, skip the other RIRs so a LACNIC IP is looked up first
            if (
                rir != 'lacnic' and lacnic_total_left > 0 and
                (rate_tracker['lacnic']['count'] != 9 or
                    (time.time() - rate_tracker['lacnic']['time']
                     ) >= rate_limit_timeout
                 )
               ):  # pragma: no cover

                continue

            # If the RIR rate limit has been reached and hasn't expired,
            # move on to the next RIR
            if (
                rate_tracker[rir]['count'] == 9 and (
                    (time.time() - rate_tracker[rir]['time']
                     ) < rate_limit_timeout)
               ):  # pragma: no cover

                continue

            # If the RIR rate limit has expired, reset the count/timer
            # and perform the lookup
            elif ((time.time() - rate_tracker[rir]['time']
                   ) >= rate_limit_timeout):  # pragma: no cover

                rate_tracker[rir]['count'] = 0
                rate_tracker[rir]['time'] = time.time()

            # Create a copy of the lookup IP dict so we can modify on
            # successful/failed queries. Loop each IP until it matches the
            # correct RIR in the parent loop, and attempt lookup
            tmp_dict = asn_parsed_results.copy()

            for ip, asn_data in tmp_dict.items():

                # Check to see if IP matches parent loop RIR for lookup
                if asn_data['asn_registry'] == rir:

                    log.debug('Starting lookup for IP: {0} '
                              'RIR: {1}'.format(ip, rir))

                    # Add to count for rate-limit tracking only for LACNIC,
                    # since we have not seen aggressive rate-limiting from the
                    # other RIRs yet
                    if rir == 'lacnic':

                        rate_tracker[rir]['count'] += 1

                    # Get the next proxy opener to use, or None
                    try:

                        opener = next(proxy_openers_copy)

                    # Start at the beginning if all have been used
                    except StopIteration:

                        proxy_openers_copy = iter(proxy_openers)
                        opener = next(proxy_openers_copy)

                    # Instantiate the objects needed for the RDAP lookup
                    net = Net(ip, timeout=socket_timeout, proxy_opener=opener)
                    rdap = RDAP(net)

                    try:

                        # Perform the RDAP lookup. retry_count is set to 0
                        # here since we handle that in this function
                        rdap_result = rdap.lookup(
                            inc_raw=inc_raw, retry_count=0, asn_data=asn_data,
                            depth=depth, excluded_entities=excluded_entities
                        )

                        log.debug('Successful lookup for IP: {0} '
                                  'RIR: {1}'.format(ip, rir))

                        # Lookup was successful, add to result. Set the nir
                        # key to None as this is not supported
                        # (yet - requires more queries)
                        results[ip] = asn_data
                        results[ip].update(rdap_result)

                        results[ip]['nir'] = None

                        # Remove the IP from the lookup queue
                        del asn_parsed_results[ip]

                        # If this was a LACNIC IP, reduce the total left count
                        if rir == 'lacnic':

                            lacnic_total_left -= 1

                        log.debug(
                            '{0} total lookups left, {1} LACNIC lookups left'
                            ''.format(str(len(asn_parsed_results)),
                                      str(lacnic_total_left))
                        )

                        # If this IP failed previously, remove it from the
                        # failed return dict
                        if ip in failed_lookups_dict:  # pragma: no cover

                            del failed_lookups_dict[ip]

                        # Break out of the IP list loop, we need to change to
                        # the next RIR
                        break

                    except HTTPLookupError:  # pragma: no cover

                        log.debug('Failed lookup for IP: {0} '
                                  'RIR: {1}'.format(ip, rir))

                        # Add the IP to the failed lookups dict if not there
                        if ip not in failed_lookups_dict:

                            failed_lookups_dict[ip] = 1

                        # This IP has already failed at least once, increment
                        # the failure count until retry_count reached, then
                        # stop trying
                        else:

                            failed_lookups_dict[ip] += 1

                            if failed_lookups_dict[ip] == retry_count:

                                del asn_parsed_results[ip]
                                stats[rir]['failed'].append(ip)
                                stats['ip_failed_total'] += 1

                                if rir == 'lacnic':

                                    lacnic_total_left -= 1

                        # Since this IP failed, don't break to the next RIR;
                        # check the next IP for this RIR
                        continue

                    except HTTPRateLimitError:  # pragma: no cover

                        # Add the IP to the rate-limited lookups list if not
                        # already there
                        if ip not in rated_lookups:

                            rated_lookups.append(ip)
                            stats[rir]['rate_limited'].append(ip)

                        log.debug('Rate limiting triggered for IP: {0} '
                                  'RIR: {1}'.format(ip, rir))

                        # Since rate-limit was reached, reset the timer and
                        # max out the count
                        rate_tracker[rir]['time'] = time.time()
                        rate_tracker[rir]['count'] = 9

                        # Break out of the IP list loop, we need to change to
                        # the next RIR
                        break

    return_tuple = namedtuple('return_tuple', ['results', 'stats'])
    return return_tuple(results, stats)
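

# A minimal manual-test driver (an editorial sketch, not part of the
# ipwhois public API). The sample addresses are placeholders; lookups
# require live network access, so results will vary.
if __name__ == '__main__':  # pragma: no cover

    logging.basicConfig(level=logging.DEBUG)

    sample_ips = ['74.125.225.229', '196.25.1.1']
    rdap_results, rdap_stats = bulk_lookup_rdap(addresses=sample_ips)

    print('{0} lookups succeeded, {1} failed'.format(
        len(rdap_results), rdap_stats['ip_failed_total']))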