muneebalam/scrapenhl2

View on GitHub
scrapenhl2/scrape/general_helpers.py

Summary

Maintainability
B
4 hrs
Test Coverage
"""
This module contains general helper methods. None of these methods have dependencies on other scrapenhl2 modules.
"""

import functools
import logging
import os
import os.path
import pickle
import re
import time
import requests

import numpy as np
import pandas as pd
from fuzzywuzzy import fuzz

# Shared requests session; stays None until try_url_n_times creates it on its first call.
__SESSION__ = None


def print_and_log(message, level='info', print_and_log=True):
    """
    Writes message to the log at the requested level, optionally echoing it to the console first.

    :param message: str, the message
    :param level: str, the level of log: info, warn, error, critical. Anything else is logged as info.
    :param print_and_log: bool. If True (the default), the message is printed as well; if False, logs only.

    :return: nothing
    """
    if print_and_log:
        print(message)

    # Map the recognized level names to the matching logging call; info is the fallback.
    log_calls = {'warn': logging.warning,
                 'error': logging.error,
                 'critical': logging.critical}
    log_calls.get(level, logging.info)(message)


def once_per_second(fn, calls_per_second=1):
    """
    A decorator that sleeps before each call of the wrapped function, so that it is executed at most
    calls_per_second times per second. Used when scraping NHL site.
    This also means all functions that access the internet through it pause before every call.

    :param fn: the function
    :param calls_per_second: number of calls allowed per second. The pause before each call is
        1 / calls_per_second seconds. Default 1.

    :return: the wrapped function
    """
    delay = 1 / calls_per_second

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        time.sleep(delay)
        return fn(*args, **kwargs)

    # Without this return the decorator would replace the decorated function with None.
    return wrapper


def log_exceptions(fn):
    """
    A decorator that wraps the passed in function and logs exceptions should one occur.

    When the wrapped function raises, the exception is logged, the arguments of the failing call are pickled
    to the logging folder, and the original exception is re-raised. The file is named after the function
    followed by the first unused index (e.g. myfunc0.pkl, myfunc1.pkl, ...) and holds two consecutive pickles:
    the positional args, then the kwargs. They can be read back with two pickle.load calls on the file
    opened in "rb" mode.

    :param fn: the function

    :return: the wrapped function
    """

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        try:
            return fn(*args, **kwargs)
        except Exception:
            # log the exception
            logging.exception("There was an exception in  " + fn.__name__)

            # and write the args to file, named after function.
            # Saving is best-effort: a failure here (unpicklable argument, missing folder, ...) is logged
            # but must never hide the exception that the wrapped function raised.
            try:
                folder = get_logging_folder()
                index = 0  # used in case one function fails multiple times
                fname = folder + "{0:s}{1:d}.pkl".format(fn.__name__, index)
                while os.path.exists(fname):
                    index += 1
                    fname = folder + "{0:s}{1:d}.pkl".format(fn.__name__, index)

                # pickle writes bytes, so the file has to be opened in binary mode
                with open(fname, "wb") as f:
                    pickle.dump(args, f)
                    pickle.dump(kwargs, f)
            except Exception:
                logging.exception("Could not save the arguments passed to " + fn.__name__)

            # re-raise the original exception
            raise

    return wrapper


def get_logging_folder():
    """
    Returns the folder used for the log file and for the argument dumps written by log_exceptions.

    :return: str, relative path of the folder, ending with a slash
    """
    folder = './.logs/'
    return folder


def start_logging():
    """
    Clears out logging folder, and starts the log in this folder.

    If the folder exists, every entry directly inside it is deleted, except for subdirectories, which are left
    alone (os.remove cannot delete them and would abort the whole setup). If the folder does not exist, it is
    created. Logging is then configured at DEBUG level, writing to logfile.log inside the folder and
    overwriting any previous log file.

    :return: nothing
    """
    folder = get_logging_folder()

    if os.path.exists(folder):
        for entry in os.listdir(folder):
            path = folder + entry
            # Real subdirectories are skipped; symlinks are still removed like plain files.
            if os.path.isdir(path) and not os.path.islink(path):
                continue
            os.remove(path)
    else:
        # exist_ok guards against the folder appearing between the check above and this call
        os.makedirs(folder, exist_ok=True)

    logging.basicConfig(level=logging.DEBUG, filemode="w",
                        format="%(asctime)-15s %(levelname)-8s %(message)s",
                        filename=folder + 'logfile.log')


