holgern/beem

View on GitHub
examples/benchmark_nodes2.py

Summary

Maintainability
C
1 day
Test Coverage
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import sys
from datetime import datetime, timedelta
import time
import io
from timeit import default_timer as timer
import logging
from prettytable import PrettyTable
from beem.blockchain import Blockchain
from beem.account import Account
from beem.block import Block
from beem.steem import Steem
from beem.utils import parse_time, formatTimedelta, construct_authorperm, resolve_authorperm, resolve_authorpermvoter, construct_authorpermvoter, formatTimeString
from beem.comment import Comment
from beem.nodelist import NodeList
from beem.vote import Vote
from beemapi.exceptions import NumRetriesReached
FUTURES_MODULE = None
if not FUTURES_MODULE:
    try:
        from concurrent.futures import ThreadPoolExecutor, wait, as_completed
        FUTURES_MODULE = "futures"
    except ImportError:
        FUTURES_MODULE = None
log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
quit_thread = False


def benchmark_node(node, how_many_minutes=10, how_many_seconds=30):
    block_count = 0
    history_count = 0
    access_time = 0
    follow_time = 0
    blockchain_version = u'0.0.0'
    successful = True
    error_msg = None
    start_total = timer()
    max_batch_size = None
    threading = False
    thread_num = 16

    authorpermvoter = u"@gtg/steem-pressure-4-need-for-speed|gandalf"
    [author, permlink, voter] = resolve_authorpermvoter(authorpermvoter)
    authorperm = construct_authorperm(author, permlink)
    last_block_id = 19273700
    try:
        stm = Steem(node=node, num_retries=3, num_retries_call=3, timeout=30)
        blockchain = Blockchain(steem_instance=stm)
        blockchain_version = stm.get_blockchain_version()

        last_block = Block(last_block_id, steem_instance=stm)

        stopTime = last_block.time() + timedelta(seconds=how_many_minutes * 60)
        total_transaction = 0

        start = timer()
        for entry in blockchain.blocks(start=last_block_id, max_batch_size=max_batch_size, threading=threading, thread_num=thread_num):
            block_no = entry.identifier
            block_count += 1
            if "block" in entry:
                trxs = entry["block"]["transactions"]
            else:
                trxs = entry["transactions"]

            for tx in trxs:
                for op in tx["operations"]:
                    total_transaction += 1
            if "block" in entry:
                block_time = parse_time(entry["block"]["timestamp"])
            else:
                block_time = parse_time(entry["timestamp"])

            if block_time > stopTime:
                last_block_id = block_no
                break
            if timer() - start > how_many_seconds or quit_thread:
                break
    except NumRetriesReached:
        error_msg = 'NumRetriesReached'
        block_count = -1
    except KeyboardInterrupt:
        error_msg = 'KeyboardInterrupt'
        # quit = True
    except Exception as e:
        error_msg = str(e)
        block_count = -1

    try:
        stm = Steem(node=node, num_retries=3, num_retries_call=3, timeout=30)
        account = Account("gtg", steem_instance=stm)
        blockchain_version = stm.get_blockchain_version()

        start = timer()
        for acc_op in account.history_reverse(batch_size=100):
            history_count += 1
            if timer() - start > how_many_seconds or quit_thread:
                break
    except NumRetriesReached:
        error_msg = 'NumRetriesReached'
        history_count = -1
        successful = False
    except KeyboardInterrupt:
        error_msg = 'KeyboardInterrupt'
        history_count = -1
        successful = False
        # quit = True
    except Exception as e:
        error_msg = str(e)
        history_count = -1
        successful = False

    try:
        stm = Steem(node=node, num_retries=3, num_retries_call=3, timeout=30)
        account = Account("gtg", steem_instance=stm)
        blockchain_version = stm.get_blockchain_version()

        start = timer()
        Vote(authorpermvoter, steem_instance=stm)
        stop = timer()
        vote_time = stop - start
        start = timer()
        Comment(authorperm, steem_instance=stm)
        stop = timer()
        comment_time = stop - start
        start = timer()
        Account(author, steem_instance=stm)
        stop = timer()
        account_time = stop - start
        start = timer()
        account.get_followers()
        stop = timer()
        follow_time = stop - start
        access_time = (vote_time + comment_time + account_time + follow_time) / 4.0
    except NumRetriesReached:
        error_msg = 'NumRetriesReached'
        access_time = -1
    except KeyboardInterrupt:
        error_msg = 'KeyboardInterrupt'
        # quit = True
    except Exception as e:
        error_msg = str(e)
        access_time = -1
    return {'successful': successful, 'node': node, 'error': error_msg,
            'total_duration': timer() - start_total, 'block_count': block_count,
            'history_count': history_count, 'access_time': access_time, 'follow_time': follow_time,
            'version': blockchain_version}


if __name__ == "__main__":
    how_many_seconds = 30
    how_many_minutes = 10
    threading = True
    set_default_nodes = False
    quit_thread = False
    benchmark_time = timer()

    nodelist = NodeList()
    nodes = nodelist.get_nodes(normal=True, appbase=True, dev=True)
    t = PrettyTable(["node", "N blocks", "N acc hist", "dur. call in s"])
    t.align = "l"
    t2 = PrettyTable(["node", "version"])
    t2.align = "l"
    working_nodes = []
    results = []
    if threading and FUTURES_MODULE:
        pool = ThreadPoolExecutor(max_workers=len(nodes) + 1)
        futures = []
        for node in nodes:
            futures.append(pool.submit(benchmark_node, node, how_many_minutes, how_many_seconds))
        try:
            results = [r.result() for r in as_completed(futures)]
        except KeyboardInterrupt:
            quit_thread = True
            print("benchmark aborted.")
    else:
        for node in nodes:
            print("Current node:", node)
            result = benchmark_node(node, how_many_minutes, how_many_seconds)
            results.append(result)
    for result in results:
        t2.add_row([result["node"], result["version"]])
    print(t2)
    print("\n")

    sortedList = sorted(results, key=lambda self: self["history_count"], reverse=True)
    for result in sortedList:
        if result["successful"]:
            t.add_row([
                result["node"],
                result["block_count"],
                result["history_count"],
                ("%.2f" % (result["access_time"]))
            ])
            working_nodes.append(result["node"])
    print(t)
    print("\n")
    print("Total benchmark time: %.2f s\n" % (timer() - benchmark_time))
    if set_default_nodes:
        stm = Steem(offline=True)
        stm.set_default_nodes(working_nodes)
    else:
        print("beempy set nodes " + str(working_nodes))