dragonchain/dragonchain

View on GitHub
dragonchain/lib/matchmaking.py

Summary

Maintainability
B
6 hrs
Test Coverage
F
24%
# Copyright 2020 Dragonchain, Inc.
# Licensed under the Apache License, Version 2.0 (the "Apache License")
# with the following modification; you may not use this file except in
# compliance with the Apache License and the following modification to it:
# Section 6. Trademarks. is deleted and replaced with:
#      6. Trademarks. This License does not grant permission to use the trade
#         names, trademarks, service marks, or product names of the Licensor
#         and its affiliates, except as required to comply with Section 4(c) of
#         the License and to reproduce the content of the NOTICE file.
# You may obtain a copy of the Apache License at
#     http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the Apache License with the above modification is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the Apache License for the specific
# language governing permissions and limitations under the Apache License.

import os
import json

import requests

from dragonchain.lib import authorization
from dragonchain.lib.dao import block_dao
from dragonchain.lib.dao import interchain_dao
from dragonchain.lib.interfaces import storage
from dragonchain.lib.database import redis
from dragonchain.lib import keys
from dragonchain import exceptions
from dragonchain import logger

LEVEL = os.environ["LEVEL"]
STAGE = os.environ["STAGE"]
REREGISTER_TIMING_KEY = "matchmaking:registration-still-current"
REREGISTER_TIME_AMOUNT = 1500  # 25 Min (Matchmaking forgets every 30)
REQUEST_TIMEOUT = 30
if STAGE == "prod":
    MATCHMAKING_ADDRESS = "https://matchmaking.api.dragonchain.com"
else:
    MATCHMAKING_ADDRESS = "https://matchmaking-dev.api.dragonchain.com"


_log = logger.get_logger()


def get_dragonchain_address(dragonchain_id: str) -> str:
    """Return the endpoint for a particular dragonchain
    Args:
        dragonchain_id: dragonchain id to fetch the endpoint
    Returns:
        String of dragonchain endpoint
    """
    return get_registration(dragonchain_id)["url"]


def get_matchmaking_config() -> dict:
    """Return the endpoint for a particular dragonchain
    Args:
        dragonchain_id: dragonchain id to fetch the endpoint
    Returns:
        String of dragonchain endpoint
    """
    config = {
        "level": int(os.environ["LEVEL"]),
        "url": os.environ["DRAGONCHAIN_ENDPOINT"],
        "scheme": os.environ.get("PROOF_SCHEME") or "trust",
        "hashAlgo": os.environ["HASH"],
        "version": os.environ["DRAGONCHAIN_VERSION"],
        "encryptionAlgo": os.environ["ENCRYPTION"],
    }

    if os.environ["LEVEL"] == "5":
        try:
            client = interchain_dao.get_default_interchain_client()
            config["network"] = client.get_network_string()
            config["interchainWallet"] = client.address
        except exceptions.NotFound:
            _log.warning("L5 chain does not have a default interchain network set")
        config["funded"] = bool(redis.get_sync("dc:isFunded", decode=False))
        config["broadcastInterval"] = float(os.environ.get("BROADCAST_INTERVAL") or "2")
    return config


def get_claim_request_dto(requirements: dict, block_id: str, transaction_count: int) -> dict:
    """Get the claim request DTO for matchmaking
    Args:
        requirements: matchmaking requirements for this block
        block_id: relevant block_id
        transactionCount: number of transactions in this block
    Returns:
        DTO as a dict
    """
    # Noqa for typing until we use TypedDict
    return {
        "numL2s": requirements["l2"].get("nodesRequired"),  # noqa: T484
        "numL3s": requirements["l3"].get("nodesRequired"),  # noqa: T484
        "numL4s": requirements["l4"].get("nodesRequired"),  # noqa: T484
        "numL5s": requirements["l5"].get("nodesRequired"),  # noqa: T484
        "blockId": str(block_id),
        "transactionCount": transaction_count,
    }


def update_funded_flag(flag_value: bool) -> None:
    key = "dc:isFunded"
    if flag_value:
        redis.set_sync(key, "a")  # Value does not matter
    else:
        redis.delete_sync(key)
    register()


def get_overwrite_no_response_dto(claim: dict, higher_level_node: str, higher_level: int) -> dict:
    """Get the DTO for updating a claim request which has an expired node
    Args:
        claim: Actual stored claim object to update
        higher_level_node: id of the node to replace
        higher_level: dragonchain level of the node to replace
    Returns:
        DTO to send to matchmaking in order to update the claim
    """
    return {"claim": claim, "dc_id": higher_level_node, "level": higher_level}


