csirtgadgets/silver-meme

View on GitHub
csirtg_indicator/utils/geo.py

Summary

Maintainability
B
4 hrs
Test Coverage
import logging
import os
import geoip2.database
import re
from pprint import pprint
import sys
from geoip2.errors import AddressNotFoundError

from csirtg_indicator.utils.network import resolve_fqdn, resolve_url
from csirtg_indicator.constants import FQDN as RESOLVE_FQDN

# more local first, see search path loop
DB_SEARCH_PATHS = [
    '/usr/share/GeoIP',
    '/usr/local/share/GeoIP',
    '/usr/local/var/GeoIP',
    '/var/lib/GeoIP',
    './',
]

CITY_DB_PATH = 'GeoLite2-City.mmdb'
ASN_DB_PATH = 'GeoLite2-ASN.mmdb'
CITY_DB = False

for p in DB_SEARCH_PATHS:
    if os.path.isfile(os.path.join(p, CITY_DB_PATH)):
        CITY_DB = geoip2.database.Reader(os.path.join(p, CITY_DB_PATH))

    if os.path.isfile(os.path.join(p, ASN_DB_PATH)):
        ASN_DB = geoip2.database.Reader(os.path.join(p, ASN_DB_PATH))


def _resolve(indicator):
    if not CITY_DB:
        return

    if indicator.city and indicator.longitude:
        return

    i = indicator.indicator

    if indicator.itype in ['url', 'fqdn']:
        if not indicator.resolve_fqdn:
            return

        if indicator.itype == 'url':
            i = resolve_url(i)

        i = resolve_fqdn(i)
        if not i:
            return

        if not indicator.rdata:
            indicator.rdata = i

    try:
        g = CITY_DB.city(i)
    except AddressNotFoundError as e:
        return

    if g.country.iso_code:
        indicator.cc = g.country.iso_code

    if g.city.name:
        indicator.city = g.city.name

    if g.location.longitude:
        indicator.longitude = g.location.longitude

    if g.location.latitude:
        indicator.latitude = g.location.latitude

    if g.location.latitude and g.location.longitude:
        indicator.location = [g.location.longitude, g.location.latitude]

    if g.location.time_zone:
        indicator.timezone = g.location.time_zone

    try:
        indicator.region = g.subdivisions[0].names['en']
    except Exception:
        pass

    try:
        g = ASN_DB.asn(i)
    except AddressNotFoundError as e:
        return

    if not g:
        return

    if g.autonomous_system_number:
        indicator.asn = g.autonomous_system_number

    if g.autonomous_system_organization:
        indicator.asn_desc = g.autonomous_system_organization


def process(indicator):
    if indicator.itype not in ['ipv4', 'ipv6', 'fqdn', 'url']:
        return indicator

    if indicator.is_private():
        return indicator

    # https://geoip2.readthedocs.org/en/latest/
    i = str(indicator.indicator)
    tmp = indicator.indicator

    if indicator.itype in ['ipv4', 'ipv6']:
        match = re.search(r'^(\S+)\\/\d+$', i)
        if match:
            indicator.indicator = match.group(1)

    try:
        if indicator.indicator:
            _resolve(indicator)
        indicator.indicator = tmp

    except ValueError as e:
        indicator.indicator = tmp

    return indicator


def main():
    # if you include this up top, it ruins the dep chain
    from csirtg_indicator import Indicator

    i = sys.argv[1]

    i = Indicator(i)
    i = process(i)

    pprint(i)


if __name__ == "__main__":
    main()