# Runs at import time: importing this module clears the logging folder and configures logging.
start_logging()


def check_types(obj):
    """
    Tells whether obj is a number (anything check_number accepts) or a str.

    :param obj: the object to check the type

    :return: bool
    """
    if check_number(obj):
        return True
    return isinstance(obj, str)


def check_number(obj):
    """
    Tells whether obj is an int, a float, or a numpy number (np.int64, np.float64, etc).
    Since bool is a subclass of int, True and False count as numbers as well.

    :param obj: the object to check the type

    :return: bool
    """
    numeric_types = (int, float, np.number)
    return isinstance(obj, numeric_types)


def check_number_last_first_format(name):
    """
    Checks if specified name looks like "8 OVECHKIN, ALEX".

    The name must start with one or two digits, followed by optional whitespace and at least two upper-case
    ASCII letters (which may be separated by whitespace). Only the start of the name is examined. Apostrophes
    are ignored, so "22 O'BRIEN, SHANE" matches too. Mixed-case names such as "8 Ovechkin, Alex" do not match.

    :param name: str

    :return: bool
    """
    # Apostrophes are dropped first so that names like O'BRIEN still satisfy the letters-only pattern.
    cleaned = name.replace("'", '')
    # Raw string: \d and \s are regex escapes, not (invalid) Python string escapes.
    return re.match(r'^\d{1,2}\s*[A-Z]+\s*[A-Z]+', cleaned) is not None


@functools.lru_cache(maxsize=128, typed=False)
def infer_season_from_date(date):
    """
    Infers the season a date belongs to: dates in January through August count towards the season that
    started the previous calendar year; dates from September on belong to the season starting that year.

    :param date: str, YYYY-MM-DD

    :return: int, the season. 2007-08 would be 2007.
    """
    year, month, _ = map(int, date.split('-'))
    return year if month >= 9 else year - 1


def mmss_to_secs(strtime):
    """
    Turns a clock reading of the form mm:ss into the total number of seconds.

    :param strtime: str, mm:ss

    :return: int
    """
    pieces = strtime.split(':')
    minutes, seconds = pieces
    total = 60 * int(minutes)
    total += int(seconds)
    return total


def try_to_access_dict(base_dct, *keys, **kwargs):
    """
    Walks into base_dct by applying keys one after another, i.e. base_dct[key1][key2][key3]...
    If any step fails because a key is missing, an index is out of range, or the current object
    cannot be subscripted that way, the default is returned instead.

    :param base_dct: dict, a dictionary
    :param keys: str, int, or other valid dict keys (or list indices)
    :param kwargs: can specify default using kwarg default_return=0, for example. Note that the value of the
        last keyword argument is used as the default, whatever its name.

    :return: obj, base_dct[key1][key2][key3]... or the default (None unless specified) if the lookup fails
    """
    fallback = None
    if kwargs:
        fallback = list(kwargs.values())[-1]

    try:
        return functools.reduce(lambda node, key: node[key], keys, base_dct)
    except (KeyError, IndexError, TypeError):
        # KeyError: missing dict key; IndexError: bad list index; TypeError: not a dict or list
        return fallback


def add_sim_scores(df, name):
    """
    Scores every entry of the Name column against name using fuzzywuzzy's token set ratio and stores
    the result in a SimScore column. The dataframe passed in is modified and returned.

    :param df: pandas dataframe with column Name
    :param name: str, name to compare to

    :return: df with an additional column SimScore
    """

    def similarity(candidate):
        return fuzz.token_set_ratio(name, candidate)

    scores = df.Name.apply(similarity)
    df.loc[:, 'SimScore'] = scores
    return df


