beem/utils.py
# -*- coding: utf-8 -*-
import re
import json
import time as timenow
import math
from datetime import datetime, tzinfo, timedelta, date, time
import pytz
import difflib
from ruamel.yaml import YAML
import difflib
import secrets
import string
from beemgraphenebase.account import PasswordKey
import ast
import os
# strftime/strptime layout shared by the time helpers in this module:
# date and time separated by "T", second resolution, no UTC-offset field.
timeFormat = "%Y-%m-%dT%H:%M:%S"
# https://github.com/matiasb/python-unidiff/blob/master/unidiff/constants.py#L37
# @@ (source offset, length) (target offset, length) @@ (section header)
# Five capture groups per hunk header: source offset, optional source length,
# target offset, optional target length, and the trailing section header text.
# re.MULTILINE makes ^ and $ anchor at every line, so headers anywhere inside
# a multi-line patch text are matched.
RE_HUNK_HEADER = re.compile(
r"^@@ -(\d+)(?:,(\d+))? \+(\d+)(?:,(\d+))?\ @@[ ]?(.*)$", flags=re.MULTILINE
)
def formatTime(t):
    """Format a point in time as the compact stamp used in permlinks.

    The layout is ``%Y%m%dt%H%M%S%Z``; for naive objects ``%Z`` expands to an
    empty string.

    :param t: seconds since the Unix epoch (int or float, interpreted as
        UTC) or a ``datetime``/``date``/``time`` object
    :return: the formatted string, or ``None`` for any other input type
    """
    layout = "%Y%m%dt%H%M%S%Z"
    # bool is a subclass of int but is never a meaningful timestamp.
    if isinstance(t, (int, float)) and not isinstance(t, bool):
        # Epoch arithmetic replaces the deprecated datetime.utcfromtimestamp
        # and keeps the result naive, so %Z stays empty as before.
        return (datetime(1970, 1, 1) + timedelta(seconds=t)).strftime(layout)
    if isinstance(t, (datetime, date, time)):
        return t.strftime(layout)
    return None
def addTzInfo(t, timezone="UTC"):
    """Return ``t`` as a timezone-aware object.

    :param t: ``datetime``/``date``/``time`` object; anything else (including
        falsy values such as ``None``) is returned untouched
    :param str timezone: name handed to ``pytz.timezone`` for naive input
    :return: ``t`` localized to ``timezone`` when it was naive, otherwise
        ``t`` itself; a plain ``date`` is promoted to a ``datetime`` at
        midnight before being localized
    """
    if not t or not isinstance(t, (datetime, date, time)):
        return t
    if isinstance(t, date) and not isinstance(t, datetime):
        # A plain date has no ``tzinfo`` attribute at all (the previous check
        # raised AttributeError here), so promote it to midnight of that day.
        t = datetime.combine(t, time())
    if t.tzinfo is None:
        t = pytz.timezone(timezone).localize(t)
    return t
def formatTimeString(t):
    """Convert between time objects and strings laid out as ``timeFormat``.

    :param t: a ``datetime``/``date``/``time`` object, or a string in the
        ``timeFormat`` layout
    :return: for a time object, its ``timeFormat`` string; for any other
        input, the value parsed with ``timeFormat`` and passed through
        :func:`addTzInfo`
    """
    if not isinstance(t, (datetime, date, time)):
        parsed = datetime.strptime(t, timeFormat)
        return addTzInfo(parsed)
    return t.strftime(timeFormat)
def formatToTimeStamp(t):
    """Return the whole seconds between 1970-01-01 and ``t``.

    :param t: ``datetime``/``date``/``time`` object (run through
        :func:`addTzInfo`) or any other value, which is handed to
        :func:`formatTimeString` for parsing
    :return: Timestamp as integer
    """
    if isinstance(t, (datetime, date, time)):
        moment = addTzInfo(t)
    else:
        moment = formatTimeString(t)
    unix_epoch = addTzInfo(datetime(1970, 1, 1))
    elapsed = moment - unix_epoch
    return int(elapsed.total_seconds())
