DogsTailFarmer/martin-binance

View on GitHub
martin_binance/client.py

Summary

Maintainability
A
0 mins
Test Coverage
"""
gRPC async client for exchanges-wrapper
"""
__author__ = "Jerry Fedorenko"
__copyright__ = "Copyright © 2021 Jerry Fedorenko aka VM"
__license__ = "MIT"
__version__ = "3.0.6"
__maintainer__ = "Jerry Fedorenko"
__contact__ = "https://github.com/DogsTailFarmer"

import asyncio
import random
import logging

# noinspection PyPackageRequirements
import grpclib.exceptions
import shortuuid

from exchanges_wrapper import martin as mr, Channel, Status, GRPCError

# Module logger for the gRPC client. A dedicated stream handler (stderr by
# default) is attached so that WARNING and more severe records are always
# printed with a timestamp, whatever the application configures elsewhere.
logger = logging.getLogger('logger.client')
stream_handler = logging.StreamHandler()
stream_handler.setLevel(logging.WARNING)
stream_handler.setFormatter(
    logging.Formatter("[%(asctime)s: %(levelname)s] %(message)s")
)
logger.addHandler(stream_handler)


class Trade:
    """
    Async gRPC client session for one trading pair of one account.

    The instance opens a channel to the gRPC server on 127.0.0.1:50051,
    registers itself with ``open_client_connection`` and afterwards stamps
    every outgoing request with the received ``client_id`` and with its own
    ``trade_id``.
    """

    def __init__(self, account_name, rate_limiter, symbol):
        """
        Store the connection parameters; no network activity happens here.

        :param account_name: account name sent in the open-connection request
        :param rate_limiter: rate limiter setting sent in the same request
        :param symbol: trading pair symbol sent in the same request
        """
        self.channel = None
        self.stub = None
        self.account_name = account_name
        self.rate_limiter = rate_limiter
        self.symbol = symbol
        # Server response of open_client_connection; None while not connected
        self.client = None
        # Guard flag: True while a get_client() call is in progress
        self.wait_connection = False
        # Unique id of this Trade instance, sent with every request
        self.trade_id = shortuuid.uuid()

    async def get_client(self):
        """
        (Re)connect to the gRPC server, retrying until a session is opened.

        After each failed attempt the channel is closed and the coroutine
        sleeps for a random 5..30 seconds before trying again.

        :return: True when a client session was obtained; False when another
                 get_client() call is already in progress or when the
                 connection attempt was cancelled.
        """
        if self.wait_connection:
            return False
        self.wait_connection = True
        try:
            # On reconnect the channel of the previous session would otherwise
            # be overwritten below without ever being closed.
            if self.channel is not None:
                self.channel.close()
            while True:
                self.channel = Channel('127.0.0.1', 50051)
                self.stub = mr.MartinStub(self.channel)
                try:
                    client = await self.connect()
                except UserWarning as ex:
                    logger.warning(ex)
                    self.channel.close()
                    await asyncio.sleep(random.randint(5, 30))
                    continue
                if client is None:
                    # connect() swallowed a task cancellation: there is no
                    # session, so do not report success.
                    return False
                self.client = client
                return True
        finally:
            # Always release the guard, otherwise an exception escaping the
            # loop (e.g. cancellation during the sleep) would make every
            # later call return False forever.
            self.wait_connection = False

    async def connect(self):
        """
        Register this instance on the server through the current stub.

        :return: the server response (it carries ``client_id``), or None when
                 the call was cancelled
        :raises UserWarning: the attempt failed and may be retried
        :raises SystemExit: the server answered with FAILED_PRECONDITION
        """
        try:
            _client = await self.stub.open_client_connection(
                mr.OpenClientConnectionRequest(
                    trade_id=self.trade_id,
                    account_name=self.account_name,
                    rate_limiter=self.rate_limiter,
                    symbol=self.symbol
                )
            )
        except asyncio.CancelledError:
            return None  # Task cancellation should not be logged as an error
        except ConnectionRefusedError as ex:
            raise UserWarning(f"{ex}, reconnect...") from None
        except GRPCError as ex:
            status_code = ex.status
            logger.warning("Exception on register client: %s, %s", status_code.name, ex.message)
            if status_code == Status.FAILED_PRECONDITION:
                raise SystemExit(1) from ex
            # Give the caller a non-empty message to log before it retries
            raise UserWarning(f"Register client failed: {status_code.name}, reconnect...") from ex
        else:
            logger.info("gRPC session started for client_id: %s, trade_id: %s", _client.client_id, self.trade_id)
            return _client

    async def send_request(self, _request, _request_type, **kwargs):
        """
        Send one unary request within the active client session.

        ``client_id`` and ``trade_id`` are added to ``kwargs`` before the
        request message is built.

        :param _request: awaitable callable that performs the call
        :param _request_type: callable building the request from ``kwargs``
        :param kwargs: fields of the request message
        :return: the response, or None when the call was cancelled or failed
                 with an unexpected exception (which is logged)
        :raises UserWarning: no active session, or the connection is lost;
                 on status UNAVAILABLE the session is also dropped
        :raises GRPCError: any other gRPC status is re-raised unchanged
        """
        if not self.client:
            raise UserWarning("Send gRPC request failed, not active client session")
        kwargs['client_id'] = self.client.client_id
        kwargs['trade_id'] = self.trade_id
        try:
            res = await _request(_request_type(**kwargs))
        except asyncio.CancelledError:
            pass  # Task cancellation should not be logged as an error
        except grpclib.exceptions.StreamTerminatedError:
            raise UserWarning("Have not connection to gRPC server") from None
        except ConnectionRefusedError:
            raise UserWarning("Connection to gRPC server broken") from None
        except GRPCError as ex:
            status_code = ex.status
            logger.debug("Send request %s: %s, %s", _request, status_code.name, ex.message)
            if status_code == Status.UNAVAILABLE:
                # Forget the session so that later calls fail fast until
                # get_client() has opened a new one
                self.client = None
                raise UserWarning("Wait connection to gRPC server") from None
            raise
        except Exception as ex:
            # Best effort: unexpected errors are reported, the result is None
            logger.error("Exception on send request %s", ex)
        else:
            return res

    async def for_request(self, _request, _request_type, **kwargs):
        """
        Iterate over a server stream within the active client session.

        ``client_id`` and ``trade_id`` are added to ``kwargs`` before the
        request message is built. The generator simply ends on cancellation,
        on stream termination and on unexpected exceptions.

        :param _request: callable returning an async iterable of responses
        :param _request_type: callable building the request from ``kwargs``
        :param kwargs: fields of the request message
        :raises UserWarning: no active session
        :raises GRPCError: gRPC errors are logged and re-raised
        """
        if not self.client:
            raise UserWarning("Start gRPC request loop failed, not active client session")
        kwargs['client_id'] = self.client.client_id
        kwargs['trade_id'] = self.trade_id
        try:
            async for res in _request(_request_type(**kwargs)):
                yield res
        except asyncio.CancelledError:
            pass  # Task cancellation should not be logged as an error
        except grpclib.exceptions.StreamTerminatedError:
            pass  # handling in send_request()
        except GRPCError as ex:
            status_code = ex.status
            logger.warning("Exception on WSS loop: %s, %s", status_code.name, ex.message)
            raise
        except Exception as ex:
            logger.debug("for_request: %s", ex)