def fuzzy_match_player(name_provided, names, minimum_similarity=50):
    """
    Finds the entry of names that is most similar to name_provided, using token set matching.
    Entries scoring below minimum_similarity are discarded; if nothing is left, a message is printed
    and None is returned.
    (See e.g. http://chairnerd.seatgeek.com/fuzzywuzzy-fuzzy-string-matching-in-python/)

    :param name_provided: str, name to look for
    :param names: list (or ndarray, or similar) of str, the candidate names
    :param minimum_similarity: int from 0 to 100, minimum similarity. If all are below this, returns None.

    :return: str, string in names that best matches name_provided
    """
    candidates = add_sim_scores(pd.DataFrame({'Name': names}), name_provided)
    ranked = candidates.sort_values(by='SimScore', ascending=False)
    matches = ranked.query('SimScore >= {0:f}'.format(minimum_similarity))

    if len(matches) == 0:
        print('Could not find match for {0:s}'.format(name_provided))
        return None
    return matches.Name.iloc[0]


def intervals(lst, interval_pct=10):
    """
    A method that divides list into intervals and returns tuples indicating each interval mark.
    Useful for giving updates when cycling through games.

    The list is sorted first. Marks are taken at 0%, interval_pct%, 2 * interval_pct%, ... of the way through
    the sorted list, stopping before the end. When the list has fewer elements than there are marks, several
    marks may point at the same index.

    :param lst: lst to divide
    :param interval_pct: number greater than zero, pct for each interval to represent.
        e.g. 10 means it will mark every 10%.

    :return: a list of tuples of (index, value); empty if lst is empty
    :raises ValueError: if lst is not empty and interval_pct is not positive
    """
    ordered = sorted(lst)
    size = len(ordered)
    if size == 0:
        return []
    if interval_pct <= 0:
        # A zero step would never reach the end of the list (endless loop);
        # a negative one would walk backwards from the start.
        raise ValueError('interval_pct must be positive, got {}'.format(interval_pct))

    marks = []
    i = 0
    while True:
        frac = interval_pct / 100 * i
        index = round(size * frac)
        if index >= size:
            break
        marks.append((index, ordered[index]))
        i += 1
    return marks


def remove_leading_number(string):
    """
    Will convert 8 Alex Ovechkin to Alex Ovechkin, or Alex Ovechkin to Alex Ovechkin.

    Digits (0-9) at the very start of the string are removed, then surrounding whitespace is stripped.
    An empty string, or one made of digits only, yields an empty string.

    :param string: a string

    :return: string without leading numbers
    """
    # lstrip copes with empty and all-digit input, where indexing the first character would fail
    return string.lstrip('0123456789').strip()


def flip_first_last(name):
    """
    Turns "Ovechkin, Alex" into "Alex Ovechkin": the comma-separated parts are stripped, put in reverse
    order, joined with spaces and converted to title case. A name without a comma is returned untouched
    (and is not title-cased).

    :param name: str

    :return: str, flipped if applicable
    """
    if ',' not in name:
        return name

    # Suffixes such as ", Jr" or ", IV" are not treated specially for now.
    pieces = [piece.strip() for piece in reversed(name.split(','))]
    return ' '.join(pieces).title()


def period_contribution(x):
    """
    Converts a period--1, 2, 3, OT, etc--into the number of seconds played before that period starts.
    Numeric periods count 1200 seconds for each earlier period. A value that cannot be parsed as an integer
    gives 3600 if it is 'OT' and 3900 otherwise (e.g. SO).

    :param x: str or int, 1, 2, 3, etc
    :return: int, number of seconds elapsed until start of specified period
    """
    try:
        period = int(x)
    except ValueError:
        if x == 'OT':
            return 3600
        return 3900  # anything else is treated as the shootout
    return 1200 * (period - 1)


def get_lastname(pname):
    """
    Returns everything after the first space of the name. A name without any space raises IndexError.

    :param pname: str, player name

    :return: str, player last name
    """
    first_and_rest = pname.split(' ', 1)
    return first_and_rest[1]


def get_initials(pname):
    """
    Splits name on spaces and returns first letter from each part.

    Empty parts, which arise from consecutive, leading or trailing spaces, are ignored, so an empty
    name yields an empty string.

    :param pname: str, player name

    :return: str, player initials
    """
    # "if part" skips the empty strings split(' ') produces, which have no first letter to take
    return ''.join([part[0] for part in pname.split(' ') if part])


