siruku6/fx_alarm_py

View on GitHub
src/candle_loader.py

Summary

Maintainability
A
0 mins
Test Coverage
D
69%
from datetime import datetime, timedelta
from typing import Any, Dict, Optional, Tuple

from aws_lambda_powertools import Logger
from oanda_accessor_pyv20 import OandaInterface
import pandas as pd

from src.candle_storage import FXBase
from src.clients.dynamodb_accessor import DynamodbAccessor
import src.lib.format_converter as converter
import src.lib.interface as i_face
from src.trader_config import TraderConfig

LOGGER = Logger()


class CandleLoader:
    def __init__(self, config: TraderConfig, interface: OandaInterface, days: int) -> None:
        self.config: TraderConfig = config
        self.interface: OandaInterface = interface
        self.need_request: bool = self.__select_need_request(operation=config.operation)
        self.days: int = days

    def run(self) -> Dict[str, Optional[str]]:
        candles: pd.DataFrame
        if self.need_request is False:
            candles = pd.read_csv("tests/fixtures/sample_candles.csv")
        elif self.config.operation in ("backtest", "forward_test"):
            candles = self.interface.load_candles_by_days(
                days=self.days,
                granularity=self.config.get_entry_rules("granularity"),  # type: ignore
            )["candles"]
        elif self.config.operation == "live":
            candles = self.interface.load_specify_length_candles(
                length=70, granularity=self.config.get_entry_rules("granularity")  # type: ignore
            )["candles"]
        else:
            raise ValueError(f"trader_config.operation is invalid!: {self.config.operation}")

        FXBase.set_candles(candles)
        if self.need_request is False:
            return {"info": None}

        latest_candle: Dict[str, Any] = self.interface.call_oanda("current_price")
        LOGGER.info({"latest_candle": latest_candle})
        self.__update_latest_candle(latest_candle)
        return {"info": None}

    def __select_need_request(self, operation: str) -> bool:
        need_request: bool = True
        if operation in ("backtest", "forward_test"):
            need_request = i_face.ask_true_or_false(
                msg="[Trader] Which do you use ?  [1]: current_candles, [2]: static_candles :"
            )
        elif operation == "unittest":
            need_request = False
        return need_request

    def load_long_span_candles(self) -> None:
        long_span_candles: pd.DataFrame
        if self.need_request is False:
            long_span_candles = pd.read_csv("tests/fixtures/sample_candles_h4.csv")
        else:
            long_span_candles = self.__load_long_chart(granularity="D")

        long_span_candles["time"] = pd.to_datetime(long_span_candles["time"])
        long_span_candles.set_index("time", inplace=True)
        FXBase.set_long_span_candles(long_span_candles)
        # long_span_candles.resample('4H').ffill() # upsamplingしようとしたがいらなかった。

    def __load_long_chart(self, granularity: Optional[str] = None) -> pd.DataFrame:
        if granularity is None:
            granularity: str = self.config.get_entry_rules("granularity")  # type: ignore
        if self.days is None:
            raise RuntimeError("'days' must be specified, but is None.")

        return self.interface.load_candles_by_days(days=self.days, granularity=granularity)[
            "candles"
        ]

    def __update_latest_candle(self, latest_candle: Dict[str, Any]) -> None:
        """
        Update the latest candle,
        if either end of the new latest candle is beyond either end of old latest candle
        """
        candle_dict = FXBase.get_candles().iloc[-1].to_dict()
        FXBase.replace_latest_price("close", latest_candle["close"])
        if candle_dict["high"] < latest_candle["high"]:
            FXBase.replace_latest_price("high", latest_candle["high"])
        if candle_dict["low"] > latest_candle["low"]:
            FXBase.replace_latest_price("low", latest_candle["low"])
        LOGGER.info(
            {
                "[Client] Last_H4": candle_dict,
                "Current_M1": latest_candle,
                "[Client] New_H4": FXBase.get_candles().iloc[-1].to_dict(),
            }
        )

    def load_candles_by_duration_for_hist(
        self, instrument: str, start: datetime, end: datetime, granularity: str
    ) -> pd.DataFrame:
        """Prepare candles with specifying the range of datetime"""
        # 1. query from DynamoDB
        table_name: str = "{}_CANDLES".format(granularity)
        dynamo = DynamodbAccessor(instrument, table_name=table_name)
        candles: pd.DataFrame = dynamo.list_candles(
            # INFO: now preparing a little wide range of candles
            #   because 休日やらタイミングやらで、start, endを割り込むデータしか取得できないことがあるため
            # TODO: not using `3`days, it seems better to alter number of days according to granurality
            (start - timedelta(days=3)).isoformat(),
            (end + timedelta(days=1)).isoformat(),
        )
        if self.interface.accessable is False:
            print("[Manager] Skipped requesting candles from Oanda")
            return candles

        # 2. detect period of missing candles
        missing_start, missing_end = self.__detect_missings(candles, start, end)
        if missing_end <= missing_start:
            return candles

        # 3. complement missing candles using API
        missed_candles: pd.DataFrame = self.interface.load_candles_by_duration(
            missing_start, missing_end, granularity=granularity
        )["candles"]
        # INFO: If it is closed time between start and end, `missed_candles` is gonna be [].
        #   Then, skip insert and union.
        if len(missed_candles) > 0:
            dynamo.batch_insert(items=missed_candles.copy())
            candles = self.interface._OandaInterface__union_candles_distinct(
                candles, missed_candles
            )

        return candles

    def __detect_missings(
        self, candles: pd.DataFrame, required_start: datetime, required_end: datetime
    ) -> Tuple[datetime, datetime]:
        """
        Parameters
        ----------
        candles : pandas.DataFrame
            Columns :

        required_start : datetime
            Example : datetime(2020, 1, 2, 12, 34)
        required_end : datetime

        Returns
        -------
        Array: [missing_start, missing_end]
            missing_start : datetime
            missing_end : datetime
        """
        if len(candles) == 0:
            return required_start, required_end

        # 1. DBから取得したデータの先頭と末尾の日時を取得
        stocked_first: datetime = converter.str_to_datetime(candles.iloc[0]["time"])
        stocked_last: datetime = converter.str_to_datetime(candles.iloc[-1]["time"])
        ealiest: datetime = min(required_start, required_end, stocked_first, stocked_last)
        latest: datetime = max(required_start, required_end, stocked_first, stocked_last)

        # 2. どの期間のデータが不足しているのかを判別
        missing_start: datetime = required_start if ealiest == required_start else stocked_last
        missing_end: datetime = required_end if latest == required_end else stocked_first
        return missing_start, missing_end