holgern/beem

View on GitHub
beem/utils.py

Summary

Maintainability
F
4 days
Test Coverage
C
70%
# -*- coding: utf-8 -*-
import re
import json
import time as timenow
import math
from datetime import datetime, tzinfo, timedelta, date, time
import pytz
import difflib
from ruamel.yaml import YAML
import difflib
import secrets
import string
from beemgraphenebase.account import PasswordKey
import ast
import os

timeFormat = "%Y-%m-%dT%H:%M:%S"
# https://github.com/matiasb/python-unidiff/blob/master/unidiff/constants.py#L37
# @@ (source offset, length) (target offset, length) @@ (section header)
RE_HUNK_HEADER = re.compile(
    r"^@@ -(\d+)(?:,(\d+))? \+(\d+)(?:,(\d+))?\ @@[ ]?(.*)$", flags=re.MULTILINE
)


def formatTime(t):
    """ Properly Format Time for permlinks
    """
    if isinstance(t, float):
        return datetime.utcfromtimestamp(t).strftime("%Y%m%dt%H%M%S%Z")
    if isinstance(t, (datetime, date, time)):
        return t.strftime("%Y%m%dt%H%M%S%Z")


def addTzInfo(t, timezone="UTC"):
    """Returns a datetime object with tzinfo added"""
    if t and isinstance(t, (datetime, date, time)) and t.tzinfo is None:
        utc = pytz.timezone(timezone)
        t = utc.localize(t)
    return t


def formatTimeString(t):
    """ Properly Format Time for permlinks
    """
    if isinstance(t, (datetime, date, time)):
        return t.strftime(timeFormat)
    return addTzInfo(datetime.strptime(t, timeFormat))


def formatToTimeStamp(t):
    """ Returns a timestamp integer

        :param datetime t: datetime object
        :return: Timestamp as integer
    """
    if isinstance(t, (datetime, date, time)):
        t = addTzInfo(t)
    else:
        t = formatTimeString(t)
    epoch = addTzInfo(datetime(1970, 1, 1))
    return int((t - epoch).total_seconds())


def formatTimeFromNow(secs=0):
    """ Properly Format Time that is `x` seconds in the future

        :param int secs: Seconds to go in the future (`x>0`) or the
                         past (`x<0`)
        :return: Properly formated time for Graphene (`%Y-%m-%dT%H:%M:%S`)
        :rtype: str

    """
    return datetime.utcfromtimestamp(timenow.time() + int(secs)).strftime(timeFormat)


def formatTimedelta(td):
    """Format timedelta to String
    """
    if not isinstance(td, timedelta):
        return ""
    days, seconds = td.days, td.seconds
    hours = days * 24 + seconds // 3600
    minutes = (seconds % 3600) // 60
    seconds = seconds % 60
    return "%d:%s:%s" % (hours, str(minutes).zfill(2), str(seconds).zfill(2))


def parse_time(block_time):
    """Take a string representation of time from the blockchain, and parse it
       into datetime object.
    """
    utc = pytz.timezone("UTC")
    return utc.localize(datetime.strptime(block_time, timeFormat))


def assets_from_string(text):
    """Correctly split a string containing an asset pair.

    Splits the string into two assets with the separator being on of the
    following: ``:``, ``/``, or ``-``.
    """
    return re.split(r"[\-:\/]", text)


def sanitize_permlink(permlink):
    permlink = permlink.strip()
    permlink = re.sub(r"_|\s|\.", "-", permlink)
    permlink = re.sub(r"[^\w-]", "", permlink)
    permlink = re.sub(r"[^a-zA-Z0-9-]", "", permlink)
    permlink = permlink.lower()
    return permlink


def derive_permlink(title, parent_permlink=None, parent_author=None,
                    max_permlink_length=256, with_suffix=True):
    """Derive a permlink from a comment title (for root level
    comments) or the parent permlink and optionally the parent
    author (for replies).

    """
    suffix = "-" + formatTime(datetime.utcnow()) + "z"
    if parent_permlink and parent_author:
        prefix = "re-" + sanitize_permlink(parent_author) + "-"
        if with_suffix:
            rem_chars = max_permlink_length - len(suffix) - len(prefix)
        else:
            rem_chars = max_permlink_length - len(prefix)
        body = sanitize_permlink(parent_permlink)[:rem_chars]
        if with_suffix:
            return prefix + body + suffix
        else:
            return prefix + body
    elif parent_permlink:
        prefix = "re-"
        if with_suffix:
            rem_chars = max_permlink_length - len(suffix) - len(prefix)
        else:
            rem_chars = max_permlink_length - len(prefix)
        body = sanitize_permlink(parent_permlink)[:rem_chars]
        if with_suffix:
            return prefix + body + suffix
        else:
            return prefix + body
    else:
        if with_suffix:
            rem_chars = max_permlink_length - len(suffix)
        else:
            rem_chars = max_permlink_length
        body = sanitize_permlink(title)[:rem_chars]
        if len(body) == 0:  # empty title or title consisted of only special chars
            return suffix[1:]  # use timestamp only, strip leading "-"
        if with_suffix:
            return body + suffix
        else:
            return body


