bhgomes/oeis

View on GitHub
oeis/client.py

Summary

Maintainability
A
50 mins
Test Coverage
# -*- coding: utf-8 -*- #
#
# oeis/client.py
#
#
# MIT License
#
# Copyright (c) 2019 Brandon Gomes
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#

"""
Client Interface to OEIS.

"""

from wrapt import ObjectProxy

# ---------------- oeis Library ---------------- #

from .base import name as oeis_name
from .util import import_package, getattrmethod, BoxObject, subset_box


__all__ = (
    "REQUESTS_SUPPORT",
    "REQUESTS_TOOLBELT_SUPPORT",
    "CACHE_CONTROL_SUPPORT",
    "AIOHTTP_SUPPORT",
    "QUERY_FORMAT",
    "ENTRY_FORMAT",
    "BFILE_FORMAT",
    "fetch",
    "afetch",
    "MISSING_PAGE_TEXT",
    "is_404",
    "FAILED_SEARCH_TEXT",
    "is_no_match",
    "query",
    "entry",
    "exists",
    "bfile",
    "bfile_exists",
    "Session",
)


requests, REQUESTS_SUPPORT = import_package("requests")


_, REQUESTS_TOOLBELT_SUPPORT = import_package("requests_toolbelt")


cachecontrol, CACHE_CONTROL_SUPPORT = import_package("cachecontrol")


aiohttp, AIOHTTP_SUPPORT = import_package("aiohttp")


QUERY_FORMAT = "https://oeis.org/search?q={0}&fmt=json"


ENTRY_FORMAT = "https://oeis.org/search?q=id:{0}&fmt=json"


BFILE_FORMAT = "https://oeis.org/A{0}/b{0}.txt"


def get_text(response):
    """
    Get Text from Response.

    :param response:
    :return:
    """
    return getattrmethod(response, "text", "")


def get_json(response):
    """
    Get JSON from Response.

    :param response:
    :return:
    """
    return getattrmethod(response, "json", "")


def _fetch(url, session, *, as_json=False):
    """
    Fetch URL Content via Session.

    :param url:
    :param session:
    :param as_json:
    :return:
    """
    with session.get(url) as response:
        return get_json(response) if as_json else get_text(response)


async def _afetch(url, session, *, as_json=False):
    """
    Asynchronously Fetch URL Content via Session.

    :param url:
    :param session:
    :param as_json:
    :return:
    """
    async with session.get(url) as response:
        return await get_json(response) if as_json else get_text(response)


def _requests_fetch(url, session=requests, *args, **kwargs):
    """
    Default _requests_ Fetch.

    :param url:
    :param session:
    :param args:
    :param kwargs:
    :return:
    """
    return _fetch(url, session, *args, **kwargs)


class _aiohttp_mock_client_session:
    """Imitated _requests_ Get API."""

    async def get(*args, **kwargs):
        """
        Asynchronous Get.

        :param args:
        :param kwargs:
        :return:
        """
        async with aiohttp.ClientSession() as session:
            return await session.get(*args, **kwargs)


async def _aiohttp_afetch(url, session=_aiohttp_mock_client_session(), *args, **kwargs):
    """
    Default _aiohttp_ Asynchronous Fetch.

    :param url:
    :param session:
    :param args:
    :param kwargs:
    :return:
    """
    return await _afetch(url, session, *args, **kwargs)


fetch = _requests_fetch if REQUESTS_SUPPORT else _fetch


afetch = _aiohttp_afetch if AIOHTTP_SUPPORT else _afetch


def _fetch_formatted(url, value, *args, as_json=False, **kwargs):
    """
    Fetch Content from Formatted URL.

    :param url:
    :param value:
    :param args:
    :param as_json:
    :param kwargs:
    :return:
    """
    return fetch(url.format(value), *args, as_json=as_json, **kwargs)


async def _afetch_formatted(url, value, *args, as_json=False, **kwargs):
    """
    Asynchronously Fetch Content from Formatted URL.

    :param url:
    :param value:
    :param args:
    :param as_json:
    :param kwargs:
    :return:
    """
    return await afetch(url.format(value), *args, as_json=as_json, **kwargs)


MISSING_PAGE_TEXT = (
    "Sorry, the page you requested was not found.",
    "Try the search box at the top of this page.",
)


def is_404(html):
    """
    Check if HTML from OEIS Website is the 404 Page.

    :param html:
    :return:
    """
    lines = iter(html.strip().split("\n"))
    try:
        for line in lines:
            if MISSING_PAGE_TEXT[0] in line:
                return MISSING_PAGE_TEXT[1] in next(lines)
    except StopIteration:
        pass
    return False


