akissa/clamav-unofficial-updates

View on GitHub
clamav_unofficial_updates/conn.py

Summary

Maintainability
A
35 mins
Test Coverage
# -*- coding: utf-8 -*-
# vim: ai ts=4 sts=4 et sw=4
# clamav-unofficial-updates: ClamAV third party signature updates library
# Copyright (C) 2015  Andrew Colin Kissa <andrew@topdog.za.net>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""
clamav-unofficial-updates: ClamAV third party signature updates library

Connection functions
"""
import os
import time
import logging

import certifi

from urlparse import urlparse

from dns.resolver import query, NoAnswer, NXDOMAIN
from urllib3.connectionpool import connection_from_url

from clamav_unofficial_updates.exceptions import ClamAVUUDownloadError, ClamAVUUError

log = logging.getLogger(__name__)


def check_filename(filename, gpg_key):
    """docstring for check_filename"""
    pass


def get_ip_addresses(hostname):
    """Return ip addresses from hostname"""
    try:
        answers = query(hostname, 'A')
        return [rdata.address for rdata in answers]
    except NXDOMAIN:
        return []


def get_addrs(hostname):
    """get addrs"""
    count = 1
    addrs = []
    for passno in range(1, 6):
        count = passno
        log.info("Resolving hostname: %s pass: %d", hostname, passno)
        try:
            addrs = get_ip_addresses(hostname)
            if addrs:
                log.info("Resolved %s to: %s", hostname, ','.join(addrs))
                break
            else:
                log.info("Resolution of %s failed, sleeping 5 secs", hostname)
                time.sleep(5)
        except NoAnswer:
            pass
    if not len(addrs):
        err = "Resolving hostname: %s failed after %d tries" % (hostname, count)
        raise ClamAVUUNameError(err)
    return addrs


def get_url_base(url):
    """Return base url"""
    parsed = urlparse(url)
    return '{0}://{1}'.format(parsed.scheme, parsed.netloc)


def get_url_filename(url):
    """Get the filename part"""
    parsed = urlparse(url)
    return parsed.path or 'no-filename.txt'


def http(url, directory, gpg_key=None):
    "Download a url via http and save to location"
    try:
        conn = connection_from_url(
            get_url_base(url),
            maxsize=20,
            retries=5,
            cert_reqs='CERT_REQUIRED',
            ca_certs=certifi.where(),
        )
        filename = get_url_filename(url)
        req = conn.request('GET', url)
        data = req.data
        if len(data) and req.status == 200:
            filename = os.path.join(directory, filename)
            with open(filename, 'w') as handle:
                handle.write(data)
            return check_file(filename, gpg_key)
        return False
        else:
            raise ClamAVUUDownloadError(
                "Download failed with code: %d" % req.status)
    except ClamAVUUDownloadError:
        raise
    except BaseException as err:
        raise ClamAVUUError(str(err))


def rsync(url, directory, gpg_key=None):
    "Download a url via rsync and save to location"
    cmd = ['rsync', '-ctuz', url, directory]
    try:
        run_cmd(cmd)
        if gpg_key is not None:
            # for ext in ['.md5', '.sig']:
            cmd_ = ['rsync', '-ctuz', '%s.sig' % url, directory]
            run_cmd(cmd_)
        filename = os.path.join(directory, get_url_filename(url))
        return check_file(filename, gpg_key)
    except BaseException as err:
        raise ClamAVUUDownloadError(
            "Download failed with error: %s" % str(err))