def resolve_authorperm(identifier):
    """Correctly split a string containing an authorperm.

    Splits the string into author and permlink with the
    following separator: ``/``.

    Examples:

        .. code-block:: python

            >>> from beem.utils import resolve_authorperm
            >>> author, permlink = resolve_authorperm('https://d.tube/#!/v/pottlund/m5cqkd1a')
            >>> author, permlink = resolve_authorperm("https://steemit.com/witness-category/@gtg/24lfrm-gtg-witness-log")
            >>> author, permlink = resolve_authorperm("@gtg/24lfrm-gtg-witness-log")
            >>> author, permlink = resolve_authorperm("https://busy.org/@gtg/24lfrm-gtg-witness-log")

    """
    # without any http(s)
    match = re.match(r"@?([\w\-\.]*)/([\w\-]*)", identifier)
    if hasattr(match, "group"):
        return match.group(1), match.group(2)
    # dtube url
    match = re.match(r"([\w\-\.]+[^#?\s]+)/#!/v/?([\w\-\.]*)/([\w\-]*)", identifier)
    if hasattr(match, "group"):
        return match.group(2), match.group(3)
    # url
    match = re.match(r"([\w\-\.]+[^#?\s]+)/@?([\w\-\.]*)/([\w\-]*)", identifier)
    if not hasattr(match, "group"):
        raise ValueError("Invalid identifier")
    return match.group(2), match.group(3)


def construct_authorperm(*args):
    """ Create a post identifier from comment/post object or arguments.
    Examples:

        .. code-block:: python

            >>> from beem.utils import construct_authorperm
            >>> print(construct_authorperm('username', 'permlink'))
            @username/permlink
            >>> print(construct_authorperm({'author': 'username', 'permlink': 'permlink'}))
            @username/permlink

    """
    username_prefix = "@"
    if len(args) == 1:
        op = args[0]
        author, permlink = op["author"], op["permlink"]
    elif len(args) == 2:
        author, permlink = args
    else:
        raise ValueError("construct_identifier() received unparsable arguments")

    fields = dict(prefix=username_prefix, author=author, permlink=permlink)
    return "{prefix}{author}/{permlink}".format(**fields)


def resolve_root_identifier(url):
    m = re.match(r"/([^/]*)/@([^/]*)/([^#]*).*", url)
    if not m:
        return "", ""
    else:
        category = m.group(1)
        author = m.group(2)
        permlink = m.group(3)
        return construct_authorperm(author, permlink), category


def resolve_authorpermvoter(identifier):
    """Correctly split a string containing an authorpermvoter.

    Splits the string into author and permlink with the
    following separator: ``/`` and ``|``.
    """
    pos = identifier.find("|")
    if pos < 0:
        raise ValueError("Invalid identifier")
    [author, permlink] = resolve_authorperm(identifier[:pos])
    return author, permlink, identifier[pos + 1 :]


def construct_authorpermvoter(*args):
    """ Create a vote identifier from vote object or arguments.
    Examples:

        .. code-block:: python

            >>> from beem.utils import construct_authorpermvoter
            >>> print(construct_authorpermvoter('username', 'permlink', 'voter'))
            @username/permlink|voter
            >>> print(construct_authorpermvoter({'author': 'username', 'permlink': 'permlink', 'voter': 'voter'}))
            @username/permlink|voter

    """
    username_prefix = "@"
    if len(args) == 1:
        op = args[0]
        if "authorperm" in op:
            authorperm, voter = op["authorperm"], op["voter"]
            [author, permlink] = resolve_authorperm(authorperm)
        else:
            author, permlink, voter = op["author"], op["permlink"], op["voter"]
    elif len(args) == 2:
        authorperm, voter = args
        [author, permlink] = resolve_authorperm(authorperm)
    elif len(args) == 3:
        author, permlink, voter = args
    else:
        raise ValueError("construct_identifier() received unparsable arguments")

    fields = dict(prefix=username_prefix, author=author, permlink=permlink, voter=voter)
    return "{prefix}{author}/{permlink}|{voter}".format(**fields)


