dragonchain/dragonchain

View on GitHub
dragonchain/transaction_processor/level_3_actions.py

Summary

Maintainability
A
1 hr
Test Coverage
A
97%
# Copyright 2020 Dragonchain, Inc.
# Licensed under the Apache License, Version 2.0 (the "Apache License")
# with the following modification; you may not use this file except in
# compliance with the Apache License and the following modification to it:
# Section 6. Trademarks. is deleted and replaced with:
#      6. Trademarks. This License does not grant permission to use the trade
#         names, trademarks, service marks, or product names of the Licensor
#         and its affiliates, except as required to comply with Section 4(c) of
#         the License and to reproduce the content of the NOTICE file.
# You may obtain a copy of the Apache License at
#     http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the Apache License with the above modification is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the Apache License for the specific
# language governing permissions and limitations under the Apache License.

import os
import time
import math
from typing import Set, Union, Tuple, List, Iterable, TYPE_CHECKING

from dragonchain.lib.dao import block_dao
from dragonchain.lib.dto import l3_block_model
from dragonchain.lib import keys
from dragonchain.lib import broadcast
from dragonchain.lib import matchmaking
from dragonchain.lib import party
from dragonchain.lib import queue
from dragonchain import logger
from dragonchain.transaction_processor import shared_functions

if TYPE_CHECKING:
    from dragonchain.lib.dto import l2_block_model
    from dragonchain.lib.types import L1Headers


PROOF_SCHEME = os.environ["PROOF_SCHEME"].lower()
ADDRESS = os.environ["INTERNAL_ID"]

_log = logger.get_logger()


def execute() -> None:
    """Gets the next L2 block arrays from the queue and processes it"""
    matchmaking.renew_registration_if_necessary()
    t0 = time.time()

    l1_headers, l2_blocks = get_new_blocks()
    if l1_headers and l2_blocks:
        t1 = time.time()
        _log.info(f"[L3] Got next L2 block array from dcid: {l1_headers['dc_id']} blockid: {l1_headers['block_id']}")

        ddss, valid_block_count, regions, clouds = verify_blocks(l2_blocks, l1_headers)
        if not valid_block_count:
            _log.info("[L3] None of the L2 blocks sent up were valid. Not creating any block/verifications")
            clear_processing_blocks()
            recurse_if_necessary()
            return
        t2 = time.time()

        l3_block = create_block(l1_headers, ddss, valid_block_count, regions, clouds, l2_blocks)
        t3 = time.time()

        send_data(l3_block)
        t4 = time.time()

        # Clear our processing queue (finished successfully)
        clear_processing_blocks()

        total = t4 - t0
        _log.info(
            f"[L3] Processed {len(l2_blocks)} l2 blocks for l1 block id {l1_headers['dc_id']} with dcid {l1_headers['block_id']} in {total:.4f} seconds"
        )
        _log.info(f"[L3] Retrieving L2 block list from queue: {t1 - t0:.4f} sec ({((t1 - t0) / total) * 100:.1f}% of processing)")
        _log.info(f"[L3] Verified all L2 blocks in list: {t2 - t1:.4f} sec ({((t2 - t1) / total) * 100:.1f}% of processing)")
        _log.info(f"[L3] Creating block with proof: {t3 - t2:.4f} sec ({((t3 - t2) / total) * 100:.1f}% of processing)")
        _log.info(f"[L3] Uploading block and broadcasting down: {t4 - t3:.4f} sec ({((t4 - t3) / total) * 100:.1f}% of processing)")

        recurse_if_necessary()


def clear_processing_blocks() -> None:
    queue.clear_processing_queue()


def send_data(block: l3_block_model.L3BlockModel) -> None:
    _log.info("[L3] Uploading block")
    block_dao.insert_block(block)

    _log.info("[L3] Inserting complete. Broadcasting block")
    broadcast.dispatch(block)


def recurse_if_necessary() -> None:
    if queue.is_not_empty():
        _log.info("[L3] Another block is queue, immediately starting processing")
        execute()
    else:
        _log.info("[L3] Block processing complete and no new block to process. Waiting")


def get_new_blocks() -> Union[Tuple[None, None], Tuple["L1Headers", List["l2_block_model.L2BlockModel"]]]:
    # Safety check to recover after unexpected crash while creating last block if necessary
    queue.check_and_recover_processing_if_necessary()
    return queue.get_next_l2_blocks()


def get_verifying_keys(chain_id: str) -> keys.DCKeys:
    return keys.DCKeys(chain_id)


