chrislit/abydos

View on GitHub
abydos/util/_data.py

Summary

Maintainability
C
1 day
Test Coverage
# Copyright 2019-2020 by Christopher C. Little.
# This file is part of Abydos.
#
# Abydos is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Abydos is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Abydos. If not, see <http://www.gnu.org/licenses/>.

"""abydos.util._data.

The util._data module manages datasets from
https://github.com/chrislit/abydos-data, including downloading them,
decompressing them, and locating them once installed.

Much of this is copied from NLTK's similar facility in
http://www.nltk.org/_modules/nltk/data.html, because they seem to have the
issues figured out, because I don't want to expend the effort to re-invent a
solution, and because their license (Apache) allows for it.
"""

import os
import re
import sys
import urllib.request
import zipfile

from typing import List, Match, Optional, Tuple, cast
from xml.etree import ElementTree  # noqa: S405

__all__ = [
    'data_path',
    'download_package',
    'list_available_packages',
    'list_installed_packages',
    'package_path',
]


# Subdirectories of each data directory that are searched for packages.
DATA_SUBDIRS = ['corpora']
# Default location of the XML index of downloadable packages & collections.
INDEX_URL = (
    'https://raw.githubusercontent.com/chrislit/abydos-data/master/index.xml'
)

data_path = []  # type: List[str]
"""A list of directories where the Abydos data package might reside.
   These directories will be checked in order when looking for a
   resource in the data package.  Note that this allows users to
   substitute in their own versions of resources, if they have them
   (e.g., in their home directory under ~/abydos_data)."""

# Directories listed in the ABYDOS_DATA environment variable take
# precedence; empty entries (such as those from an unset variable) are
# dropped.
_paths_from_env = os.environ.get('ABYDOS_DATA', '').split(
    os.pathsep
)  # pragma: no cover
data_path.extend(filter(None, _paths_from_env))

# Next comes ~/abydos_data, provided APPENGINE_RUNTIME is not set and the
# home directory can actually be expanded.
if 'APPENGINE_RUNTIME' not in os.environ:
    if os.path.expanduser('~/') != '~/':
        data_path.append(os.path.expanduser('~/abydos_data'))

# Locations below the interpreter prefix are common to every platform.
data_path.extend(
    os.path.join(sys.prefix, *parts)
    for parts in (
        ('abydos_data',),
        ('share', 'abydos_data'),
        ('lib', 'abydos_data'),
    )
)

if sys.platform.startswith('win'):  # pragma: no cover
    # Remaining common locations on Windows:
    data_path.append(
        os.path.join(os.environ.get('APPDATA', 'C:\\'), 'abydos_data')
    )
    data_path.extend(
        [r'C:\abydos_data', r'D:\abydos_data', r'E:\abydos_data']
    )
else:
    # Remaining common locations on UNIX & OS X:
    data_path.extend(
        [
            '/usr/share/abydos_data',
            '/usr/local/share/abydos_data',
            '/usr/lib/abydos_data',
            '/usr/local/lib/abydos_data',
        ]
    )


def package_path(resource_name: str) -> str:
    """Return the directory in which a data package is installed.

    Every directory in ``data_path`` is combined with every entry of
    ``DATA_SUBDIRS`` and the resource name, in order; the first
    combination that is an existing directory wins.

    Parameters
    ----------
    resource_name : str
        The name of the package directory to look for

    Returns
    -------
    str
        The path of the first matching directory

    Raises
    ------
    FileNotFoundError
        If no candidate location is an existing directory

    """
    candidates = (
        os.path.join(root, subdir, resource_name)
        for root in data_path
        for subdir in DATA_SUBDIRS
    )
    # The generator is consumed lazily, so the search stops at the first hit.
    found = next((c for c in candidates if os.path.isdir(c)), None)
    if found is None:
        raise FileNotFoundError(
            'Data package not found. '
            'You may need to install or re-install it.'
        )
    return found