def reputation_to_score(rep):
    """Converts the account reputation value into the reputation score"""
    if isinstance(rep, str):
        rep = int(rep)
    if rep == 0:
        return 25.0
    score = max([math.log10(abs(rep)) - 9, 0])
    if rep < 0:
        score *= -1
    score = (score * 9.0) + 25.0
    return score


def remove_from_dict(obj, keys=list(), keep_keys=True):
    """ Prune a class or dictionary of all but keys (keep_keys=True).
        Prune a class or dictionary of specified keys.(keep_keys=False).
    """
    if type(obj) == dict:
        items = list(obj.items())
    elif isinstance(obj, dict):
        items = list(obj.items())
    else:
        items = list(obj.__dict__.items())
    if keep_keys:
        return {k: v for k, v in items if k in keys}
    else:
        return {k: v for k, v in items if k not in keys}


def make_patch(a, b):
    import diff_match_patch as dmp_module
    dmp = dmp_module.diff_match_patch()
    patch = dmp.patch_make(a, b)
    patch_text = dmp.patch_toText(patch)   
    return patch_text


def findall_patch_hunks(body=None):
    return RE_HUNK_HEADER.findall(body)


def derive_beneficiaries(beneficiaries):
    beneficiaries_list = []
    beneficiaries_accounts = []
    beneficiaries_sum = 0
    if not isinstance(beneficiaries, list):
        beneficiaries = beneficiaries.split(",")

    for w in beneficiaries:
        account_name = w.strip().split(":")[0]
        if account_name[0] == "@":
            account_name = account_name[1:]
        if account_name in beneficiaries_accounts:
            continue
        if w.find(":") == -1:
            percentage = -1
        else:
            percentage = w.strip().split(":")[1]
            if "%" in percentage:
                percentage = percentage.strip().split("%")[0].strip()
            percentage = float(percentage)
            beneficiaries_sum += percentage
        beneficiaries_list.append(
            {"account": account_name, "weight": int(percentage * 100)}
        )
        beneficiaries_accounts.append(account_name)

    missing = 0
    for bene in beneficiaries_list:
        if bene["weight"] < 0:
            missing += 1
    index = 0
    for bene in beneficiaries_list:
        if bene["weight"] < 0:
            beneficiaries_list[index]["weight"] = int(
                (int(100 * 100) - int(beneficiaries_sum * 100)) / missing
            )
        index += 1
    sorted_beneficiaries = sorted(
        beneficiaries_list, key=lambda beneficiaries_list: beneficiaries_list["account"]
    )
    return sorted_beneficiaries


def derive_tags(tags):
    tags_list = []
    if len(tags.split(",")) > 1:
        for tag in tags.split(","):
            tags_list.append(tag.strip())
    elif len(tags.split(" ")) > 1:
        for tag in tags.split(" "):
            tags_list.append(tag.strip())
    elif len(tags) > 0:
        tags_list.append(tags.strip())
    return tags_list


def seperate_yaml_dict_from_body(content):
    parameter = {}
    body = ""
    if len(content.split("---\n")) > 1:
        body = content[content.find("---\n", 1) + 4 :]
        yaml_content = content[content.find("---\n") + 4 : content.find("---\n", 1)]
        yaml=YAML(typ="safe")
        parameter = yaml.load(yaml_content)
        if not isinstance(parameter, dict):
            parameter = yaml.load(yaml_content.replace(":", ": ").replace("  ", " "))
    else:
        body = content
    return body, parameter


