pybliometrics-dev/pybliometrics

View on GitHub
pybliometrics/scopus/utils/parse_content.py

Summary

Maintainability
A
2 hrs
Test Coverage
from collections import namedtuple
from functools import reduce
from warnings import warn


def filter_digits(s):
    """Helper function to remove non-digits characters from a string."""
    return "".join(filter(str.isdigit, s))


def chained_get(container, path, default=None):
    """Helper function to perform a series of .get() methods on a dictionary
    or return the `default`.

    Parameters
    ----------
    container : dict
        The dictionary on which the .get() methods should be performed.

    path : list or tuple
        The list of keys that should be searched for.

    default : any (optional, default=None)
        The object type that should be returned if the search yields
        no result.
    """
    # Obtain value via reduce
    try:
        return reduce(lambda c, k: c.get(k, default), path, container)
    except (AttributeError, TypeError):
        return default


def check_integrity(tuples, fields, action):
    """Check integrity of specific fields in a list of tuples and perfom
    provided action.
    """
    for field in fields:
        elements = [getattr(e, field) for e in tuples]
        if None not in elements:
            continue
        msg = "Parsed information doesn't pass integrity check because of "\
              f"incomplete information in field '{field}'"
        if action == "raise":
            raise AttributeError(msg)
        elif action == "warn":
            warn(msg)


def check_field_consistency(needles, haystack):
    """Raise ValueError if elements of a list are not present in a string."""
    wrong = set(needles) - set(haystack.split())
    if wrong:
        msg = f"Element(s) '{', '.join(sorted(wrong))}' not allowed in "\
              "parameter integrity_fields"
        raise ValueError(msg)


def deduplicate(lst):
    """Auxiliary function to deduplicate a list while preserving its order."""
    new = reduce(lambda x, y: x + y if y[0] not in x else x,
                 map(lambda x: [x], lst),
                 [])
    return new


def get_id(s, integer=True):
    """Helper function to return the Scopus ID at a fixed position."""
    path = ['coredata', 'dc:identifier']
    try:
        return int(chained_get(s, path, "").split(':')[-1])
    except ValueError:
        return None


def get_freetoread(item, path, default):
    """Helper function to return freetoread information from search results."""
    text = chained_get(item, path, default)
    try:
        text = text[-1]["$"]
    except TypeError:
        pass
    return text


def get_link(dct, idx, path=['coredata', 'link']):
    """Helper function to return the link at position `idx` from coredata."""
    links = chained_get(dct, path, [{}])
    try:
        return links[idx].get('@href')
    except IndexError:
        return None


def html_unescape(s:str): 
        from html import unescape
        return unescape(s) if s else None


def listify(element):
    """Helper function to turn an element into a list if it isn't a list yet.
    """
    if isinstance(element, list):
        return element
    else:
        return [element]


def make_float_if_possible(val):
    """Attempt a conversion to float type."""
    try:
        return float(val)
    except TypeError:
        return val


def make_int_if_possible(val):
    """Attempt a conversion to int type."""
    try:
        return int(val)
    except TypeError:
        return val


def make_search_summary(self, keyword, results, joiner="\n    "):
    """Create string for str dunder of search classes."""
    date = self.get_cache_file_mdate().split()[0]
    if self._n != 1:
        appendix = "s"
        verb = "have"
    else:
        appendix = ""
        verb = "has"
    s = f"Search '{self._query}' yielded {self._n:,} "\
        f"{keyword}{appendix} as of {date}"
    if results:
        s += ":" + joiner + joiner.join(results)
    elif self._n:
        s += f", which {verb} not been downloaded"
    return s


def parse_affiliation(affs, view):
    """Helper function to parse list of affiliation-related information."""
    order = 'id parent type relationship afdispname preferred_name '\
            'parent_preferred_name country_code country address_part city '\
            'state postal_code org_domain org_URL'
    aff = namedtuple('Affiliation', order, defaults=(None,) * len(order.split()))
    out = []

    if view in ('STANDARD', 'ENHANCED'):
        for item in listify(affs):
            if not item:
                continue
            doc = item.get('ip-doc', {}) or {}
            address = doc.get('address', {}) or {}
            try:
                parent = int(item['@parent'])
            except KeyError:
                parent = None
            new = aff(id=int(item['@affiliation-id']), parent=parent,
                type=doc.get('@type'), relationship=doc.get('@relationship'),
                afdispname=doc.get('@afdispname'),
                preferred_name=doc.get('preferred-name', {}).get('$'),
                parent_preferred_name=doc.get('parent-preferred-name', {}).get('$'),
                country_code=address.get('@country'), country=address.get('country'),
                address_part=address.get("address-part"), city=address.get('city'),
                postal_code=address.get('postal-code'), state=address.get('state'),
                org_domain=doc.get('org-domain'), org_URL=doc.get('org-URL'))
            if any(val for val in new):
                out.append(new)
    elif view == 'LIGHT':
        new = aff(preferred_name=affs.get('affiliation-name'),
                  city=affs.get('affiliation-city'),
                  country=affs.get('affiliation-country'))
        if any(val for val in new):
            out.append(new)

    return out or None


def parse_date_created(dct):
    """Helper function to parse date-created from profile."""
    date = dct['date-created']
    if date:
        return int(date['@year']), int(date['@month']), int(date['@day'])
    else:
        return None, None, None