def formatTimeFromNow(secs=0):
    """Format the moment ``secs`` seconds away from the current time.

    :param int secs: offset in seconds; positive values lie in the future,
        negative values in the past
    :return: the resulting UTC time rendered with ``timeFormat``
        (`%Y-%m-%dT%H:%M:%S`)
    :rtype: str
    """
    target = timenow.time() + int(secs)
    moment = datetime.utcfromtimestamp(target)
    return moment.strftime(timeFormat)
def formatTimedelta(td):
    """Format a timedelta as ``H:MM:SS`` with days folded into the hours.

    Sub-second precision is dropped. Negative durations are rendered as the
    magnitude with a leading ``-`` (``timedelta(seconds=-1)`` gives
    ``-0:00:01``).

    :param timedelta td: duration to format
    :return: the formatted string, or ``""`` when ``td`` is not a timedelta
    :rtype: str
    """
    if not isinstance(td, timedelta):
        return ""
    # A negative timedelta is stored as negative days plus positive seconds,
    # so format the absolute value and put the sign in front separately.
    magnitude = abs(td)
    total_seconds = magnitude.days * 86400 + magnitude.seconds
    sign = "-" if td < timedelta(0) and total_seconds else ""
    hours, remainder = divmod(total_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    return "%s%d:%02d:%02d" % (sign, hours, minutes, seconds)
def parse_time(block_time):
    """Parse a ``timeFormat`` string into a UTC-localized datetime.

    :param str block_time: time string laid out as ``timeFormat``
    :return: the parsed datetime, localized with pytz's UTC zone
    """
    naive = datetime.strptime(block_time, timeFormat)
    return pytz.timezone("UTC").localize(naive)
def assets_from_string(text):
    """Split an asset-pair string into its parts.

    The string is cut at every ``:``, ``/`` or ``-``.

    :param str text: asset pair such as ``"USD:BTC"``
    :return: list of the pieces between separators
    """
    separators = re.compile(r"[\-:\/]")
    return separators.split(text)
def sanitize_permlink(permlink):
    """Reduce a string to lowercase ASCII letters, digits and dashes.

    Surrounding whitespace is stripped; underscores, whitespace and dots
    become ``-``; every remaining character outside ``a-z``, ``A-Z``,
    ``0-9`` and ``-`` is dropped; the result is lower-cased.

    :param str permlink: raw text
    :return: the sanitized string
    """
    cleaned = permlink.strip()
    # Applied strictly in this order.
    for pattern, replacement in (
        (r"_|\s|\.", "-"),
        (r"[^\w-]", ""),
        (r"[^a-zA-Z0-9-]", ""),
    ):
        cleaned = re.sub(pattern, replacement, cleaned)
    return cleaned.lower()
def derive_permlink(title, parent_permlink=None, parent_author=None,
                    max_permlink_length=256, with_suffix=True):
    """Build a permlink for a root post or for a reply.

    With a ``parent_permlink`` the result is ``re-`` (plus the sanitized
    ``parent_author`` and a dash, when given) followed by the sanitized
    parent permlink. Without one, the sanitized ``title`` is used.

    :param str title: post title, used only when no parent permlink is given
    :param str parent_permlink: permlink of the post being replied to
    :param str parent_author: author of the post being replied to
    :param int max_permlink_length: length budget shared by prefix, body and
        (when requested) suffix; only the body is truncated to fit
    :param bool with_suffix: append ``-<timestamp>z`` to the result
    :return: the derived permlink; for a title that sanitizes to nothing the
        bare timestamp suffix without its leading dash
    """
    suffix = "-" + formatTime(datetime.utcnow()) + "z"

    if parent_permlink:
        prefix = "re-"
        if parent_author:
            prefix += sanitize_permlink(parent_author) + "-"
        source_text = parent_permlink
    else:
        prefix = ""
        source_text = title

    budget = max_permlink_length - len(prefix)
    if with_suffix:
        budget -= len(suffix)
    body = sanitize_permlink(source_text)[:budget]

    # Only the title path falls back to a timestamp-only permlink
    # (empty title or title consisting of special chars only).
    if not parent_permlink and len(body) == 0:
        return suffix[1:]

    permlink = prefix + body
    if with_suffix:
        permlink += suffix
    return permlink
def resolve_authorperm(identifier):
    """Extract author and permlink from an authorperm string or URL.

    Three shapes are tried in order: a bare ``[@]author/permlink``, a d.tube
    style ``.../#!/v/author/permlink`` URL, and a generic
    ``.../[@]author/permlink`` URL.

    Examples:

        .. code-block:: python

            >>> from beem.utils import resolve_authorperm
            >>> author, permlink = resolve_authorperm('https://d.tube/#!/v/pottlund/m5cqkd1a')
            >>> author, permlink = resolve_authorperm("https://steemit.com/witness-category/@gtg/24lfrm-gtg-witness-log")
            >>> author, permlink = resolve_authorperm("@gtg/24lfrm-gtg-witness-log")
            >>> author, permlink = resolve_authorperm("https://busy.org/@gtg/24lfrm-gtg-witness-log")

    :param str identifier: authorperm or URL
    :return: ``(author, permlink)``
    :raises ValueError: when none of the shapes match
    """
    # (pattern, author group, permlink group)
    shapes = (
        # without any http(s)
        (r"@?([\w\-\.]*)/([\w\-]*)", 1, 2),
        # dtube url
        (r"([\w\-\.]+[^#?\s]+)/#!/v/?([\w\-\.]*)/([\w\-]*)", 2, 3),
        # url
        (r"([\w\-\.]+[^#?\s]+)/@?([\w\-\.]*)/([\w\-]*)", 2, 3),
    )
    for pattern, author_group, permlink_group in shapes:
        found = re.match(pattern, identifier)
        if found is not None:
            return found.group(author_group), found.group(permlink_group)
    raise ValueError("Invalid identifier")
def construct_authorperm(*args):
    """Build an ``@author/permlink`` identifier.

    Accepts either one mapping with ``"author"`` and ``"permlink"`` keys or
    the author and permlink as two positional arguments.

    Examples:

        .. code-block:: python

            >>> from beem.utils import construct_authorperm
            >>> print(construct_authorperm('username', 'permlink'))
            @username/permlink
            >>> print(construct_authorperm({'author': 'username', 'permlink': 'permlink'}))
            @username/permlink

    :raises ValueError: for any other number of arguments
    """
    count = len(args)
    if count == 2:
        author, permlink = args
    elif count == 1:
        post = args[0]
        author = post["author"]
        permlink = post["permlink"]
    else:
        raise ValueError("construct_identifier() received unparsable arguments")
    return "{prefix}{author}/{permlink}".format(
        prefix="@", author=author, permlink=permlink
    )
def resolve_root_identifier(url):
    """Split a ``/category/@author/permlink`` path.

    :param str url: path beginning with ``/``; anything from a ``#`` onwards
        is ignored
    :return: ``(authorperm, category)``, or ``("", "")`` when ``url`` does
        not have that shape
    """
    parsed = re.match(r"/([^/]*)/@([^/]*)/([^#]*).*", url)
    if parsed is None:
        return "", ""
    category, author, permlink = parsed.groups()
    return construct_authorperm(author, permlink), category
def resolve_authorpermvoter(identifier):
    """Split an ``authorperm|voter`` string into its three parts.

    Everything before the first ``|`` is resolved with
    :func:`resolve_authorperm`; everything after it is the voter.

    :param str identifier: vote identifier
    :return: ``(author, permlink, voter)``
    :raises ValueError: when ``identifier`` contains no ``|``
    """
    authorperm, separator, voter = identifier.partition("|")
    if not separator:
        raise ValueError("Invalid identifier")
    author, permlink = resolve_authorperm(authorperm)
    return author, permlink, voter
def construct_authorpermvoter(*args):
    """Build an ``@author/permlink|voter`` identifier.

    Accepted call shapes: ``(author, permlink, voter)``,
    ``(authorperm, voter)``, or a single mapping holding ``"voter"`` plus
    either ``"authorperm"`` or both ``"author"`` and ``"permlink"``.

    Examples:

        .. code-block:: python

            >>> from beem.utils import construct_authorpermvoter
            >>> print(construct_authorpermvoter('username', 'permlink', 'voter'))
            @username/permlink|voter
            >>> print(construct_authorpermvoter({'author': 'username', 'permlink': 'permlink', 'voter': 'voter'}))
            @username/permlink|voter

    :raises ValueError: for any other number of arguments
    """
    count = len(args)
    if count == 3:
        author, permlink, voter = args
    elif count == 2:
        authorperm, voter = args
        author, permlink = resolve_authorperm(authorperm)
    elif count == 1:
        vote = args[0]
        if "authorperm" in vote:
            authorperm, voter = vote["authorperm"], vote["voter"]
            author, permlink = resolve_authorperm(authorperm)
        else:
            author, permlink, voter = vote["author"], vote["permlink"], vote["voter"]
    else:
        raise ValueError("construct_identifier() received unparsable arguments")
    return "{prefix}{author}/{permlink}|{voter}".format(
        prefix="@", author=author, permlink=permlink, voter=voter
    )
def reputation_to_score(rep):
    """Converts the account reputation value into the reputation score

    The score is ``25 + 9 * max(log10(|rep|) - 9, 0)``, with the logarithmic
    term negated for negative ``rep``; a ``rep`` of zero scores 25.

    :param rep: raw reputation as a number or a string holding an integer
    :return: the score as a float
    """
    raw = int(rep) if isinstance(rep, str) else rep
    if raw == 0:
        return 25.0
    magnitude = max([math.log10(abs(raw)) - 9, 0])
    signed = -magnitude if raw < 0 else magnitude
    return (signed * 9.0) + 25.0
def remove_from_dict(obj, keys=(), keep_keys=True):
    """Filter the entries of a dictionary or of an object's ``__dict__``.

    :param obj: a dict (or dict subclass); any other object contributes its
        ``__dict__``
    :param keys: collection of keys to keep or to drop
    :param bool keep_keys: ``True`` returns only the entries whose key is in
        ``keys``; ``False`` returns every entry whose key is not
    :return: a new plain dict; ``obj`` itself is not modified
    """
    # One isinstance check covers both exact dicts and subclasses.
    source = obj if isinstance(obj, dict) else obj.__dict__
    if keep_keys:
        return {k: v for k, v in source.items() if k in keys}
    return {k: v for k, v in source.items() if k not in keys}
def make_patch(a, b):
    """Return the diff-match-patch patch from ``a`` to ``b`` as text.

    :param a: first argument handed to ``patch_make``
    :param b: second argument handed to ``patch_make``
    :return: the output of ``patch_toText`` for that patch
    """
    import diff_match_patch as dmp_module
    engine = dmp_module.diff_match_patch()
    return engine.patch_toText(engine.patch_make(a, b))
def findall_patch_hunks(body=None):
    """Return every unified-diff hunk header found in ``body``.

    :param str body: patch text; ``None`` or an empty string yields no hunks
    :return: list of 5-tuples of strings, one per ``RE_HUNK_HEADER`` match
    """
    # The default of None used to reach findall() and raise TypeError.
    if not body:
        return []
    return RE_HUNK_HEADER.findall(body)
def derive_beneficiaries(beneficiaries):
    """Parse beneficiary specifications into sorted account/weight records.

    Each entry reads ``account`` or ``account:percent`` (a leading ``@`` and a
    trailing ``%`` are optional). Weights are expressed in hundredths of a
    percent. Entries without a percentage share what is left of 100 % equally.
    Repeated accounts keep their first entry; blank entries are ignored.

    :param beneficiaries: list of entries, or one comma-separated string
    :return: list of ``{"account": str, "weight": int}`` sorted by account
    """
    if not isinstance(beneficiaries, list):
        beneficiaries = beneficiaries.split(",")

    beneficiaries_list = []
    seen_accounts = set()
    assigned_percent = 0
    for entry in beneficiaries:
        parts = entry.strip().split(":")
        account_name = parts[0].strip()
        if account_name.startswith("@"):
            account_name = account_name[1:]
        # Blank entries (empty input, trailing comma) used to raise IndexError.
        if not account_name or account_name in seen_accounts:
            continue
        if len(parts) == 1:
            # Negative weight marks "no percentage given"; resolved below.
            percentage = -1
        else:
            percentage = float(parts[1].split("%")[0].strip())
            assigned_percent += percentage
        # round(), not int(): 0.29 * 100 is 28.999999999999996 in binary
        # floating point and must become 29, not 28.
        beneficiaries_list.append(
            {"account": account_name, "weight": round(percentage * 100)}
        )
        seen_accounts.add(account_name)

    unassigned = [bene for bene in beneficiaries_list if bene["weight"] < 0]
    if unassigned:
        share = int((100 * 100 - round(assigned_percent * 100)) / len(unassigned))
        for bene in unassigned:
            bene["weight"] = share

    return sorted(beneficiaries_list, key=lambda bene: bene["account"])
def derive_tags(tags):
    """Split a tag string into a list of stripped tags.

    Commas take precedence as the separator; a string without commas is split
    on single spaces; a string with neither is returned as one tag.

    :param str tags: tags in one string
    :return: list of tags, empty for an empty string
    """
    by_comma = tags.split(",")
    if len(by_comma) > 1:
        return [piece.strip() for piece in by_comma]
    by_space = tags.split(" ")
    if len(by_space) > 1:
        return [piece.strip() for piece in by_space]
    if len(tags) > 0:
        return [tags.strip()]
    return []
def seperate_yaml_dict_from_body(content):
    """Split a document into its body and its YAML header.

    The header is the text between the first ``---`` line and the next
    occurrence of one found from index 1 onwards; the body is everything
    after that second marker. Content without any marker is all body.

    :param str content: full document text
    :return: ``(body, parameter)`` where ``parameter`` is the loaded header
        (``{}`` when there is no marker)
    """
    marker = "---\n"
    if marker not in content:
        return content, {}

    closing = content.find(marker, 1)
    opening = content.find(marker)
    body = content[closing + 4 :]
    yaml_content = content[opening + 4 : closing]

    loader = YAML(typ="safe")
    parameter = loader.load(yaml_content)
    if not isinstance(parameter, dict):
        # Second attempt with a space forced after every colon.
        # NOTE(review): the trailing replace looks like a no-op as written;
        # confirm whether a double-space collapse was intended.
        parameter = loader.load(yaml_content.replace(":", ": ").replace(" ", " "))
    return body, parameter
def create_yaml_header(comment, json_metadata={}, reply_identifier=None):
    """Render a ``---`` delimited YAML header for a comment.

    :param comment: mapping with at least ``"title"``, ``"author"`` and
        ``"max_accepted_payout"``; ``"permlink"``, ``"last_update"`` /
        ``"updated"``, ``"percent_steem_dollars"`` / ``"percent_hbd"`` and
        ``"beneficiaries"`` are used when present, and ``"category"`` is read
        when ``json_metadata`` has tags
    :param json_metadata: mapping whose ``"author"``, ``"description"``,
        ``"canonical_url"``, ``"app"`` and ``"tags"`` entries are used when
        present
    :param reply_identifier: written as ``reply_identifier`` unless ``None``
    :return: the header text, opening and closing ``---`` lines included
    :rtype: str
    """
    lines = ['---\n']
    if comment["title"] != "":
        lines.append('title: "%s"\n' % comment["title"])
    if "permlink" in comment:
        lines.append('permlink: %s\n' % comment["permlink"])
    lines.append('author: %s\n' % comment["author"])

    # Optional metadata entries, emitted in this fixed order.
    for meta_key, template in (
        ("author", 'authored by: %s\n'),
        ("description", 'description: "%s"\n'),
        ("canonical_url", 'canonical_url: %s\n'),
        ("app", 'app: %s\n'),
    ):
        if meta_key in json_metadata:
            lines.append(template % json_metadata[meta_key])

    if "last_update" in comment:
        lines.append('last_update: %s\n' % comment["last_update"])
    elif "updated" in comment:
        lines.append('last_update: %s\n' % comment["updated"])
    lines.append('max_accepted_payout: %s\n' % str(comment["max_accepted_payout"]))
    if "percent_steem_dollars" in comment:
        lines.append('percent_steem_dollars: %s\n' % str(comment["percent_steem_dollars"]))
    elif "percent_hbd" in comment:
        lines.append('percent_hbd: %s\n' % str(comment["percent_hbd"]))

    if "tags" in json_metadata:
        tags = json_metadata["tags"]
        # The category is listed as community only when it is non-empty and
        # differs from the first tag.
        if len(tags) > 0 and comment["category"] != tags[0] and len(comment["category"]) > 0:
            lines.append('community: %s\n' % comment["category"])
        lines.append('tags: %s\n' % ",".join(tags))

    if "beneficiaries" in comment:
        shares = [
            "%s:%.2f%%" % (b["account"], b["weight"] / 10000 * 100)
            for b in comment["beneficiaries"]
        ]
        if len(shares) > 0:
            lines.append('beneficiaries: %s\n' % ",".join(shares))

    if reply_identifier is not None:
        lines.append('reply_identifier: %s\n' % reply_identifier)
    lines.append('---\n')
    return "".join(lines)
def load_dirty_json(dirty_json):
    """Parse JSON-like text that uses Python-style quoting and booleans.

    Single-quoted (optionally ``u``-prefixed) strings are re-quoted with
    double quotes and `` True`` / `` False`` become `` true`` / `` false``
    before the text is handed to ``json.loads``.

    :param str dirty_json: the text to clean and parse
    :return: the decoded JSON value
    """
    substitutions = (
        (r"([ \{,:\[])(u)?'([^']+)'", r'\1"\3"'),
        (r" False([, \}\]])", r' false\1'),
        (r" True([, \}\]])", r' true\1'),
    )
    text = dirty_json
    for pattern, replacement in substitutions:
        text = re.sub(pattern, replacement, text)
    return json.loads(text)
def create_new_password(length=32):
    """Create a random alphanumeric password.

    The password always contains at least one digit, one uppercase and one
    lowercase character.

    :param int length: number of characters; must be at least 3
    :return: the generated password
    :rtype: str
    :raises ValueError: when ``length`` is below 3, since the three required
        character classes cannot fit (the retry loop would never finish)
    """
    if length < 3:
        raise ValueError("length must be at least 3")
    alphabet = string.ascii_letters + string.digits
    while True:
        candidate = ''.join(secrets.choice(alphabet) for _ in range(length))
        has_lower = any(c.islower() for c in candidate)
        has_upper = any(c.isupper() for c in candidate)
        has_digit = any(c.isdigit() for c in candidate)
        if has_lower and has_upper and has_digit:
            return candidate
def import_coldcard_wif(filename):
    """Read the WIF and the derivation path from a Coldcard export file.

    The value is taken from the first non-blank line following a
    ``WIF (privkey):`` line, the path from the first non-blank line following
    a line containing ``Path Used``.

    :param str filename: path of the exported text file
    :return: ``(wif, path)``; each is ``""`` when its marker is not found
    """
    wif_value = ""
    used_path = ""
    expecting = ""
    with open(filename) as handle:
        for raw_line in handle:
            stripped = raw_line.strip()
            if stripped == "":
                continue
            if stripped == "WIF (privkey):":
                expecting = "wif"
            elif "Path Used" in stripped:
                expecting = "path"
            else:
                if expecting == "wif":
                    wif_value = stripped
                elif expecting == "path":
                    used_path = raw_line
                # Any ordinary line consumes the pending marker.
                expecting = ""
    return wif_value, used_path.lstrip().replace("\n", "")
def generate_password(import_password, wif=1):
    """Derive a ``P``-prefixed password by repeated ``PasswordKey`` rounds.

    :param str import_password: starting secret
    :param int wif: number of rounds; each round replaces the secret with the
        string form of ``PasswordKey("", secret, role="").get_private()``
    :return: ``"P"`` plus the final secret, or ``import_password`` unchanged
        when ``wif`` is not positive
    """
    if wif > 0:
        derived = import_password
        for _ in range(wif):
            derived = str(PasswordKey("", derived, role="").get_private())
        return 'P' + derived
    return import_password
def import_pubkeys(import_pub):
    """Load the four public keys from a file holding a Python dict literal.

    The file is read with the default encoding first and re-read as UTF-16
    when a NUL character is found past the first position.

    :param str import_pub: path of the key file
    :return: ``(owner, active, posting, memo)``
    :raises Exception: when ``import_pub`` is not an existing file
    """
    if not os.path.isfile(import_pub):
        raise Exception("File %s does not exist!" % import_pub)
    with open(import_pub) as handle:
        raw = handle.read()
    if raw.find('\0') > 0:
        with open(import_pub, encoding='utf-16') as handle:
            raw = handle.read()
    keys = ast.literal_eval(raw)
    return tuple(keys[role] for role in ("owner", "active", "posting", "memo"))
def import_custom_json(jsonid, json_data):
    """Build the payload dict for a custom_json operation.

    :param str jsonid: id of the custom_json; for ``"ssc-mainnet1"`` a nested
        ``quantity`` value is converted to ``float``
    :param json_data: either a tuple of alternating keys and values, or a
        sequence whose first element is the path of a JSON file to load
    :return: the payload, or ``None`` (after printing a message) when a key
        has no value or the file cannot be read or decoded

    String values of the form ``{k: v, k2: v2}`` are expanded into nested
    dicts of stripped strings.
    """
    data = {}
    if isinstance(json_data, tuple) and len(json_data) > 1:
        pending_key = None
        for item in json_data:
            if pending_key is None:
                pending_key = item
            else:
                data[pending_key] = item
                pending_key = None
        if pending_key is not None:
            print("Value is missing for key: %s" % pending_key)
            return None
    else:
        try:
            with open(json_data[0], 'r') as f:
                data = json.load(f)
        except Exception:
            # Best effort by design, but no longer a bare ``except:`` that
            # would also swallow KeyboardInterrupt and SystemExit.
            print("%s is not a valid file or json field" % json_data)
            return None

    for d in data:
        raw = data[d]
        # startswith/endswith are safe on "", where indexing [0] raised.
        if isinstance(raw, str) and raw.startswith("{") and raw.endswith("}"):
            field = {}
            inner = raw[1:-1]
            # "{}" expands to an empty dict instead of failing on the split.
            pairs = inner.split(",") if inner.strip() else []
            for keyvalue in pairs:
                pieces = keyvalue.split(":")
                key = pieces[0].strip()
                value = pieces[1].strip()
                if jsonid == "ssc-mainnet1" and key == "quantity":
                    value = float(value)
                field[key] = value
            data[d] = field
    return data