def verify_blocks(l2_blocks: Iterable["l2_block_model.L2BlockModel"], l1_headers: "L1Headers") -> Tuple[int, int, List[str], List[str]]:
    ddss = 0
    l2_count = 0
    regions: Set[str] = set()
    clouds: Set[str] = set()
    checked: Set[str] = set()
    for block in l2_blocks:
        # We use a checked array with proofs (which are unique) to make sure we don't process
        # a block twice, and ensures the block we're looking at is actually relevant
        check = (
            block.proof not in checked
            and block.l1_dc_id == l1_headers["dc_id"]
            and block.l1_block_id == l1_headers["block_id"]
            and block.l1_proof == l1_headers["proof"]
        )

        if check:
            clouds, regions, ddss, l2_count = verify_block(block, clouds, regions, ddss, l2_count)
        else:
            _log.info(f"[L3] L2 block was duplicated or not relevant to this verification.\n{block.__dict__}")

        # Finally, add this block into our checked blocks list
        checked.add(block.proof)

    return ddss, l2_count, list(regions), list(clouds)


def verify_block(
    block: "l2_block_model.L2BlockModel", clouds: Set[str], regions: Set[str], ddss: int, l2_count: int
) -> Tuple[Set[str], Set[str], int, int]:
    try:
        l2_verify_keys = get_verifying_keys(block.dc_id)
        _log.info(f"[L3] Verifying proof for L2 block id {block.block_id} from {block.dc_id}")
        if l2_verify_keys.verify_block(block):
            l2_count += 1
            l2_ddss = block.current_ddss or "0"
            matchmaking_config = matchmaking.get_registration(block.dc_id)
            clouds.add(matchmaking_config["cloud"])
            regions.add(matchmaking_config["region"])
            ddss += int(float(l2_ddss))
            _log.info(f"[L3] Finished processing valid L2 block {block.block_id}")
        else:
            _log.info(f"[L3] Proof for L2 block id {block.block_id} from {block.dc_id} was invalid. Not including block in stats.")
    except Exception:
        _log.exception("[L3] Could not get L2's verifying keys. Not incrementing stats for this block.")

    return clouds, regions, ddss, l2_count


def get_next_block_info() -> Tuple[int, str]:
    previous = block_dao.get_last_block_proof()
    _log.info(f"[L3] Got previous block information: {previous}")
    if not previous:
        # Throws an exception if sanity check fails
        shared_functions.sanity_check_empty_chain()
        block_id = 1
        prev_proof = ""
    else:
        block_id = int(previous["block_id"]) + 1
        prev_proof = previous["proof"]

    _log.info(f"[L3] Block ID: {block_id}")

    return block_id, prev_proof


def create_block(
    l1_headers: "L1Headers",
    ddss: Union[str, float, int],
    valid_block_count: int,
    regions: List[str],
    clouds: List[str],
    l2_blocks: Iterable["l2_block_model.L2BlockModel"],
) -> l3_block_model.L3BlockModel:
    block_id, prev_proof = get_next_block_info()

    # Pull configuration from matchmaking directly to get DDSS (not stored locally)
    l2_proofs = []
    for block in l2_blocks:
        l2_proofs.append({"dc_id": block.dc_id, "block_id": block.block_id, "proof": block.proof})

    l3_block = l3_block_model.L3BlockModel(
        dc_id=keys.get_public_id(),
        current_ddss=party.get_address_ddss(ADDRESS),  # Get DDSS from party, cached hourly
        block_id=str(block_id),
        timestamp=str(math.floor(time.time())),
        prev_proof=prev_proof,
        scheme=PROOF_SCHEME,
        l1_dc_id=l1_headers["dc_id"],
        l1_block_id=l1_headers["block_id"],
        l1_proof=l1_headers["proof"],
        l2_proofs=l2_proofs,
        ddss=str(ddss),
        l2_count=str(valid_block_count),
        regions=regions,
        clouds=clouds,
    )

    sign_block(l3_block)

    return l3_block


def sign_block(l3_block: l3_block_model.L3BlockModel) -> None:
    if PROOF_SCHEME == "work":
        _log.info("[L3] Performing PoW on block")
        l3_block.proof, l3_block.nonce = keys.get_my_keys().pow_block(l3_block)
    else:
        _log.info("[L3] Signing block")
        l3_block.proof = keys.get_my_keys().sign_block(l3_block)
    _log.info(f"[L3] Finished Block:\n{l3_block.export_as_at_rest()}")