def register(retry: bool = True) -> None:
    """Register self with matchmaking
    Args:
        retry: Whether or not to retry this request if it fails
    """
    path = "/registration"
    body = get_matchmaking_config()
    body["token"] = os.environ.get("REGISTRATION_TOKEN")
    make_matchmaking_request("POST", path, body, retry)
    redis.set_sync(REREGISTER_TIMING_KEY, "a", ex=REREGISTER_TIME_AMOUNT)  # Value doesn't matter


def renew_registration_if_necessary() -> None:
    """Check and renew matchmaking registration if it's time"""
    if not redis.get_sync(REREGISTER_TIMING_KEY):
        register()


def get_registration(dc_id: str) -> dict:
    """Retrieve matchmaking config for any registered chain
    Args:
        dc_id: chain id to get registration data for
    Returns:
        Dictionary of the identity json returned by matchmaking
    Raises:
        exceptions.NotFound if matchmaking sends back an empty response
    """
    path = f"/registration/{dc_id}"
    response = make_matchmaking_request("GET", path, authenticated=False)
    registration = response.json()
    if not registration:
        raise exceptions.NotFound(f"Registration not found for {dc_id}")
    return registration


def update_registration(new_data: dict) -> None:
    try:
        _log.info(f"[MATCHMAKING] Putting matchmaking config in storage: {new_data}")
        matchmaking_config = storage.get_json_from_object("MATCHMAKING_CONFIG.json")
        matchmaking_config.update(new_data)
        storage.put_object_as_json("MATCHMAKING_CONFIG.json", matchmaking_config)
        register()
    except Exception:
        raise exceptions.MatchmakingError("Failure updating matchmaking data")


def create_claim_check(block_id: str, requirements: dict) -> dict:
    """Call matchmaking to create a claimcheck for a block
    Args:
        block_id: block id to create a claim check for
        requirements: requirements dict for this chain
    Returns:
        Parsed claim check (as dict) from matchmaking
    """
    broadcast_dto = block_dao.get_broadcast_dto(2, block_id)
    transaction_count = len(broadcast_dto["payload"]["transactions"])
    claim_request_dto = get_claim_request_dto(requirements, block_id, transaction_count)
    path = "/claim-check"
    response = make_matchmaking_request("POST", path, claim_request_dto)
    claim_check = response.json()
    cache_claim_check(block_id, claim_check)
    return claim_check


def get_claim_check(block_id: str) -> dict:
    """Get a claim check that already exists (memoized)
    Args:
        block_id: the block id of the claim check to fetch
    Returns:
        Parsed claim check (as dict)
    """
    claim_check = redis.hget_sync("broadcast:claimcheck", block_id, decode=False)
    if claim_check is not None:
        return json.loads(claim_check)
    path = f"/claim-check?blockId={block_id}&dcId={keys.get_public_id()}"
    response = make_matchmaking_request("GET", path)
    new_claim_check = response.json()
    cache_claim_check(block_id, new_claim_check)
    return new_claim_check


def get_or_create_claim_check(block_id: str, requirements: dict) -> dict:
    """Get a claim check for a block if it already exists, else create a new one
    Args:
        block_id: block_id for the claim check
        requirements: requirements for the claim check (if it needs to create one)
    Returns:
        Dict of claim check
    """
    try:
        return get_claim_check(block_id)
    except Exception:  # nosec (We don't care why getting a claim check failed, we will simply request a new one from matchmaking)
        pass
    return create_claim_check(block_id, requirements)


def get_claimed_txn_count(l1_dc_id: str, block_id: str) -> int:
    """Get the amount of transactions that were declared for a certain claim check
    Args:
        l1_dc_id: the id of the chain with the claim check
        block_id: the block id of the claim check
    Returns:
        Number of transactions matchmaking reported for this claim check
    """
    path = f"/claim-check?dcId={l1_dc_id}&blockId={block_id}&transactionCount=1"
    response = make_matchmaking_request("GET", path)
    claim_check = response.json()
    return claim_check["transactionCount"]


def overwrite_no_response_node(block_id: str, higher_level: int, higher_level_node: str) -> dict:
    """Update a claim check which has had a node that didn't respond in time
    Args:
        block_id: The block id for the claim that needs to be updated
        higher_level: the level of the node to replace
        higher_level_node: the id of the node to replace
    Returns:
        Updated claim check
    """
    claim = get_claim_check(block_id)
    body = get_overwrite_no_response_dto(claim, higher_level_node, higher_level)
    return update_claim_check(block_id, body)