def list_installed_packages(
    path: Optional[str] = None,
) -> List[Tuple[str, str, float]]:
    """List all installed data packages.

    A package counts as installed when a directory ``<id>`` sits inside one
    of the ``DATA_SUBDIRS`` of a data directory, next to a readable
    ``<id>.xml`` file that carries ``name="..."`` and ``version="..."``
    attributes with a numeric version. Directories without such metadata
    (e.g. stray folders or interrupted installs) are skipped rather than
    aborting the whole listing.

    Parameters
    ----------
    path : str or None
        A single data directory to inspect. If omitted or empty, every
        directory in the module-level ``data_path`` is inspected.

    Returns
    -------
    list of (str, str, float)
        One ``(package id, package name, version)`` tuple per installed
        package, in the order the packages are encountered

    """
    search_roots = [path] if path else data_path
    packages = []  # type: List[Tuple[str, str, float]]
    for root in search_roots:
        for subdir in DATA_SUBDIRS:
            check_path = os.path.join(root, subdir)
            if not os.path.isdir(check_path):
                continue
            for package in os.listdir(check_path):
                if not os.path.isdir(os.path.join(check_path, package)):
                    continue
                metadata_fn = os.path.join(check_path, package + '.xml')
                try:
                    # Decode explicitly so the result does not depend on
                    # the platform's locale encoding.
                    with open(
                        metadata_fn, encoding='utf-8', errors='replace'
                    ) as xml:
                        metadata = xml.read()
                except OSError:
                    # No (readable) metadata: not a usable package.
                    continue
                name = re.search(r'name="([^"]+)"', metadata)
                version = re.search(r'version="([^"]+)"', metadata)
                if name is None or version is None:
                    continue
                try:
                    version_number = float(version.group(1))
                except ValueError:
                    continue
                packages.append((package, name.group(1), version_number))
    return packages


def list_available_packages(
    url: Optional[str] = None,
) -> Tuple[
    List[Tuple[str, str, float, str, str, str]],
    List[Tuple[str, str, List[str]]],
]:
    """List the data packages and collections offered by a package index.

    Parameters
    ----------
    url : str or None
        The URL of the XML index to read; ``INDEX_URL`` is used when this
        is None

    Returns
    -------
    tuple
        A pair ``(packages, collections)``. Each package is a tuple
        ``(id, name, version, url, subdir, status)``, where status is
        ``'not-installed'``, ``'up-to-date'`` or ``'update available'``
        according to the versions reported by
        ``list_installed_packages()``. Each collection is a tuple
        ``(id, name, [referenced ids])``.

    Raises
    ------
    ValueError
        If the index URL does not begin with ``https://``

    """
    installed_versions = {
        pkg_id: version
        for pkg_id, _name, version in list_installed_packages()
    }

    index_url = INDEX_URL if url is None else url
    if not index_url.startswith('https://'):
        raise ValueError('url should begin with "https://"')
    with urllib.request.urlopen(index_url) as response:  # noqa: S310
        index = ElementTree.fromstring(response.read())  # noqa: S314

    packages = []  # type: List[Tuple[str, str, float, str, str, str]]
    for node in index.findall('packages/package'):
        attrs = node.attrib
        pkg_id = attrs['id']
        pkg_name = attrs['name']
        available = float(attrs['version'])
        pkg_url = attrs['url']
        pkg_subdir = attrs['subdir']
        if pkg_id not in installed_versions:
            status = 'not-installed'
        elif installed_versions[pkg_id] >= available:
            status = 'up-to-date'
        else:
            status = 'update available'
        packages.append(
            (pkg_id, pkg_name, available, pkg_url, pkg_subdir, status)
        )

    collections = []  # type: List[Tuple[str, str, List[str]]]
    for node in index.findall('collections/collection'):
        coll_id = node.attrib['id']
        coll_name = node.attrib['name']
        members = [item.attrib['ref'] for item in node.findall('item')]
        collections.append((coll_id, coll_name, members))

    return packages, collections