FAILED_SEARCH_TEXT = ("Sorry, but the terms do not match anything in the table.",)


def is_no_match(html):
    """
    Check if HTML from OEIS Website is the Missing Match Page.

    :param html:
    :return:
    """
    return any(FAILED_SEARCH_TEXT[0] in line for line in html.strip().split("\n"))


def query(term, *args, **kwargs):
    """
    Search OEIS for Given Term.

    :param term:
    :param args:
    :param kwargs:
    :return:
    """
    if term:
        return _fetch_formatted(QUERY_FORMAT, term, *args, **kwargs)
    raise TypeError("Search Term must be non-empty.")


def entry(number, *args, check_name=True, **kwargs):
    """
    Get OEIS Entry Metadata.

    :param number:
    :param args:
    :param check_name:
    :param kwargs:
    :return:
    """
    if check_name:
        number = oeis_name(number)
    result = _fetch_formatted(ENTRY_FORMAT, number, *args, as_json=True, **kwargs)
    if not result["count"]:
        return BoxObject(None, raw=result)
    return subset_box(result, key=lambda d: d["results"][0], origin_name="raw")


def exists(number, *args, **kwargs):
    """
    Check if Entry is Not None.

    :param number:
    :param args:
    :param kwargs:
    :return:
    """
    # TODO: can be improved
    return bool(entry(number, *args, **kwargs))


def _get_bfile_line_content(line):
    """
    Get B-File Content without Comments.

    :param line:
    :return:
    """
    return line.partition("#")[0]


def _parsed_bfile_lines(lines):
    """
    Parse B-File Lines for Sequence.

    :param lines:
    :return:
    """
    lines = iter(lines)
    for line in lines:
        start = _get_bfile_line_content(line)
        if start:
            yield from map(int, start.split())
            break
    for line in lines:
        start = _get_bfile_line_content(line)
        if start:
            yield int(start.split()[1])


def bfile(number, *args, check_name=True, starting_index=0, **kwargs):
    """
    Get B-File associated to OEIS Entry.

    :param number:
    :param args:
    :param check_name:
    :param starting_index:
    :param kwargs:
    :return:
    """
    if check_name:
        number = oeis_name(number)
    html = _fetch_formatted(BFILE_FORMAT, number[1:], *args, **kwargs).strip()
    sequence = _parsed_bfile_lines(html.split("\n"))
    offset = 0
    try:
        offset = next(sequence)
        for _ in range(starting_index):
            next(sequence)
    except StopIteration:
        pass
    except Exception as error:
        if not is_404(html):
            raise error
    return BoxObject(tuple(sequence), offset=offset)


def bfile_exists(number, *args, **kwargs):
    """
    Check if B-File Exists for an OEIS Entry.

    :param number:
    :param args:
    :param kwargs:
    :return:
    """
    # TODO: can be improved
    return bool(bfile(number, *args, **kwargs))


class Session(ObjectProxy):
    """Session Wrapper."""

    def __init__(self, session):
        """
        Initialize Session.

        :param session:
        """
        super().__init__(session)

    def fetch(self, url, *args, **kwargs):
        """
        Session fetch Wrapper.

        :param url:
        :param args:
        :param kwargs:
        :return:
        """
        return fetch(url, self.__wrapped__, *args, **kwargs)

    def query(self, term, *args, **kwargs):
        """
        Session query Wrapper.

        :param term:
        :param args:
        :param kwargs:
        :return:
        """
        return query(term, self.__wrapped__, *args, **kwargs)

    def entry(self, number, *args, **kwargs):
        """
        Session entry Wrapper.

        :param number:
        :param args:
        :param kwargs:
        :return:
        """
        return entry(number, self.__wrapped__, *args, **kwargs)

    def exists(self, number, *args, **kwargs):
        """
        Session exists Wrapper.

        :param number:
        :param args:
        :param kwargs:
        :return:
        """
        return exists(number, self.__wrapped__, *args, **kwargs)

    def bfile(self, number, *args, **kwargs):
        """
        Session bfile Wrapper.

        :param number:
        :param args:
        :param kwargs:
        :return:
        """
        return bfile(number, self.__wrapped__, *args, **kwargs)

    def bfile_exists(self, number, *args, **kwargs):
        """
        Session bfile_exists Wrapper.

        :param number:
        :param args:
        :param kwargs:
        :return:
        """
        return bfile_exists(number, self.__wrapped__, *args, **kwargs)