grapheneapi/rpc.py
# -*- coding: utf-8 -*-
import json
import time
import logging
import requests
import urllib
from .exceptions import RPCError, NumRetriesReached
log = logging.getLogger(__name__)
class Rpc:
    """Base class for calling API methods synchronously, without callbacks.

    This class only builds JSON-RPC queries and parses responses; the
    transport is supplied by overriding :meth:`rpcexec` (the base
    implementation always raises).

    :param str url: A single REST endpoint URL
    :param int num_retries: Number of times to retry connecting to a node
        on disconnect, -1 for indefinitely (stored here; interpreting it is
        left to whatever implements :meth:`rpcexec`). Defaults to 1.
    :param str user: Optional user name, stored as ``self.user``
    :param str password: Optional password, stored as ``self.password``
    :param str proxy: Proxy URL (e.g. socks5://localhost:9050),
        None by default.

    Usage:

    .. code-block:: python

        ws = GrapheneHTTPRPC("https://api.node.com")
        print(ws.get_account_count())
    """

    def __init__(self, url, **kwargs):
        self.api_id = {}
        self._request_id = 0
        # setup_proxy() pops the proxy related options out of kwargs.
        self.setup_proxy(kwargs)
        self.num_retries = kwargs.get("num_retries", 1)
        self.user = kwargs.get("user")
        self.password = kwargs.get("password")
        self.url = url

    def setup_proxy(self, options):
        """Read the proxy settings from ``options`` and store them on self.

        Either a single ``proxy`` URL or the individual ``proxy_host``,
        ``proxy_port``, ``proxy_type``, ``proxy_user`` and ``proxy_pass``
        keys are consumed (popped) from ``options``.

        :param dict options: Keyword options; modified in place
        """
        proxy_url = options.pop("proxy", None)
        if proxy_url:  # pragma: no cover
            # Import the submodule explicitly: a bare ``import urllib`` does
            # not guarantee that ``urllib.parse`` has been loaded.
            from urllib.parse import urlparse

            url = urlparse(proxy_url)
            self.proxy_host = url.hostname
            self.proxy_port = url.port  # None when the URL carries no port
            self.proxy_type = url.scheme.lower()
            self.proxy_user = url.username
            self.proxy_pass = url.password
            # A trailing "h" on the scheme (e.g. socks5h) enables remote DNS
            # resolution and is stripped from the stored proxy type.
            self.proxy_rdns = self.proxy_type.endswith("h")
            if self.proxy_rdns:
                self.proxy_type = self.proxy_type[:-1]
        else:
            # Defaults (tweakable)
            self.proxy_host = options.pop("proxy_host", None)
            self.proxy_port = options.pop("proxy_port", 80)
            self.proxy_type = options.pop("proxy_type", "http")
            self.proxy_user = options.pop("proxy_user", None)
            self.proxy_pass = options.pop("proxy_pass", None)
            self.proxy_rdns = False
        if self.proxy_host:
            # %s instead of %d: the port may be None or a string.
            log.info(
                "Using proxy %s:%s %s",
                self.proxy_host,
                self.proxy_port,
                self.proxy_type,
            )

    def get_proxy_url(self):  # pragma: no cover
        """Return the configured proxy as a URL string.

        :returns: ``type://[user[:pass]@]host[:port]`` or ``None`` when no
            proxy host is configured
        """
        if not self.proxy_host:
            return None
        auth = ""
        if self.proxy_user:
            if self.proxy_pass is None:
                auth = "%s@" % self.proxy_user
            else:
                auth = "%s:%s@" % (self.proxy_user, self.proxy_pass)
        location = "%s" % self.proxy_host
        # The port is optional (a proxy URL without port parses to None).
        if self.proxy_port is not None:
            location += ":%s" % self.proxy_port
        return "%s://%s%s" % (self.proxy_type, auth, location)

    def get_request_id(self):
        """Increment the request counter and return its new value."""
        self._request_id += 1
        return self._request_id

    def connect(self):
        """Do nothing; placeholder that subclasses may override."""
        pass

    def disconnect(self):
        """Do nothing; placeholder that subclasses may override."""
        pass

    @staticmethod
    def _error_text(error):
        """Build a human readable message from a response's ``error`` member."""
        if not isinstance(error, dict):
            return str(error)
        if "detail" in error:
            return error["detail"]
        message = error.get("message")
        if message == "Execution error":
            try:
                frame = error["data"]["stack"][0]
                # The template uses ${name} placeholders; turn them into
                # str.format fields and fill them from the frame's data.
                template = frame["format"].replace("${", "{")
                return template.format(**frame["data"])
            except (AttributeError, IndexError, KeyError, TypeError, ValueError):
                # Malformed stack information: fall back to the message.
                return message
        return message if message is not None else str(error)

    def parse_response(self, query, log_on_debug=True):
        """Extract the ``result`` member from a JSON-RPC response.

        :param query: The response, either an already decoded dict or a
            JSON document
        :param bool log_on_debug: Set to False to disable logging here even
            if debug logging is configured
        :returns: The value stored under ``"result"``
        :raises ValueError: If ``query`` is not valid JSON
        :raises RPCError: If the response contains an ``"error"`` member
        """
        if isinstance(query, dict):
            ret = query
        else:
            try:
                ret = json.loads(query, strict=False)
            except ValueError as err:  # pragma: no cover pragma: no branch
                raise ValueError(
                    "Client returned invalid format. Expected JSON!"
                ) from err
        # Only serialize when the record will actually be emitted, and log
        # the decoded payload so that bytes input cannot break json.dumps.
        if log_on_debug and log.isEnabledFor(logging.DEBUG):
            log.debug(json.dumps(ret))
        if "error" in ret:  # pragma: no cover
            raise RPCError(self._error_text(ret["error"]))
        return ret["result"]

    def rpcexec(self, *args, **kwargs):
        """Execute a query; must be provided by a transport subclass.

        :raises NotImplementedError: Always, in this base class
        """
        raise NotImplementedError(
            "Do not use RPC class directly, but Http or Websocket"
        )

    def __getattr__(self, name):
        """Map all methods to RPC calls and pass through the arguments"""
        # Special (dunder) names are protocol probes from the interpreter and
        # the standard library (copy, pickle, ...), never remote API methods.
        if name.startswith("__") and name.endswith("__"):
            raise AttributeError(name)

        def method(*args, **kwargs):
            # Specify the api to talk to
            if "api_id" in kwargs:  # pragma: no cover
                api_id = kwargs["api_id"]
            elif "api" in kwargs:  # pragma: no cover
                api = kwargs["api"]
                # Use the registered id when there is a truthy one,
                # otherwise pass the api name through unchanged.
                api_id = self.api_id.get(api) or api
            else:
                api_id = 0

            # let's be able to define the num_retries per query
            # NOTE: the override is stored on the instance, so it also
            # applies to later calls that do not pass num_retries.
            self.num_retries = kwargs.get("num_retries", self.num_retries)

            query = {
                "method": "call",
                "params": [api_id, name, list(args)],
                "jsonrpc": "2.0",
                "id": self.get_request_id(),
            }
            debug = log.isEnabledFor(logging.DEBUG)
            if debug:
                log.debug(json.dumps(query))
            response = self.rpcexec(query)
            if debug:
                log.debug(json.dumps(response))
            return self.parse_response(response)

        return method