BaptisteZloch/Quant-Invest-Lab

View on GitHub
quant_invest_lab/data_provider.py

Summary

Maintainability
C
7 hrs
Test Coverage
from datetime import datetime
from functools import lru_cache
from typing import Optional, Callable, Literal
import os
import pandas as pd
import json
import time
from random import randint
from kucoin.client import Market
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm

from quant_invest_lab.constants import TIMEFRAME_IN_S, TIMEFRAMES
from quant_invest_lab.types import Timeframe


def build_multi_crypto_dataframe(
    symbols: set,
    drop_na: bool = False,
    column_to_keep: Literal[
        "Close", "Open", "High", "Low", "Volume", "Amount"
    ] = "Close",
    timeframe: Timeframe = "1day",
) -> pd.DataFrame:
    """Build a multi cryptocurrencies dataframe from a set of symbols. This dataframe will contain the closing price of each symbol or any other column specified in `column_to_keep`. This function is useful to build a dataframe for a multi cryptocurrencies portfolio.

    Args:
    -----
        symbols (set): The name of the symbols to download.

        drop_na (bool, optional): Whether or not to drop NaNs. Defaults to False.

        column_to_keep (Literal["Close","Open","High","Low","Volume","Amount"], optional): The name of the column to keep. Defaults to "Close".

        timeframe (Timeframe, optional): The data granularity. Defaults to "1day".

    Returns:
    -----
        pd.DataFrame: The multi cryptocurrencies dataframe.
    """
    df = pd.DataFrame()
    for symbol in tqdm(symbols, desc="Fetching symbols..."):
        currency_df = download_crypto_historical_data(
            symbol, timeframe, compute_return=False, refresh_list_of_symbol=False
        )
        # currency_df.index = currency_df.index.map(
        #     lambda x: datetime(x.year, x.month, x.day)
        # )
        assert (
            column_to_keep in currency_df.columns
        ), f"Column {column_to_keep} not in {symbol} dataframe. Please choose another column."

        df = pd.concat(
            [
                df,
                currency_df[[f"{column_to_keep}"]].rename(
                    columns={f"{column_to_keep}": f"{symbol}_{column_to_keep}"}
                ),
            ],
            axis=1,
        )
    return df if not drop_na else df.dropna()


def download_crypto_historical_data(
    symbol: str,
    timeframe: Timeframe = "1hour",
    compute_return: bool = True,
    refresh_list_of_symbol: bool = True,
    keep_timestamps_col: bool = False,
) -> pd.DataFrame:
    """Fetch a cryptocurrency historical data from Kucoin for a given symbol and timeframe.

    Args:
    ----
        symbol (str): The symbol to download.

        timeframe (Timeframe, optional): The timeframe of the data to download. Defaults to "1hour".

        compute_return (bool, optional): Whether or not to compute the return on close T and T-1. Defaults to True.

        refresh_list_of_symbol (bool, optional): Refresh the list of symbols/ticker from kucoin, it must be true for the first execution of the code. Defaults to True.

        keep_timestamps_col (bool, optional): Whether or not to keep the timestamps. Defaults to False.

    Returns:
    ----
        pd.DataFrame: The historical data of the symbol with date index
    """
    service = CryptoService()
    if refresh_list_of_symbol is True:
        service.refresh_list_of_symbols()  # Uncomment for the first usage
    df = service.get_history_of_symbol(f"{symbol}", timeframe)
    df["Date"] = df["Timestamp"].apply(datetime.fromtimestamp)
    df = df.set_index("Date")
    df = df.loc[~df.index.duplicated(), :]
    df = df.sort_index()

    if compute_return is True:
        df["Returns"] = df["Close"].pct_change()
    if keep_timestamps_col is False:
        df = df.drop(columns=["Timestamp"])

    df = df.fillna(0.0)
    return df  # .asfreq(TIMEFRAME_TO_FREQ[timeframe])#.ffill()


