dragonchain/dragonchain

dragonchain/transaction_processor/level_4_actions.py

Summary: Maintainability A (25 mins) · Test Coverage A (92%)

# Copyright 2020 Dragonchain, Inc.
# Licensed under the Apache License, Version 2.0 (the "Apache License")
# with the following modification; you may not use this file except in
# compliance with the Apache License and the following modification to it:
# Section 6. Trademarks. is deleted and replaced with:
#      6. Trademarks. This License does not grant permission to use the trade
#         names, trademarks, service marks, or product names of the Licensor
#         and its affiliates, except as required to comply with Section 4(c) of
#         the License and to reproduce the content of the NOTICE file.
# You may obtain a copy of the Apache License at
#     http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the Apache License with the above modification is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the Apache License for the specific
# language governing permissions and limitations under the Apache License.

import os
import time
import math
from typing import Set, Dict, List, Union, Tuple, Any, TYPE_CHECKING

from dragonchain.lib.dao import block_dao
from dragonchain.lib.dto import l4_block_model
from dragonchain.lib import matchmaking
from dragonchain.lib import broadcast
from dragonchain.lib import keys
from dragonchain.lib import party
from dragonchain.lib import queue
from dragonchain import logger
from dragonchain.transaction_processor import shared_functions

if TYPE_CHECKING:
    from dragonchain.lib.types import L1Headers
    from dragonchain.lib.dto import l3_block_model

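# Required configuration, read from the environment at import time (missing values will raise a KeyError)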
PROOF_SCHEME = os.environ["PROOF_SCHEME"].lower()
ADDRESS = os.environ["INTERNAL_ID"]
CHAIN_NAME = os.environ["DRAGONCHAIN_NAME"]

_log = logger.get_logger()


def execute() -> None:
    """Gets the next L3 block content from the queue and processes it"""
    matchmaking.renew_registration_if_necessary()
    t0 = time.time()

    l1_headers, l3_blocks = get_new_blocks()
    if l3_blocks and l1_headers:
        _log.info(f"[L4] Got next L3 block array from dcid: {l1_headers['dc_id']} for blockid: {l1_headers['block_id']}")
        t1 = time.time()

        validations = verify_blocks(l3_blocks, l1_headers)
        t2 = time.time()

        l4_block = create_block(l1_headers, validations)
        t3 = time.time()

        send_data(l4_block)
        t4 = time.time()

        # Clear our processing queue (finished successfully)
        clear_processing_blocks()

        total = t4 - t0
        _log.info(f"[L4] Processed block l3 blocks in {total:.4f} seconds")
        _log.info(f"[L4] Retrieving L3 block from queue: {t1 - t0:.4f} sec ({((t1 - t0) / total) * 100:.1f}% of processing)")
        _log.info(f"[L4] Validating all L3 block proofs: {t2 - t1:.4f} sec ({((t2 - t1) / total) * 100:.1f}% of processing)")
        _log.info(f"[L4] Creating block with proof: {t3 - t2:.4f} sec ({((t3 - t2) / total) * 100:.1f}% of processing)")
        _log.info(f"[L4] Uploading block and broadcasting down: {t4 - t3:.4f} sec ({((t4 - t3) / total) * 100:.1f}% of processing)")

        recurse_if_necessary()
    else:
        # Clear our processing queue
        clear_processing_blocks()

    if l1_headers is not None and l3_blocks is None:
        try:
            _log.warning(f"Bad Block received from lower level. L1 Headers: {l1_headers}")
        except Exception:  # nosec (We don't care if l1_headers is an error/not defined and this log fails)
            pass

        # Clear our processing queue
        clear_processing_blocks()


def clear_processing_blocks() -> None:
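    """Clear this chain's in-progress (processing) queue once a run has finished or been abandoned"""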
    queue.clear_processing_queue()


def get_new_blocks() -> Union[Tuple[None, None], Tuple["L1Headers", List["l3_block_model.L3BlockModel"]]]:
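    """Fetch the next L1 headers and list of L3 blocks from the queue; may yield (None, None) when nothing is available"""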
    # Safety check to recover after unexpected crash while creating last block if necessary
    queue.check_and_recover_processing_if_necessary()
    return queue.get_next_l3_block()


def send_data(block: l4_block_model.L4BlockModel) -> None:
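    """Persist the finalized L4 block to storage, then dispatch it through the broadcast system"""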
    _log.info("[L4] Block created. Uploading data to storage")
    block_dao.insert_block(block)

    _log.info("[L4] Inserting complete. Broadcasting block")
    broadcast.dispatch(block)


def get_verifying_keys(chain_id: str) -> keys.DCKeys:
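    """Load the keys needed to verify blocks produced by the chain with the given id"""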
    return keys.DCKeys(chain_id)


def recurse_if_necessary() -> None:
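    """Immediately process the next block if more work is already queued, otherwise wait for the next run"""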
    if queue.is_not_empty():
        _log.info("[L4] Another block is queue, immediately starting processing")
        execute()
    else:
        _log.info("[L4] Block processing complete and no new block to process. Waiting")


def verify_blocks(l3_blocks: List["l3_block_model.L3BlockModel"], l1_headers: "L1Headers") -> List[Dict[str, Any]]:
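    """Verify each unique L3 block that matches the given L1 headers and collect a validation record for each"""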
    validations = []
    checked: Set[str] = set()
    for block in l3_blocks:
        # We use a set of already-checked proofs (which are unique per block) to make sure we don't process
        # a block twice, and to ensure the block we're looking at is actually relevant
        check = (
            block.proof not in checked
            and block.l1_dc_id == l1_headers["dc_id"]
            and block.l1_block_id == l1_headers["block_id"]
            and block.l1_proof == l1_headers["proof"]
        )

        if check:
            validations.append(verify_block(block))
        else:
            _log.info(f"[L4] L3 block was duplicated or not relevant to this verification.\nBlock in question: {block.__dict__}")
        # Finally, add this block's proof to our checked set
        checked.add(block.proof)

    return validations


def verify_block(block: "l3_block_model.L3BlockModel") -> Dict[str, Any]:
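    """Check a single L3 block's proof with its chain's verifying keys; the block is marked invalid if the keys cannot be fetched"""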
    try:
        verify_keys = get_verifying_keys(block.dc_id)
        _log.info(f"[L4] Verifying proof for L2 block id {block.block_id} from {block.dc_id}")
        if verify_keys.verify_block(block):
            verification = True
            _log.info(f"[L4] Finished processing valid L3 block {block.block_id}")
        else:
            verification = False
            _log.info(f"[L4] Proof for L3 block id {block.block_id} from {block.dc_id} was invalid")
    except Exception:
        _log.exception("[L4] Could not get L3 chain's verifying keys. Marking block as invalid.")
        verification = False
    return {"l3_dc_id": block.dc_id, "l3_block_id": block.block_id, "l3_proof": block.proof, "valid": verification}


def create_block(l1_headers: "L1Headers", validations: List[Dict[str, Any]]) -> l4_block_model.L4BlockModel:
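    """Assemble the next L4 block from the L1 headers and validation results, then attach its proof"""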
    block_id, prev_proof = get_next_block_info()

    l4_block = l4_block_model.L4BlockModel(
        dc_id=keys.get_public_id(),
        current_ddss=party.get_address_ddss(ADDRESS),  # Get DDSS from party, cached hourly
        block_id=str(block_id),
        timestamp=str(math.floor(time.time())),
        prev_proof=prev_proof,
        scheme=PROOF_SCHEME,
        l1_dc_id=l1_headers["dc_id"],
        l1_block_id=l1_headers["block_id"],
        l1_proof=l1_headers["proof"],
        validations=validations,
        chain_name=CHAIN_NAME,
    )

    sign_block(l4_block)

    return l4_block


def get_next_block_info() -> Tuple[int, str]:
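    """Determine the next block id and the previous block's proof (1 and an empty proof for a brand-new chain)"""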
    previous = block_dao.get_last_block_proof()
    _log.info(f"[L4] Got previous block information: {previous}")
    if not previous:
        # Throws an exception if sanity check fails
        shared_functions.sanity_check_empty_chain()
        block_id = 1
        prev_proof = ""
    else:
        block_id = int(previous["block_id"]) + 1
        prev_proof = previous["proof"]
        _log.info(f"[L4] Got previous block id, incremented current block id to {block_id}")

    return block_id, prev_proof


def sign_block(l4_block: l4_block_model.L4BlockModel) -> None:
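    """Attach a proof to the block: proof of work when PROOF_SCHEME is 'work', otherwise a signature from this chain's keys"""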
    if PROOF_SCHEME == "work":
        _log.info("[L4] Performing PoW on block")
        l4_block.proof, l4_block.nonce = keys.get_my_keys().pow_block(l4_block)
    else:
        _log.info("[L4] Signing block")
        l4_block.proof = keys.get_my_keys().sign_block(l4_block)
    _log.info(f"[L4] Finished Block:\n{l4_block.export_as_at_rest()}")