xeroc/python-graphenelib

View on GitHub
grapheneapi/api.py

Summary

Maintainability
A
3 hrs
Test Coverage
# -*- coding: utf-8 -*-
import logging
from collections import Counter
from itertools import cycle
from time import sleep
from .exceptions import RPCError, NumRetriesReached

from .websocket import Websocket
from .http import Http

# Module-level logger, named after this module, used for connection and
# retry diagnostics.
log = logging.getLogger(__name__)


class Api:
    """RPC client with endpoint failover that proxies calls to a connection.

    The instance keeps a set of endpoint URLs, opens a ``Websocket`` or
    ``Http`` connection (chosen by the URL prefix) to the current one and
    forwards every unknown attribute access as an RPC call to that
    connection. When a call or a connection attempt fails, the failure is
    counted against the current URL and another endpoint is tried.

    :param urls: a single endpoint URL, or a list/tuple of URLs
    :param user: optional user name, forwarded to the connection class
    :param password: optional password, forwarded to the connection class
    :param bool connect: connect right away and fetch the chain properties
    :param int num_retries: (keyword only) failures tolerated per endpoint
        before :meth:`find_next` stops offering it; a negative value keeps
        cycling through the URLs indefinitely (default: 1)
    :param kwargs: remaining keyword arguments are stored and forwarded to
        the connection class
    """

    def __init__(self, urls, user=None, password=None, connect=True, **kwargs):

        # Some internal variables
        self._connections = dict()
        self._url_counter = Counter()

        # How often do we accept retries? Popped first so that the option
        # is not forwarded to the connection classes.
        self.num_retries = kwargs.pop("num_retries", 1)

        # Let's store user and password in kwargs as well
        self.user = user
        self.password = password
        kwargs.update(dict(user=user, password=password))
        self._kwargs = kwargs

        # Accept a single URL as well as a list or tuple of URLs
        if isinstance(urls, (list, tuple)):
            urls = list(urls)
        else:
            urls = [urls]

        for url in urls:
            self._url_counter[url] = 0

        self.url = urls[0]
        # URL the cached connection object was built for (None: none yet)
        self._active_url = None
        self._active_connection = None

        # Let's also be able to deal with infinite connection
        self.urls = cycle(urls)
        self._cnt_retries = 0
        self._network = None

        # Connect!
        if connect:
            self.connect()
            self._network = self.get_network()

    # Get chain parameters
    @property
    def chain_params(self):
        """Chain properties of the connected network, fetched on first use."""
        if self._network is None:
            self._network = self.get_network()
        return self._network

    def get_network(self):
        """Return the result of the ``get_chain_properties`` RPC call."""
        return self.get_chain_properties()

    def updated_connection(self):
        """Build a new connection object for the current ``self.url``.

        :raises ValueError: if the URL starts with neither ``ws`` nor
            ``http``
        """
        if self.url[:2] == "ws":
            return Websocket(self.url, **self._kwargs)
        elif self.url[:4] == "http":
            return Http(self.url, **self._kwargs)
        else:
            raise ValueError("Only support http(s) and ws(s) connections!")

    @property
    def connection(self):
        """Connection object for the current URL.

        A new object is created via :meth:`updated_connection` whenever
        ``self.url`` differs from the URL the cached one was built for.
        """
        if self._active_url != self.url:
            log.debug("Updating connection from %s to %s", self._active_url, self.url)
            self._active_connection = self.updated_connection()
            self._active_url = self.url
        return self._active_connection

    def connect(self):
        """Connect to the current URL, failing over to another one on error.

        :raises NumRetriesReached: (via :meth:`find_next`) if no endpoint
            is left to try
        """
        try:
            self.connection.connect()
        except Exception as e:
            log.warning(str(e))
            self.error_url()
            # next() reconnects through connect(), which registers the APIs
            # itself once a connection succeeds -- do not register twice.
            self.next()
            return
        self.register_apis()

    def find_next(self):
        """Find the next url in the list

        With a negative ``num_retries`` the URLs are cycled through
        indefinitely, sleeping increasingly long (at most 10 seconds)
        between attempts. Otherwise the first URL whose failure counter
        does not exceed ``num_retries`` is returned, skipping the current
        URL unless it is the only one known.

        :raises NumRetriesReached: if no URL qualifies
        """
        # num_retries may have been provided as a string; convert it once
        # and use the integer consistently below.
        num_retries = int(self.num_retries)

        if num_retries < 0:  # pragma: no cover
            self._cnt_retries += 1
            sleeptime = (self._cnt_retries - 1) * 2 if self._cnt_retries < 10 else 10
            if sleeptime:
                log.warning(
                    "Lost connection to node during rpcexec(): %s (%d/%d) "
                    "Retrying in %d seconds",
                    self.url,
                    self._cnt_retries,
                    num_retries,
                    sleeptime,
                )
                sleep(sleeptime)
            return next(self.urls)

        only_one_url = len(self._url_counter) == 1
        for url, failures in self._url_counter.items():
            # the counter for this host/endpoint must not exceed num_retries
            if failures > num_retries:
                continue
            # let's not retry with the same URL *if* we have others
            # available
            if url == self.url and not only_one_url:
                continue
            return url
        raise NumRetriesReached

    def reset_counter(self):
        """reset the failed connection counters"""
        self._cnt_retries = 0
        for i in self._url_counter:
            self._url_counter[i] = 0

    def error_url(self):  # pragma: no cover
        """Count one more failure for the current URL."""
        # Counter yields 0 for unknown keys, so no membership test is needed
        self._url_counter[self.url] += 1

    def next(self):
        """Disconnect, switch to the URL from :meth:`find_next`, reconnect."""
        self.connection.disconnect()
        self.url = self.find_next()
        self.connect()

    def post_process_exception(self, exception):
        """Hook for errors returned by the backend; re-raises by default."""
        raise exception

    @property
    def api_id(self):
        """This allows to list api_ids, if they have been registered through
        register_apis() -- LEGACY

        In previous API version, one would connect and register to APIs
        like this

        .. code-block:: python

            self.api_id["database"] = self.database(api_id=1)
            self.api_id["history"] = self.history(api_id=1)
            self.api_id["network_broadcast"] = self.network_broadcast(
                api_id=1)

        """
        return self.connection.api_id

    def register_apis(self):  # pragma: no cover
        """This method is called right after connection and has previously
        been used to register to different APIs within the backend that are
        considered default. The requirement to register to APIs has been
        removed in some systems.
        """
        pass

    def __getattr__(self, name):
        """Proxies RPC calls to actual Websocket or Http instance.

        Connection-related errors are caught here and handled by switching
        to another endpoint and repeating the call.

        :raises AttributeError: for dunder names, so that protocol lookups
            (``hasattr``, ``copy``, ``pickle`` ...) are not mistaken for
            RPC methods
        """
        if name.startswith("__") and name.endswith("__"):
            raise AttributeError(name)

        def func(*args, **kwargs):
            # Stays None if post_process_exception() is overridden so that
            # it does not raise.
            result = None
            while True:
                try:
                    # RPC method called on actual Websocket or Http class
                    rpc_method = self.connection.__getattr__(name)
                    result = rpc_method(*args, **kwargs)
                    self.reset_counter()
                    break
                except KeyboardInterrupt:  # pragma: no cover
                    raise
                except RPCError as e:  # pragma: no cover
                    # When the backend actually returns an error
                    self.post_process_exception(e)
                    # the above line should raise. Let's be sure to at least
                    # break
                    break  # pragma: no cover
                except IOError:  # pragma: no cover
                    import traceback

                    log.debug(traceback.format_exc())
                    log.warning("Connection was closed remotely.")
                    log.warning("Reconnecting ...")
                    self.error_url()
                    self.next()
                except Exception as e:  # pragma: no cover
                    # When something fails talking to the backend
                    import traceback

                    log.debug(traceback.format_exc())
                    log.warning(str(e))
                    log.warning("Reconnecting ...")
                    self.error_url()
                    self.next()

            return result

        return func