michadenheijer/pynytimes

View on GitHub
pynytimes/helpers/article_search.py

Summary

Maintainability
A
0 mins
Test Coverage
"""Article Search helper functions"""
# Import typings dependencies
from __future__ import annotations
from typing import Any, Final, Optional, Union

# Import Python dependencies
import datetime
import warnings

# Type of ``None``, usable inside isinstance() type tuples in this module
NoneType: Final = type(None)

# Requesting at least this many results emits the "can take a while" warning
LARGE_RESULTS_WARN = 100
# Requesting at least this many results emits the "more than the API can
# provide" warning
MAXIMUM_RESULTS = 2010

# FIXME not all filters are implemented

# Option keys that are currently supported as filters; each one present in
# the options is folded into the "fq" filter query by
# article_search_parse_options
CURRENT_FILTER_SUPPORT: list[str] = [
    "source",
    "news_desk",
    "section_name",
    "glocation",
    "type_of_material",
    "subject",
    "body",
    "headline",
]


def _article_search_result_warnings(results: int):
    # Show warnings when a lot of results are requested
    if results >= LARGE_RESULTS_WARN:
        warnings.warn(
            "Asking for a lot of results, because of rate"
            + " limits it can take a while."
        )

    # Show waring when above maximum amount of results
    if results >= MAXIMUM_RESULTS:
        warnings.warn(
            "Asking for more results then the API can provide,"
            + "loading maximum results."
        )


def _article_search_check_type(
    query: Optional[str],
    dates: dict[str, Union[datetime.date, datetime.datetime, None]],
    options: dict[str, Any],
    results: int,
):
    # Check if types are correct
    if not isinstance(query, (str, NoneType)):
        raise TypeError("Query needs to be None or str")

    if not isinstance(dates, dict):
        raise TypeError("Dates needs to be a dict")

    if not isinstance(options, dict):
        raise TypeError("Options needs to be a dict")

    if not isinstance(results, (int, NoneType)):
        raise TypeError("Results needs to be None or int")


def _article_search_check_sort_options(options: dict[str, str]):
    # Get and check if sort option is valid
    sort = options.get("sort")

    if sort not in [None, "newest", "oldest", "relevance"]:
        raise ValueError("Sort option is not valid")


def _article_search_check_date_types(dates: dict[str, Any]):
    # Raise error if date is incorrect type
    date_types = (datetime.datetime, datetime.date, NoneType)

    begin_date = dates.get("begin_date")
    if not isinstance(begin_date, date_types):
        raise TypeError(
            "Begin date needs to be datetime.datetime, datetime.date or None"
        )

    end_date = dates.get("end_date")
    if not isinstance(end_date, date_types):
        raise TypeError(
            "End date needs to be datetime.datetime, datetime.date or None"
        )


def article_search_check_input(
    query: Optional[str],
    dates: dict[str, Union[datetime.date, datetime.datetime, None]],
    options: dict[str, Any],
    results: int,
) -> None:
    """Check the input of article_search.

    Runs, in order: the argument type check, the sort option check, the date
    type check and finally the result-count warnings. Any exception raised
    by one of those helpers propagates to the caller.
    """
    # Basic types first, so the later checks can rely on dict arguments
    _article_search_check_type(
        query=query,
        dates=dates,
        options=options,
        results=results,
    )

    # Content of the individual arguments
    _article_search_check_sort_options(options=options)
    _article_search_check_date_types(dates=dates)

    # Only warns, never raises on a valid result count
    _article_search_result_warnings(results=results)


def _convert_date_to_str(
    date: Union[datetime.datetime, datetime.date, None]
) -> Optional[str]:
    if date is not None:
        return datetime.datetime(date.year, date.month, date.day).strftime(
            "%Y%m%d"
        )

    return None


def article_search_parse_dates(
    dates: dict[str, Union[datetime.datetime, datetime.date, None]]
) -> tuple[Optional[str], Optional[str]]:
    """Parse dates into options.

    Reads the ``"begin"`` and ``"end"`` entries of *dates* and returns them
    as a ``(begin, end)`` tuple of strings produced by
    ``_convert_date_to_str``; an entry that is missing or None yields None.
    """
    begin_str = _convert_date_to_str(dates.get("begin"))
    end_str = _convert_date_to_str(dates.get("end"))
    return begin_str, end_str


def _filter_input(values: list) -> str:
    if not isinstance(values, list):
        raise TypeError(
            "One of the parameters in the Article Search function,"
            " is not a list while it should be"
        )

    input = ""
    # Add all the data in the list to the filter
    for i, value in enumerate(values):
        input += f'"{value}"'
        if i < len(values) - 1:
            input += " "

    return input


def article_search_parse_options(options: dict[str, Any]) -> dict:
    """Help to create all fq queries.

    Every key listed in ``CURRENT_FILTER_SUPPORT`` that is present in
    *options* with a value other than None is rendered as
    ``key:("v1" "v2")``. The rendered filters are appended to an ``fq``
    value already present in *options*, with all parts joined by ``" AND "``.

    Notice that this does not support OR statements; implementing them would
    complicate this function a lot.

    Args:
        options: Search options. Values of supported filters must be lists.

    Returns:
        A new dict holding the options without the consumed filter keys and
        with ``fq`` set to the combined filter query (an empty string when
        there is nothing to filter on). The dict passed in is not modified,
        so it can be reused or edited by the caller without stale filters
        leaking into a later query.

    Raises:
        TypeError: If a filter value is not a list (raised by
            ``_filter_input``).
    """
    # pylint: disable=invalid-name
    # Work on a shallow copy so the caller's dict keeps its filter keys
    parsed = dict(options)

    # Start with the filter query the caller may already have defined
    fq = parsed.get("fq", "")
    parts = [fq] if fq else []

    for _filter in CURRENT_FILTER_SUPPORT:
        # Skip filters that are not defined
        values = parsed.get(_filter)
        if values is None:
            continue

        parts.append(f"{_filter}:({_filter_input(values)})")

        # The filter now lives in fq, so drop its own key
        parsed.pop(_filter)

    # Without any filter the original fq value is passed through unchanged
    parsed["fq"] = " AND ".join(parts) if parts else fq

    return parsed