DogsTailFarmer/martin-binance

View on GitHub
martin_binance/service/funds_rate_exporter.py

Summary

Maintainability
A
2 hrs
Test Coverage
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
####################################################################
# Config for prometheus_client
# See README.md for detail
####################################################################
__author__ = "Jerry Fedorenko"
__copyright__ = "Copyright © 2021 Jerry Fedorenko aka VM"
__license__ = "MIT"
__version__ = "3.0.0"
__maintainer__ = "Jerry Fedorenko"
__contact__ = 'https://github.com/DogsTailFarmer'

import os
import time
import sqlite3
import psutil
from requests import Session
import toml
import platform
from prometheus_client import start_http_server, Gauge

from martin_binance import Path, CONFIG_FILE, DB_FILE
from exchanges_wrapper import CONFIG_FILE as SRV_CONFIG_FILE

# region Import parameters

if not CONFIG_FILE.exists():
    if platform.system() == 'Darwin':
        user = (lambda: os.environ["USERNAME"] if "C:" in os.getcwd() else os.environ["USER"])()
        WORK_PATH = Path("Users", user, ".margin")
    else:
        WORK_PATH = Path().resolve()
    CONFIG_FILE = Path(WORK_PATH, "ms_cfg.toml")
    SRV_CONFIG_FILE = Path(WORK_PATH, "exch_srv_cfg.toml")
    DB_FILE = Path(WORK_PATH, "funds_rate.db")

config = toml.load(str(CONFIG_FILE)).get('Exporter')
accounts = toml.load(str(SRV_CONFIG_FILE)).get('accounts')

names = {acc['name']: acc['exchange'] for acc in accounts}
# external port for prometheus
PORT = config.get('port')

# sec delay for .db polling
SLEEP_TIME_S = config.get('sleep_time_s')

# Server name
VPS_NAME = config.get('vps_name')

# CoinMarketCap
URL = config.get('url')
API = config.get('api')
request_delay = 60 / config.get('rate_limit')

#  endregion

CURRENCY_RATE_LAST_TIME = int(time.time())

# region Metric declare
STATUS_ALARM = Gauge("margin_alarm", "1 when not order", ['exchange', 'pair', 'vps_name'])
REQUEST_DELAY_G = Gauge("request_delay_g", "request delay in sec", ['vps_name'])

SUM_F_PROFIT = Gauge("margin_f_profit", "first profit", ['exchange', 'pair', 'vps_name'])
SUM_S_PROFIT = Gauge("margin_s_profit", "second profit", ['exchange', 'pair', 'vps_name'])
LAST_RATE = Gauge("margin_last_rate", "pair last rate", ['exchange', 'pair', 'vps_name'])
LAST_RATE_USD = Gauge("margin_last_rate_usd", "last rate second coin to USD", ['exchange', 'pair', 'vps_name'])
SUM_PROFIT = Gauge("margin_sum_profit", "sum profit on last rate", ['exchange', 'pair', 'vps_name'])
SUM_PROFIT_USD = Gauge("margin_sum_profit_usd", "sum profit on last rate on USD", ['exchange', 'pair', 'vps_name'])

CYCLE_COUNT = Gauge("margin_cycle_count", "cycle count", ['exchange', 'pair', 'vps_name'])
BUY_COUNT = Gauge("margin_buy_count", "cycle buy count", ['exchange', 'pair', 'vps_name'])
SELL_COUNT = Gauge("margin_sell_count", "cycle sell count", ['exchange', 'pair', 'vps_name'])
BUY_TIME = Gauge("margin_buy_time", "cycle buy time", ['exchange', 'pair', 'vps_name'])
SELL_TIME = Gauge("margin_sell_time", "cycle sell time", ['exchange', 'pair', 'vps_name'])

BUY_INTEREST = Gauge("margin_buy_interest", "sum buy interest", ['exchange', 'pair', 'vps_name'])
SELL_INTEREST = Gauge("margin_sell_interest", "sum sell interest", ['exchange', 'pair', 'vps_name'])

