dragonchain/dragonchain

View on GitHub
dragonchain/lib/broadcast.py

Summary

Maintainability
A
2 hrs
Test Coverage
F
16%
# Copyright 2020 Dragonchain, Inc.
# Licensed under the Apache License, Version 2.0 (the "Apache License")
# with the following modification; you may not use this file except in
# compliance with the Apache License and the following modification to it:
# Section 6. Trademarks. is deleted and replaced with:
#      6. Trademarks. This License does not grant permission to use the trade
#         names, trademarks, service marks, or product names of the Licensor
#         and its affiliates, except as required to comply with Section 4(c) of
#         the License and to reproduce the content of the NOTICE file.
# You may obtain a copy of the Apache License at
#     http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the Apache License with the above modification is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the Apache License for the specific
# language governing permissions and limitations under the Apache License.

import os
import json
from typing import TYPE_CHECKING, cast

import requests

from dragonchain import logger
from dragonchain import exceptions
from dragonchain.lib import authorization
from dragonchain.lib import matchmaking
from dragonchain.lib.database import redis

if TYPE_CHECKING:
    from dragonchain.lib.dto import l5_block_model
    from dragonchain.lib.dto import model

LEVEL = os.environ["LEVEL"]

_log = logger.get_logger()


def dispatch(block: "model.BlockModel") -> None:
    if LEVEL == "5":
        _log.info(f"[BROADCAST] Sending receipts for block {block.block_id} down to lower nodes")
        send_receipts(cast("l5_block_model.L5BlockModel", block))  # This should always be an l5 block when l5
    else:
        _log.info(f"[BROADCAST] Sending receipt for block down to lower node: {block.get_associated_l1_dcid()}")
        send_receipt(block)


def send_receipt(block: "model.BlockModel") -> None:
    try:
        dcid = block.get_associated_l1_dcid()
        full_path = "/v1/receipt"
        url = f"{matchmaking.get_dragonchain_address(dcid)}{full_path}"
        headers, data = authorization.generate_authenticated_request("POST", dcid, full_path, block.export_as_at_rest())
        _log.info(f"----> {url}")
        r = requests.post(url, data=data, headers=headers, timeout=30)
        _log.info(f"<---- {r.status_code} {r.text}")
        if r.status_code != 200:
            _log.info(f"[BROADCAST] WARNING: failed to transmit to {dcid} with error {r.text}")
        else:
            _log.info("[BROADCAST] Sucessful receipt sent down to L1")
    except Exception:
        _log.error(f"[BROADCAST] ERROR: Couldn't broadcast receipt down to {dcid}! Ignoring")


def send_receipts(l5_block: "l5_block_model.L5BlockModel") -> None:
    receipt_path = "/v1/receipt"
    get_claim_path = "/v1/claim"
    chain_id_set = set()
    _log.info(f"l5 block to loop {l5_block.__dict__}")
    _log.info(f"Sending receipts to {len(l5_block.l4_blocks)} lower nodes")
    for l4_block in l5_block.l4_blocks:
        try:
            block_dictionary = json.loads(l4_block)
            chain_id = block_dictionary["l1_dc_id"]
            block = block_dictionary["l1_block_id"]
            full_claim_path = f"{get_claim_path}/{block}"
            # Get the claim data for billing
            claim_url = f"{matchmaking.get_dragonchain_address(chain_id)}{full_claim_path}"
            headers, _ = authorization.generate_authenticated_request("GET", chain_id, full_claim_path)
            _log.info(f"getting claim for {block} from {chain_id}")
            try:
                _log.info(f"----> {claim_url}")
                r = requests.get(claim_url, headers=headers, timeout=30)
                _log.info(f"<---- {r.status_code} {r.text}")
            except Exception:
                _log.exception("Failed to get claim!")
            if r.status_code != 200:
                _log.error(f"Claim check failed! Rejecting block {block} from {chain_id}")
                continue
            else:
                claim = r.json()
                # Add this L5's proof to the block
                _log.info(f"Claim received from l1 {claim}")
                _log.info(f"data points blockid {l5_block.block_id}  signature {l5_block.proof}")
                block_data = {}
                block_data["blockId"] = l5_block.block_id
                block_data["signature"] = l5_block.proof
                claim["validations"]["l5"][l5_block.dc_id] = block_data
                chain_id_set.add(chain_id)
                _log.info(f"Sending filled claim {claim}")
                try:
                    claim_check_id = f"{chain_id}-{block}"
                    matchmaking.resolve_claim_check(claim_check_id)
                except exceptions.MatchmakingRetryableError:  # any 500-level server errors
                    _log.exception(f"Adding claim to failed queue.  Claim ID: {claim_check_id}")
                    redis.sadd_sync("mq:failed-claims", claim_check_id)  # using a set avoids duplicates
                except Exception:
                    _log.exception("Failure to finalize claim in matchmaking. Sending receipts to lower level nodes.")
        except Exception as e:
            _log.exception(f"[BROADCAST] Error while trying to broadcast down for l4 block {l4_block}\n{e}\n!Will ignore this broadcast!")

    payload = l5_block.export_as_at_rest()
    for chain_id in chain_id_set:
        try:
            headers, data = authorization.generate_authenticated_request("POST", chain_id, receipt_path, payload)
            url = f"{matchmaking.get_dragonchain_address(chain_id)}{receipt_path}"
            _log.info(f"----> {url}")
            r = requests.post(url, data=data, headers=headers, timeout=30)
            _log.info(f"<---- {r.status_code} {r.text}")
            if r.status_code != 200:
                # TODO failed to enqueue block to specific l1, consider another call to matchmaking, etc
                _log.info(f"[BROADCAST] WARNING: failed to transmit to {chain_id} with error {r.text}")
            else:
                _log.info("[BROADCAST] Sucessful receipt sent down to L1")
        except Exception:
            _log.error(f"[BROADCAST] ERROR: Couldn't broadcast receipt down to {chain_id}! Ignoring")