def create_yaml_header(comment, json_metadata={}, reply_identifier=None):
    yaml_prefix = '---\n'
    if comment["title"] != "":
        yaml_prefix += 'title: "%s"\n' % comment["title"]
    if "permlink" in comment:
        yaml_prefix += 'permlink: %s\n' % comment["permlink"]
    yaml_prefix += 'author: %s\n' % comment["author"]
    if "author" in json_metadata:
        yaml_prefix += 'authored by: %s\n' % json_metadata["author"]
    if "description" in json_metadata:
        yaml_prefix += 'description: "%s"\n' % json_metadata["description"]
    if "canonical_url" in json_metadata:
        yaml_prefix += 'canonical_url: %s\n' % json_metadata["canonical_url"]
    if "app" in json_metadata:
        yaml_prefix += 'app: %s\n' % json_metadata["app"]
    if "last_update" in comment:
        yaml_prefix += 'last_update: %s\n' % comment["last_update"]
    elif "updated" in comment:
        yaml_prefix += 'last_update: %s\n' % comment["updated"]
    yaml_prefix += 'max_accepted_payout: %s\n' % str(comment["max_accepted_payout"])
    if "percent_steem_dollars" in comment:
        yaml_prefix += 'percent_steem_dollars: %s\n' %  str(comment["percent_steem_dollars"])
    elif "percent_hbd" in comment:
        yaml_prefix += 'percent_hbd: %s\n' %  str(comment["percent_hbd"])
    if "tags" in json_metadata:
        if len(json_metadata["tags"]) > 0 and comment["category"] != json_metadata["tags"][0] and len(comment["category"]) > 0:
            yaml_prefix += 'community: %s\n' % comment["category"]
        yaml_prefix += 'tags: %s\n' % ",".join(json_metadata["tags"])
    if "beneficiaries" in comment:
        beneficiaries = []
        for b in comment["beneficiaries"]:
            beneficiaries.append("%s:%.2f%%" % (b["account"], b["weight"] / 10000 * 100))
        if len(beneficiaries) > 0:
            yaml_prefix += 'beneficiaries: %s\n' % ",".join(beneficiaries)
    if reply_identifier is not None:
        yaml_prefix += 'reply_identifier: %s\n' % reply_identifier
    yaml_prefix += '---\n'
    return yaml_prefix

    
def load_dirty_json(dirty_json):
    regex_replace = [(r"([ \{,:\[])(u)?'([^']+)'", r'\1"\3"'), (r" False([, \}\]])", r' false\1'), (r" True([, \}\]])", r' true\1')]
    for r, s in regex_replace:
        dirty_json = re.sub(r, s, dirty_json)
    clean_json = json.loads(dirty_json)
    return clean_json    


def create_new_password(length=32):
    """Creates a random password containing alphanumeric chars with at least 1 number and 1 upper and lower char"""
    alphabet = string.ascii_letters + string.digits
    while True:
        import_password = ''.join(secrets.choice(alphabet) for i in range(length))
        if (any(c.islower() for c in import_password) and any(c.isupper() for c in import_password) and any(c.isdigit() for c in import_password)):
            break
    return import_password


def import_coldcard_wif(filename):
    """Reads a exported coldcard Wif text file and returns the WIF and used path"""
    next_var = ""
    import_password = ""
    path = ""
    with open(filename) as fp: 
        for line in fp:
            if line.strip() == "":
                continue
            if line.strip() == "WIF (privkey):":
                next_var = "wif"
                continue
            elif "Path Used" in line.strip():
                next_var = "path"
                continue
            if next_var == "wif":
                import_password = line.strip()
            elif next_var == "path":
                path = line
            next_var = ""
    return import_password, path.lstrip().replace("\n", "")


def generate_password(import_password, wif=1):
    if wif > 0:
        password = import_password
        for _ in range(wif):
            pk = PasswordKey("", password, role="")
            password = str(pk.get_private())
        password = 'P' + password
    else:
        password = import_password
    return password


def import_pubkeys(import_pub):
    if not os.path.isfile(import_pub):
        raise Exception("File %s does not exist!" % import_pub)
    with open(import_pub) as fp:
        pubkeys = fp.read()
    if pubkeys.find('\0') > 0:
        with open(import_pub, encoding='utf-16') as fp:
            pubkeys = fp.read()
    pubkeys = ast.literal_eval(pubkeys)
    owner = pubkeys["owner"]
    active = pubkeys["active"]
    posting = pubkeys["posting"]
    memo = pubkeys["memo"]
    return owner, active, posting, memo


def import_custom_json(jsonid, json_data):
    data = {}
    if isinstance(json_data, tuple) and len(json_data) > 1:
        key = None
        for j in json_data:
            if key is None:
                key = j
            else:
                data[key] = j
                key = None
        if key is not None:
            print("Value is missing for key: %s" % key)
            return None
    else:
        try:
            with open(json_data[0], 'r') as f:
                data = json.load(f)
        except:
            print("%s is not a valid file or json field" % json_data)
            return None
    for d in data:
        if isinstance(data[d], str) and data[d][0] == "{" and data[d][-1] == "}":
            field = {}
            for keyvalue in data[d][1:-1].split(","):
                key = keyvalue.split(":")[0].strip()
                value = keyvalue.split(":")[1].strip()
                if jsonid == "ssc-mainnet1" and key == "quantity":
                    value = float(value)
                field[key] = value
            data[d] = field
    return data