F_BALANCE = Gauge("margin_f_balance", "first balance amount", ['exchange', 'pair', 'vps_name'])
S_BALANCE = Gauge("margin_s_balance", "second balance amount", ['exchange', 'pair', 'vps_name'])
TOTAL_BALANCE = Gauge("margin_balance", "total balance amount by last rate", ['exchange', 'pair', 'vps_name'])

BALANCE_USD = Gauge("margin_balance_usd", "balance amount in USD", ['name', 'exchange', 'currency', 'vps_name'])

# VPS control
VPS_CPU = Gauge("margin_vps_cpu", "average cpu load", ['vps_name'])
VPS_MEMORY = Gauge("margin_vps_memory", "average memory use in %", ['vps_name'])

# Cycle parameters
CYCLE_BUY = Gauge("margin_cycle_buy", "cycle buy", ['exchange', 'pair', 'vps_name'])
OVER_PRICE = Gauge("margin_over_price", "over price", ['exchange', 'pair', 'vps_name'])
F_DEPO = Gauge("margin_f_depo", "first depo", ['exchange', 'pair', 'vps_name'])
S_DEPO = Gauge("margin_s_depo", "second depo", ['exchange', 'pair', 'vps_name'])

''' Cycle parameters for future use
PRICE_SHIFT = Gauge("margin_price_shift", "price shift", ['exchange', 'pair'])
PROFIT = Gauge("margin_profit", "profit", ['exchange', 'pair'])
ORDER_Q = Gauge("margin_order_q", "order_q", ['exchange', 'pair'])
MARTIN = Gauge("margin_martin", "martin", ['exchange', 'pair'])
LINEAR_GRID_K = Gauge("margin_linear_grid_k", "linear_grid_k", ['exchange', 'pair'])
ADAPTIVE_TRADE_CONDITION = Gauge("margin_adaptive_trade_condition", "adaptive_trade_condition", ['exchange', 'pair'])
KB = Gauge("margin_kb", "bollinger band k bottom", ['exchange', 'pair'])
KT = Gauge("margin_kt", "bollinger band k top", ['exchange', 'pair'])
'''
# endregion


def get_rate(_currency_rate) -> {}:
    global request_delay
    replace = {
        'UST': 'USDT',
        'IOT': 'MIOTA',
        'LUNA': 'LUNC',
        'LUNA2': 'LUNA',
        'TESTUSDT': 'USDT',
        'TESTBTC': 'BTC'
    }
    headers = {'Accepts': 'application/json', 'X-CMC_PRO_API_KEY': API}
    session = Session()
    session.headers.update(headers)

    for currency in _currency_rate:
        _currency = replace.get(currency, currency)
        price = -1
        parameters = {'amount': 1, 'symbol': 'USD', 'convert': _currency}
        try:
            response = session.get(URL, params=parameters)
        except Exception as er:
            print(er)
        else:
            if response.status_code == 429:
                time.sleep(61)
                request_delay *= 1.5
                try:
                    response = session.get(URL, params=parameters)
                except Exception as er:
                    print(er)
            if response.status_code == 200:
                data = response.json()
                price = data['data'][0]['quote'][_currency]['price'] or -1
        _currency_rate[currency] = price
        time.sleep(request_delay)
    return _currency_rate