class CryptoService:
    class KucoinDataFetcher:
        __client = Market(url="https://api.kucoin.com")

        def __construct_timestamp_list(
            self,
            start_timestamp: int,
            end_timestamp: int,
            timeframe: Timeframe,
            exchange_limit: int = 1500,
        ) -> list[int]:
            """Private function that generates a list of timestamps spaced of `exchange_limit` times `timeframe`.
            Args:
                start_timestamp (str): The initial timestamp.
                end_timestamp (str): The final timestamp.
                timeframe (Timeframe): The desired timeframe, sould be 1min, 3min, 5min, 15min, 1hour, 4hour, 1day...
                exchange_limit (int, optional): The exchange limit : 1500 for Kucoin here.. Defaults to 1500.
            Returns:
                list[int]: The list of timestamps.
            """
            remaining = (end_timestamp - start_timestamp) // TIMEFRAME_IN_S[timeframe]

            timestamp_i = end_timestamp
            timestamps = [timestamp_i]

            while remaining > exchange_limit:
                timestamp_i = timestamp_i - TIMEFRAME_IN_S[timeframe] * exchange_limit
                remaining = remaining - exchange_limit
                timestamps.append(timestamp_i)

            timestamps.append(start_timestamp)

            return sorted(timestamps, reverse=True)

        @staticmethod
        def __handle_429_error(func: Callable) -> Callable:
            """Static private decorator that handle error 429 response from Kucoin API.
            Args:
                func (Callable): The function to wrap.
            Returns:
                Callable: The wrapper.
            """

            def wrapper(*args, **kwargs):
                passed = False
                while passed == False:
                    try:
                        return func(*args, **kwargs)
                    except:
                        time.sleep(randint(10, 100) / 100)
                        pass

            return wrapper

        @__handle_429_error
        def __get_data(
            self,
            symbol: str,
            start_at: int,
            end_at: int,
            timeframe: Timeframe = "15min",
        ) -> pd.DataFrame:
            """Private function that uses Kucoin API to get the data for a specific symbol and timeframe.
            Args:
                symbol (str): The symbol for the data we want to extract. Defaults to "BTC-USDT".
                start_at (int): The starting timestamp and. Note that this function could only outputs 1500 records. If the timeframe and the timestamps don't satisfy it, it will return a dataframe with 1500 records from the starting timestamp.
                end_at (int): The ending timestamp.
                timeframe (Timeframe, optional): The timeframe, it must be 1min, 3min, 5min, 15min, 1hour, 4hour, 1day... Defaults to '15min'.
            Returns:
                Optional[pd.DataFrame]: The dataframe containing historical records.
            """
            klines = self.__client.get_kline(
                f"{symbol}", timeframe, startAt=start_at, endAt=end_at
            )
            df = pd.DataFrame(
                klines,
                columns=[
                    "Timestamp",
                    "Open",
                    "Close",
                    "High",
                    "Low",
                    "Amount",
                    "Volume",
                ],
                dtype=float,
            )
            df["Timestamp"] = df["Timestamp"].astype(int)

            return df

        def get_symbols(self) -> list[str]:
            """Get a list of all symbols in kucoin
            Returns:
                list[str]: The symbol's list.
            """
            tickers = self.__client.get_all_tickers()
            if tickers is not None:
                return [tik["symbol"] for tik in tickers["ticker"]]
            raise ValueError("Error, no symbols found.")

        def download_history(
            self, symbol: str, since: str, timeframe: Timeframe, jobs: int = -1
        ) -> pd.DataFrame:
            """Download a set of historical data and save it.
            Args:
                symbol (str): The symbol for the data we want to extract. Defaults to "BTC-USDT".
                since (str): The initial date in format : dd-mm-yyyy.
                timeframe (Timeframe): The timeframe, it must be 1min, 3min, 5min, 15min, 1hour, 4hour, 1day... Defaults to '15min'.
                jobs (int, optional): The number of thread to parallelize the code. Defaults to -1.
            Raises:
                ValueError: Error in using parallelism.
            Returns:
                pd.DataFrame: The dataframe containing historical records.
            """
            try:
                start_timestamp = int(datetime.strptime(since, "%d-%m-%Y").timestamp())
            except:
                raise ValueError(
                    "Error, wrong date format, provide something in this format: dd-mm-yyyy"
                )
            end_timestamp = int(datetime.now().timestamp())

            assert (
                start_timestamp < end_timestamp
            ), "Error, the starting timestamp must be less than ending timestamp"

            timestamps = self.__construct_timestamp_list(
                start_timestamp, end_timestamp, timeframe
            )

            df = pd.DataFrame(
                columns=[
                    "Timestamp",
                    "Open",
                    "Close",
                    "High",
                    "Low",
                    "Amount",
                    "Volume",
                ],
                dtype=float,
            )
            if jobs == -1 or jobs == 1:
                for i in range(len(timestamps) - 1):
                    df = pd.concat(
                        [
                            df,
                            self.__get_data(
                                symbol, timestamps[i + 1], timestamps[i], timeframe
                            ),
                        ]
                    )

            elif jobs > 1 and jobs <= 25:
                with ThreadPoolExecutor(
                    max_workers=(len(timestamps) if len(timestamps) <= 25 else jobs)
                ) as executor:
                    processes = [
                        executor.submit(
                            self.__get_data,
                            symbol,
                            timestamps[i + 1],
                            timestamps[i],
                            timeframe,
                        )
                        for i in range(len(timestamps) - 1)
                    ]

                for task in as_completed(processes):
                    df = pd.concat([df, task.result()])
            else:
                raise ValueError(
                    "Error, jobs must be between 25 and 2 to use parallelism or -1 and 1 to do it sequentially."
                )

            df = df.sort_values(by="Timestamp")
            df.drop_duplicates(inplace=True)
            return df

    __kucoin_fetcher = KucoinDataFetcher()
    __base_dir = "./data/"
    __absolute_start_date = "01-01-2017"

    def get_list_of_symbols(
        self, base_currency: Optional[str] = None, quote_currency: Optional[str] = None
    ) -> list[str]:
        """Return the list of symbol in the database's file.
        Args:
            base_currency (Optional[str]): Filter by base currency.
            quote_currency (Optional[str]): Filter by quote currency.
        Raises:
            ValueError: If there is an error.
        Returns:
            list[str]: The list of symbols.
        """
        symbols = self.__open_symbols_list()
        if base_currency == None and quote_currency == None:
            return symbols
        elif base_currency == None and quote_currency is not None:
            return list(
                filter(
                    lambda symbol: symbol.split("-")[-1] == quote_currency.upper(),
                    symbols,
                )
            )
        elif quote_currency == None and base_currency is not None:
            return list(
                filter(
                    lambda symbol: symbol.split("-")[0] == base_currency.upper(),
                    symbols,
                )
            )
        elif quote_currency is not None and base_currency is not None:
            return list(
                filter(
                    lambda symbol: symbol.split("-")[0] == base_currency.upper()
                    and symbol.split("-")[-1] == quote_currency.upper(),
                    symbols,
                )
            )
        raise ValueError("Error, wrong parameters.")

    def get_history_of_symbol(self, symbol: str, timeframe: Timeframe) -> pd.DataFrame:
        """Function that get the history of a symbol.
        Args:
            symbol (str): The crypto symbol file.
            timeframe (Timeframe): The history's timeframe.
        Returns:
            pd.DataFrame: Records corresponding to the history.
        """
        assert (
            timeframe in TIMEFRAMES
        ), f"Error, timeframe must be in {','.join(TIMEFRAMES)}"

        assert (
            symbol in self.get_list_of_symbols()
        ), f"Error, wrong symbol {symbol}, provide something like 'BTC-USDT'."
        return self.__refresh_or_download(symbol, timeframe)

    @lru_cache(maxsize=32, typed=True)
    def refresh_list_of_symbols(self) -> None:
        """Function that refreshes the database's crypto listing."""
        self.__init_directories()
        with open(f"{self.__base_dir}list_available/crypto_available.json", "w") as f:
            json.dump({"listing": self.__kucoin_fetcher.get_symbols()}, f)

    def check_file_exists(self, symbol: str, timeframe: Timeframe) -> bool:
        """Verify if the history has already been fetched
        Args:
            symbol (str): The crypto symbol file.
            timeframe (Timeframe): The history's timeframe.
        Returns:
            bool: Whether or not the history is present.
        """
        return os.path.exists(f"{self.__base_dir}{timeframe}/{symbol}.csv")

    def __open_symbols_list(self) -> list[str]:
        """Private function that opens, reads and returns the list of symbols from the database's file.
        Returns:
            list[str]: The list of symbols.
        """
        with open(f"{self.__base_dir}list_available/crypto_available.json", "r") as f:
            return json.load(f)["listing"]

    def __refresh_or_download(self, symbol: str, timeframe: Timeframe) -> pd.DataFrame:
        """Private function used to refresh or download the history of a symbol.
        Args:
            symbol (str): The crypto symbol file.
            timeframe (Timeframe): The history's timeframe.
        Returns:
            pd.DataFrame: The complete history refreshed or not.
        """
        if self.check_file_exists(symbol, timeframe):
            # print("History present -> Checking for refresh...")
            data = pd.read_csv(
                f"{self.__base_dir}{timeframe}/{symbol}.csv",
                sep=",",
                dtype=float,
            )
            last_timestamp = data["Timestamp"].iloc[-1]

            since = datetime.fromtimestamp(last_timestamp).strftime("%d-%m-%Y")
            # print(f"Last timestamp : {last_timestamp} = {since}")
            if since == datetime.now().strftime("%d-%m-%Y"):
                return data
            new_data = self.__kucoin_fetcher.download_history(
                symbol, since, timeframe, 16
            )

            data = pd.concat([data, new_data]).drop_duplicates(ignore_index=True)
            data["Timestamp"] = data["Timestamp"].astype(int)
            data.to_csv(
                f"{self.__base_dir}{timeframe}/{symbol}.csv", sep=",", index=False
            )
            # print("Finished")
            return data
        else:  # Download full history
            # print("No history -> Downloading full history...")
            data = self.__kucoin_fetcher.download_history(
                symbol, self.__absolute_start_date, timeframe, 16
            )
            data["Timestamp"] = data["Timestamp"].astype(int)
            data.to_csv(
                f"{self.__base_dir}{timeframe}/{symbol}.csv", sep=",", index=False
            )
            # print("Finished")
            return data

    def __init_directories(self) -> None:
        """Private function that init all directories."""
        for tf in TIMEFRAMES:
            os.makedirs(f"{self.__base_dir}{tf}", exist_ok=True)
        os.makedirs(f"{self.__base_dir}list_available", exist_ok=True)