def try_url_n_times(url, timeout=5, n=5):
    """
    A helper method that tries to access given url up to n times, returning the page.

    All requests go through one module-level requests session, which is created on first use.
    The first attempt that gets a response ends the loop; failed attempts are reported on the console.

    :param url: str, the url to access
    :param timeout: int, number of secs to wait before timeout. Default 5.
    :param n: int, the max number of tries. Default 5.

    :return: str, the text of the response, or None if no attempt succeeded
    """

    global __SESSION__
    if __SESSION__ is None:
        __SESSION__ = requests.Session()

    page = None
    for tries in range(n):
        try:
            # honor the caller's timeout instead of a hard-coded value
            resp = __SESSION__.get(url, timeout=timeout)
            page = resp.text
            break
        except requests.HTTPError as httpe:
            # NOTE(review): nothing here calls resp.raise_for_status(), so this branch is presumably
            # rarely (if ever) reached -- confirm whether error status codes should count as failures.
            if '404' in str(httpe):
                break  # the page does not exist, so retrying is pointless
            else:
                print('HTTP error with', url, httpe, httpe.args)
        except requests.exceptions.ReadTimeout as rt:
            print(rt)
            print('Failed on try {} on url {}'.format(tries, url))
        except Exception as e:  # any other failure, e.g. connection problems
            print(e)
            print('Could not access {0:s}; try {1:d} of {2:d}'.format(url, tries, n))
    return page

def melt_helper(df, **kwargs):
    """
    Melts a dataframe in a way that also works on earlier versions of pandas, which lack pd.DataFrame.melt:
    df.melt is attempted first, and pd.melt is used when that raises an AttributeError.

    :param df: dataframe
    :param kwargs: arguments to pd.melt or pd.DataFrame.melt.

    :return: melted dataframe
    """

    try:
        melted = df.melt(**kwargs)
    except AttributeError:
        melted = pd.melt(df, **kwargs)
    return melted


def anti_join(df1, df2, **kwargs):
    """
    Anti-joins two dataframes: df1 is left-merged with df2 and only the rows that did not find
    a match in df2 are kept.

    :param df1: dataframe
    :param df2: dataframe
    :param kwargs: keyword arguments as passed to pd.DataFrame.merge (except for 'how'). Specifically, need join keys.

    :return: dataframe
    """

    # indicator=True adds a _merge column telling which side(s) each row came from
    merged = df1.merge(df2, how='left', indicator=True, **kwargs)
    unmatched = merged.query('_merge != "both"')
    return unmatched.drop('_merge', axis=1)


def fill_join(df1, df2, **kwargs):
    """
    Uses data from df2 to fill in missing values from df1. Helpful when you have to join using multiple data sources.
    Preserves data order. Won't work when joining introduces duplicates.

    df1 is left-merged with df2; for every column the two share (other than the join keys), missing values
    in df1 are replaced by the value from the matching df2 row. df1 is modified in place and returned.

    :param df1: dataframe
    :param df2: dataframe
    :param kwargs: keyword arguments as passed to pd.DataFrame.merge (except for 'how' and 'suffixes')

    :return: dataframe, df1 with missing values filled where df2 had data
    """

    # Columns present in both frames come back twice: df1's under the original name, df2's with '_drop'.
    tmp = df1.merge(df2, how='left', suffixes=('', '_drop'), **kwargs)

    # Without duplicates the merge keeps one row per df1 row, in df1's order, but it may renumber the index.
    # fillna aligns on index labels, so the fill values are re-labelled with df1's index to line up by position.
    same_length = len(tmp) == len(df1)

    cols1 = set(df1.columns)
    cols2 = set(df2.columns)
    for col in cols2:
        if col in cols1 and col + '_drop' in tmp:
            fill_values = tmp[col + '_drop']
            if same_length:
                fill_values = pd.Series(fill_values.values, index=df1.index)
            df1.loc[:, col] = df1[col].fillna(fill_values)

    return df1