def _default_download_dir() -> Optional[str]:
    """Return the directory to which packages will be downloaded by default.

    This is mostly copied from NLTK's
    nltk.downloader.Downloader.default_download_dir

    """
    # Check if we are on GAE where we cannot write into filesystem.
    if 'APPENGINE_RUNTIME' in os.environ:  # pragma: no cover
        return None

    # Check if we have sufficient permissions to install in a
    # variety of system-wide locations.
    for abydos_data in data_path:
        if os.path.exists(abydos_data) and os.access(
            abydos_data, os.W_OK
        ):  # pragma: no cover
            return abydos_data

    # On Windows, use %APPDATA%
    if sys.platform == 'win32' and 'APPDATA' in os.environ:  # pragma: no cover
        homedir = os.environ['APPDATA']

    # Otherwise, install in the user's home directory.
    else:  # pragma: no cover
        homedir = os.path.expanduser('~/')
        if homedir == '~/':
            raise ValueError('Could not find a default download directory')

    # append "abydos_data" to the home directory
    return os.path.join(homedir, 'abydos_data')  # pragma: no cover


def download_package(
    resource_name: str,
    url: Optional[str] = None,
    data_path: Optional[str] = None,
    force: bool = False,
    silent: bool = False,
) -> None:
    """Download and install a package or collection.

    If ``resource_name`` is the id of a collection, every package it
    references is installed in turn, with the same ``force`` and ``silent``
    settings. If it is the id of a package, that package's metadata file
    and zip archive are downloaded, the archive is extracted into the
    package's subdirectory and the archive is then deleted. A name that
    matches neither a collection nor a package results in no action.

    Parameters
    ----------
    resource_name : str
        The id of the package or collection to install
    url : str or None
        The URL of the package index, passed to
        ``list_available_packages``
    data_path : str or None
        The data directory to install into; if None, the result of
        ``_default_download_dir()`` is used
    force : bool
        If True, download the package even when an installed copy is
        already at least as new as the one in the index
    silent : bool
        If True, suppress progress messages

    Raises
    ------
    ValueError
        If no data directory was given and no default download directory
        is available

    """
    packages, collections = list_available_packages(url)
    installed = list_installed_packages(data_path)
    if data_path is None:
        data_path = _default_download_dir()
    if data_path is None:
        # _default_download_dir() returns None where the filesystem is not
        # writable; fail clearly instead of passing None to os.makedirs.
        raise ValueError('Could not find a default download directory')
    os.makedirs(data_path, mode=0o775, exist_ok=True)

    for coll in collections:
        if resource_name == coll[0]:
            if not silent:  # pragma: no branch
                print('Installing {} collection'.format(coll[1]))  # noqa: T001
            for member in coll[2]:
                # Propagate force/silent so that members are handled the
                # same way the caller asked the collection to be handled.
                download_package(
                    member, url, data_path, force=force, silent=silent
                )
            return

    for pack in packages:
        if resource_name != pack[0]:
            continue
        if not force:
            for inst in installed:  # pragma: no branch
                if pack[0] == inst[0] and pack[2] <= inst[2]:
                    if not silent:
                        print(  # pragma: no cover  # noqa: T001
                            '{} package already up-to-date'.format(pack[1])
                        )
                    return
        if not silent:  # pragma: no branch
            print('Installing {} package'.format(pack[1]))  # noqa: T001

        target_dir = os.path.join(data_path, pack[4])
        zip_fn = os.path.join(target_dir, pack[0] + '.zip')
        os.makedirs(target_dir, mode=0o775, exist_ok=True)
        # The metadata file is fetched from the archive's URL with its last
        # three characters replaced by 'xml', and saved beside the archive.
        urllib.request.urlretrieve(  # noqa: S310
            pack[3][:-3] + 'xml', zip_fn[:-3] + 'xml'
        )
        urllib.request.urlretrieve(pack[3], zip_fn)  # noqa: S310
        try:
            # The context manager closes the archive even if extraction
            # fails part-way.
            with zipfile.ZipFile(zip_fn) as zip_pkg:
                zip_pkg.extractall(target_dir)
        finally:
            # Never leave the downloaded archive behind, even a corrupt one.
            os.remove(zip_fn)


if __name__ == '__main__':
    # Executing this module as a script runs its doctests.
    from doctest import testmod

    testmod()