def db_handler(sql_conn, _currency_rate, currency_rate_last_time):
    global request_delay
    cursor = sql_conn.cursor()
    # Aggregate score for pair on exchange
    cursor.execute('SELECT tex.name, tf.id_exchange,\
                    tf.f_currency, tf.s_currency,\
                    count(*) as cycle_count,\
                    sum(f_profit) as sum_f_profit,\
                    sum(s_profit) as sum_s_profit\
                    FROM t_funds as tf LEFT JOIN t_exchange tex USING(id_exchange)\
                    GROUP BY tex.name, tf.id_exchange, tf.f_currency, tf.s_currency')
    records = cursor.fetchall()
    # Get assets
    cursor.execute('SELECT tex.id_exchange, tex.name, ta.currency, ta.value\
                    FROM t_asset as ta LEFT JOIN t_exchange tex USING(id_exchange)\
                    WHERE ta.value > 0')
    assets = cursor.fetchall()
    # Create dict of used currencies
    for asset in assets:
        _currency_rate.setdefault(asset[2])
    for row in records:
        _currency_rate.setdefault(str(row[2]))
        _currency_rate.setdefault(str(row[3]))
    # Get currency rate for all currency from CoinMarketCap in relation to USD
    time_for_refresh = time.time() - currency_rate_last_time > 86400
    if None in _currency_rate.values() or time_for_refresh:
        get_rate(_currency_rate)
        currency_rate_last_time = int(time.time())
        REQUEST_DELAY_G.labels(VPS_NAME).set(request_delay)
        if request_delay > 60:
            request_delay = 60 / config.get('rate_limit')
    #
    F_BALANCE.clear()
    S_BALANCE.clear()
    TOTAL_BALANCE.clear()
    BALANCE_USD.clear()
    CYCLE_BUY.clear()
    F_DEPO.clear()
    S_DEPO.clear()
    OVER_PRICE.clear()
    #
    for row in records:
        # print(f"row: {row}")
        exchange = str(row[0])
        id_exchange = int(row[1])
        f_currency = str(row[2])
        s_currency = str(row[3])
        pair = f"{f_currency}/{s_currency}"
        cycle_count = int(row[4])
        CYCLE_COUNT.labels(exchange, pair, VPS_NAME).set(cycle_count)
        sum_f_profit = float(row[5])
        SUM_F_PROFIT.labels(exchange, pair, VPS_NAME).set(sum_f_profit)
        sum_s_profit = float(row[6])
        SUM_S_PROFIT.labels(exchange, pair, VPS_NAME).set(sum_s_profit)
        # Alarm
        cursor.execute('SELECT order_buy, order_sell\
                        FROM t_orders\
                        WHERE id_exchange=:id_exchange\
                        AND f_currency=:f_currency\
                        AND s_currency=:s_currency',
                       {'id_exchange': id_exchange, 'f_currency': f_currency, 's_currency': s_currency})
        status_alarm = cursor.fetchone()
        alarm = 0
        if status_alarm:
            order_buy = int(status_alarm[0])
            order_sell = int(status_alarm[1])
            alarm = 0 if order_buy and order_sell else 1
        STATUS_ALARM.labels(exchange, pair, VPS_NAME).set(alarm)
        # Last rate
        cursor.execute('SELECT rate\
                        FROM t_funds\
                        WHERE id_exchange=:id_exchange\
                        AND f_currency=:f_currency\
                        AND s_currency=:s_currency\
                        ORDER BY id DESC LIMIT 1',
                       {'id_exchange': id_exchange, 'f_currency': f_currency, 's_currency': s_currency})
        if last_rate_row := cursor.fetchone():
            last_rate = float(last_rate_row[0])
            LAST_RATE.labels(exchange, pair, VPS_NAME).set(last_rate)
        else:
            last_rate = 0.0
        # Sum profit
        sum_profit = sum_f_profit * last_rate + sum_s_profit
        SUM_PROFIT.labels(exchange, pair, VPS_NAME).set(sum_profit)
        # Convert sum profit to USD by last rate
        sum_profit_usd = -1
        if _currency_rate.get(s_currency):
            try:
                sum_profit_usd = sum_profit / _currency_rate[s_currency]
                LAST_RATE_USD.labels(exchange, pair, VPS_NAME).set(_currency_rate[s_currency])
            except ZeroDivisionError:
                sum_profit_usd = -1
        SUM_PROFIT_USD.labels(exchange, pair, VPS_NAME).set(sum_profit_usd)
        # Sum interest income and cycle count, calculated by each buy and sell cycle
        cursor.execute('SELECT count(*), sum(100 * s_profit / s_depo), sum(cycle_time)\
                        FROM t_funds\
                        WHERE id_exchange=:id_exchange\
                        AND f_currency=:f_currency\
                        AND s_currency=:s_currency\
                        AND cycle_buy = 1',
                       {'id_exchange': id_exchange, 'f_currency': f_currency, 's_currency': s_currency})
        cycle_buy_row = cursor.fetchone()

        cycle_buy_count = int(cycle_buy_row[0]) if cycle_buy_row[0] else 0
        cycle_buy_interest = float(cycle_buy_row[1]) if cycle_buy_row[1] else 0.0
        cycle_buy_time = float(cycle_buy_row[2]) if cycle_buy_row[2] else 0.0

        cursor.execute('SELECT count(*), sum(100 * f_profit / f_depo), sum(cycle_time)\
                        FROM t_funds\
                        WHERE id_exchange=:id_exchange\
                        AND f_currency=:f_currency\
                        AND s_currency=:s_currency\
                        AND cycle_buy = 0',
                       {'id_exchange': id_exchange, 'f_currency': f_currency, 's_currency': s_currency})
        cycle_sell_row = cursor.fetchone()

        cycle_sell_count = int(cycle_sell_row[0]) if cycle_sell_row[0] else 0
        cycle_sell_interest = float(cycle_sell_row[1]) if cycle_sell_row[1] else 0.0
        cycle_sell_time = float(cycle_sell_row[2]) if cycle_sell_row[2] else 0.0

        BUY_COUNT.labels(exchange, pair, VPS_NAME).set(cycle_buy_count)
        BUY_TIME.labels(exchange, pair, VPS_NAME).set(cycle_buy_time)
        BUY_INTEREST.labels(exchange, pair, VPS_NAME).set(cycle_buy_interest)

        SELL_COUNT.labels(exchange, pair, VPS_NAME).set(cycle_sell_count)
        SELL_TIME.labels(exchange, pair, VPS_NAME).set(cycle_sell_time)
        SELL_INTEREST.labels(exchange, pair, VPS_NAME).set(cycle_sell_interest)

        # Balance amount
        cursor.execute('SELECT f_balance, s_balance, cycle_buy, f_depo, s_depo, over_price\
                        FROM t_funds\
                        WHERE id_exchange=:id_exchange\
                        AND f_currency=:f_currency\
                        AND s_currency=:s_currency\
                        ORDER BY id DESC LIMIT 1',
                       {'id_exchange': id_exchange, 'f_currency': f_currency, 's_currency': s_currency})
        if balance_row := cursor.fetchone():
            f_balance = balance_row[0]
            s_balance = balance_row[1]
            balance = f_balance * last_rate + s_balance
            F_BALANCE.labels(exchange, pair, VPS_NAME).set(f_balance)
            S_BALANCE.labels(exchange, pair, VPS_NAME).set(s_balance)
            TOTAL_BALANCE.labels(exchange, pair, VPS_NAME).set(balance)
            # Cycle parameters
            CYCLE_BUY.labels(exchange, pair, VPS_NAME).set(balance_row[2])
            F_DEPO.labels(exchange, pair, VPS_NAME).set(balance_row[3])
            S_DEPO.labels(exchange, pair, VPS_NAME).set(balance_row[4])
            OVER_PRICE.labels(exchange, pair, VPS_NAME).set(balance_row[5])

    for asset in assets:
        if _rate := _currency_rate.get(asset[2]):
            try:
                usd_amount = asset[3] / _rate
            except ZeroDivisionError:
                usd_amount = -1
            if usd_amount >= 1.0:
                BALANCE_USD.labels(names.get(asset[1]), asset[1], asset[2], VPS_NAME).set(usd_amount)

    cursor.close()
    return currency_rate_last_time


if __name__ == '__main__':
    # Start up the server to expose the metrics.
    currency_rate = {}
    start_http_server(PORT)
    sqlite_connection = None
    try:
        sqlite_connection = sqlite3.connect(DB_FILE, check_same_thread=False, timeout=10)
    except sqlite3.Error as error:
        print("SQLite error:", error)
    while True:
        try:
            CURRENCY_RATE_LAST_TIME = db_handler(sqlite_connection, currency_rate, CURRENCY_RATE_LAST_TIME)
        except sqlite3.Error as error:
            print("DB operational error:", error)
        VPS_CPU.labels(VPS_NAME).set(100 * psutil.getloadavg()[0] / psutil.cpu_count())
        #
        memory = psutil.virtual_memory()
        swap = psutil.swap_memory()
        total_used_percent = 100 * float(swap.used + memory.used) / (swap.total + memory.total)
        VPS_MEMORY.labels(VPS_NAME).set(total_used_percent)
        #
        time.sleep(SLEEP_TIME_S)