def update_claim_check(block_id: str, data: dict) -> dict:
    """Update a claim check that already exists
    Args:
        block_id: block id of the claim check to update
        data: to update the claim check with
    Returns:
        Updated claim check from matchmaking
    """
    unique_id = f"{keys.get_public_id()}-{block_id}"
    path = f"/claim-check/{unique_id}"
    response = make_matchmaking_request("PUT", path, data)
    claim_check = response.json()
    cache_claim_check(block_id, claim_check)
    return claim_check


def add_receipt(l1_block_id: str, level: int, dc_id: str, block_id: str, proof: str) -> None:
    """Add a receipt to a claim check and cache the new claim check
    Args:
        l1_block_id: the level 1 block id of the claim check
        level: the level of the verification
        dc_id: the chain id of the verification
        block_id: the block id of the verification
        proof: the proof from the block of the verification
    """
    claim_check = get_claim_check(l1_block_id)
    try:
        data = {}
        data["blockId"] = block_id
        data["signature"] = proof
        _log.info(f"Adding to local matchmaking claim: {data}")
        claim_check["validations"][f"l{level}"][dc_id] = data
        cache_claim_check(block_id, claim_check)
    except Exception:
        _log.exception("Failure to add receipt to claimcheck")


def resolve_claim_check(claim_check_id: str) -> None:
    """Call matchmaking to delete a claim check for some reason
    Args:
        claim_check_id: the id of the claim check to delete
    """
    make_matchmaking_request("DELETE", f"/claim-check/{claim_check_id}")


def make_matchmaking_request(
    http_verb: str, path: str, json_content: dict = None, retry: bool = True, authenticated: bool = True
) -> requests.Response:
    """Make an authenticated request to matchmaking and return the response
    Args:
        http_verb: GET, POST, etc
        path: path of the request
        json_content: OPTIONAL if the request needs a post body, include it as a dictionary
        retry: boolean whether or not to recursively retry on recoverable errors (i.e. no auth, missing registration)
            Note: This should not be provided manually, and is only for recursive calls within the function it
        authenticated: boolean whether or not this matchmaking endpoint is authenticated
    Returns:
        Requests response object
    Raises:
        exceptions.MatchmakingError when unexpected matchmaking error occurs
        exceptions.InsufficientFunds when matchmaking responds with payment required
        exceptions.NotFound when matchmaking responds with a 404
    """
    if json_content is None:
        json_content = {}
    http_verb = http_verb.upper()
    _log.info(f"[MATCHMAKING] Performing {http_verb} request to {path} with data: {json_content}")
    headers, data = None, None
    if authenticated:
        headers, data = authorization.generate_authenticated_request(http_verb, "matchmaking", path, json_content)
    else:
        data = json.dumps(json_content, separators=(",", ":")).encode("utf-8") if json_content else b""
        headers = {"Content-Type": "application/json"} if json_content else {}

    response = requests.request(method=http_verb, url=f"{MATCHMAKING_ADDRESS}{path}", headers=headers, data=data, timeout=REQUEST_TIMEOUT)

    if response.status_code < 200 or response.status_code >= 300:
        if retry and response.status_code == 401 and authenticated:
            _log.warning("[MATCHMAKING] received 401 from matchmaking. Registering new key with matchmaking and trying again")
            authorization.register_new_key_with_matchmaking()
            return make_matchmaking_request(http_verb=http_verb, path=path, json_content=json_content, retry=False, authenticated=authenticated)
        elif retry and response.status_code == 403 and authenticated:
            _log.warning(
                "[MATCHMAKING] received 403 from matchmaking. Registration is expired or Dragon Net config is invalid. Re-registering and trying again"
            )
            register(retry=False)
            return make_matchmaking_request(http_verb=http_verb, path=path, json_content=json_content, retry=False, authenticated=authenticated)
        elif response.status_code == 402:
            raise exceptions.InsufficientFunds("received insufficient funds (402) from matchmaking")
        elif response.status_code == 404:
            raise exceptions.NotFound("Not found (404) from matchmaking")
        elif response.status_code == 409:
            raise exceptions.UnableToUpdate("Matchmaking could not find enough nodes to verify this block")
        elif response.status_code >= 500:
            raise exceptions.MatchmakingRetryableError(f"[MATCHMAKING] Server error {response.status_code} from matchmaking")
        raise exceptions.MatchmakingError(
            f"Received unexpected response code {response.status_code} from matchmaking with response:\n{response.text}"
        )

    return response


def cache_claim_check(block_id: str, claim_check: dict) -> None:
    """Cache a claim check in redis
    Args:
        block_id: the block id of the claim check that's being cached
        claim_check: the actual claim check to cache
    """
    try:
        redis.hset_sync("broadcast:claimcheck", str(block_id), json.dumps(claim_check, separators=(",", ":")))
    except Exception:
        _log.exception("Failure uploading claim to storage")