dragonchain/dragonchain

View on GitHub
dragonchain/lib/keys.py

Summary

Maintainability
C
1 day
Test Coverage
F
40%
# Copyright 2020 Dragonchain, Inc.
# Licensed under the Apache License, Version 2.0 (the "Apache License")
# with the following modification; you may not use this file except in
# compliance with the Apache License and the following modification to it:
# Section 6. Trademarks. is deleted and replaced with:
#      6. Trademarks. This License does not grant permission to use the trade
#         names, trademarks, service marks, or product names of the Licensor
#         and its affiliates, except as required to comply with Section 4(c) of
#         the License and to reproduce the content of the NOTICE file.
# You may obtain a copy of the Apache License at
#     http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the Apache License with the above modification is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the Apache License for the specific
# language governing permissions and limitations under the Apache License.

import os
import enum
import base64
from typing import cast, Tuple, Optional, TYPE_CHECKING

import base58
import secp256k1

from dragonchain.lib import crypto
from dragonchain.lib import matchmaking
from dragonchain.lib.interfaces import secrets
from dragonchain import exceptions

if TYPE_CHECKING:
    from dragonchain.lib.dto import model
    from dragonchain.lib.dto import transaction_model
    from dragonchain.lib.dto import l1_block_model  # noqa: F401
    from dragonchain.lib.dto import l2_block_model  # noqa: F401
    from dragonchain.lib.dto import l3_block_model  # noqa: F401
    from dragonchain.lib.dto import l4_block_model  # noqa: F401
    from dragonchain.lib.dto import l5_block_model  # noqa: F401

LEVEL = os.environ["LEVEL"]
PROOF_SCHEME = os.environ["PROOF_SCHEME"]
HASH = os.environ["HASH"]
ENCRYPTION = os.environ["ENCRYPTION"]


_my_keys = None
_public_id = ""


def get_public_id() -> str:
    """Memo-ized retrieval/calculation for the chain's derived public id
    Returns:
        String of the chain's public_id
    """
    global _public_id
    if not _public_id:
        keys = get_my_keys()
        if not keys.priv:
            raise RuntimeError("My keys doesn't have private key set")
        _public_id = crypto.calculate_public_id(keys.priv.pubkey.serialize())
    return _public_id


def get_my_keys() -> "DCKeys":
    """
    Memo-ized retrieval for the keys owned by this chain
    """
    global _my_keys
    if _my_keys is None:
        _my_keys = DCKeys(pull_keys=False).initialize(
            level=int(LEVEL), scheme=PROOF_SCHEME, private_key_string=secrets.get_dc_secret("private-key"), hash_type=HASH, encryption=ENCRYPTION
        )
    return _my_keys


class SupportedSchemes(enum.Enum):
    trust = 1
    work = 2


class DCKeys(object):
    """
    Class for holding keys to sign/verify transactions/blocks of individual dragonchains
    """

    def __init__(self, dc_id: str = "", pull_keys: bool = True):
        """Constructor to retrieve and initialize keys for a dragonchain
        Args:
            dc_id: string of the dragonchain id to initialize the keys (defaults to self)
            pull_keys: boolean whether or not to pull key info from storage (if own keys) or matchmaking (if other chain keys) on initialization
        """
        if pull_keys:
            identity = matchmaking.get_registration(dc_id)
            self.initialize(
                level=int(identity["level"]),
                scheme=identity["proofScheme"],
                public_key_string=dc_id,
                hash_type=identity["hashAlgo"],
                encryption=identity["encryptionAlgo"],
            )

    def initialize(
        self,
        level: int = 1,
        scheme: str = "trust",
        private_key_string: Optional[str] = None,
        public_key_string: Optional[str] = None,
        hash_type: str = "blake2b",
        encryption: str = "secp256k1",
    ) -> "DCKeys":
        """Initializes the internal state of the keys object with local data that is passed in
        Args:
            level: integer of the dragonchain level that owns these key(s)
            private_key_string: base64 encoded private key string, setting this will automatically derive the public key
            public_key_string: base64 encoded public key string, only used for verification functions
        Returns:
            Self (initialized DCKeys instance)
        """
        self.priv = None
        self.pub = None
        self.set_scheme(scheme)
        self.set_level(level)
        self.set_hash(hash_type)
        self.set_encryption(encryption)
        if public_key_string is not None:
            self.set_public_key(public_key_string)
        if private_key_string is not None:
            self.set_private_key(private_key_string)
        return self

    def set_scheme(self, scheme: str) -> None:
        """Sets the dragonchain scheme for this instantiation of the class
        Args:
            scheme: string of a valid proof scheme
        Raises:
            exceptions.NotImplementedError when invalid scheme
        """
        scheme = scheme.lower()
        if scheme == "trust":
            self.scheme = SupportedSchemes.trust
        elif scheme == "work":
            self.scheme = SupportedSchemes.work
        else:
            raise NotImplementedError("Unimplemented or invalid scheme type")

    def set_level(self, level: int) -> None:
        """Sets the dragonchain level for this instantiation of the class
        Args:
            level: int between 1 and 5
        Raises:
            exceptions.TypeError when invalid level
        """
        if not isinstance(level, int) or level > 5 or level < 1:
            raise TypeError("Level was not a valid int")
        self.level = level

    def set_encryption(self, encryption_type: str) -> None:
        """Sets the encryption type for this instanation of the class
        Args:
            encryption_type: sring fo the encryption type (eg secp256k1)
        Raises:
            NotImplementedError when invalid encryption_type
        """
        # Modify this statement as support for other encryption algorithms change in crypto.py
        if encryption_type == "secp256k1":
            self.encryption = crypto.SupportedEncryption.secp256k1
        else:
            raise NotImplementedError(f"Encryption type {encryption_type} not implemented")

    def set_hash(self, hash_type: str) -> None:
        """Sets the hash type for this instantiation of the class
        Args:
            hash_type: string of the hash type (eg 'blake2b')
        Raises:
            NotImplementedError when invalid hash_type
        """
        # Modify this statement as support for other hashing algorithms change in crypto.py
        if hash_type == "blake2b":
            self.hash = crypto.SupportedHashes.blake2b
        elif hash_type == "sha256":
            self.hash = crypto.SupportedHashes.sha256
        elif hash_type == "sha3_256":
            self.hash = crypto.SupportedHashes.sha3_256
        else:
            raise NotImplementedError(f"Hash type {hash_type} not implemented")

    def set_private_key(self, private_key_string: str) -> None:
        """Sets the private (and derives the public) key for this instantiation of the class
        Note: This must be called AFTER self.encryption (set_encryption) is already set
        Args:
            private_key_string: base64 encoded private key string
        Raises:
            NotImplementedError when invalid encryption type on self
        """
        decoded_key = base64.b64decode(private_key_string)
        # Modify this statement as support for other encryption algorithms change
        if self.encryption == crypto.SupportedEncryption.secp256k1:
            priv_key = secp256k1.PrivateKey(privkey=decoded_key, raw=True)
            self.priv = priv_key
            self.pub = priv_key.pubkey
        else:
            raise NotImplementedError("Encryption algorithm not implemented")

    def set_public_key(self, public_key_string: str) -> None:
        """Sets the public key (and clears the private key) for this instantiation of the class
        Note: This must be called AFTER self.encryption (set_encryption) is already set
        Args:
            public_key_string: base58 encoded public key string (public dragonchain id)
        Raises:
            NotImplementedError when invalid encryption_type
        """
        decoded_key = base58.b58decode(public_key_string)
        # Modify this statement as support for other encryption algorithms change
        if self.encryption == crypto.SupportedEncryption.secp256k1:
            self.pub = secp256k1.PublicKey(pubkey=decoded_key, raw=True)
        else:
            raise NotImplementedError("Encryption algorithm not implemented")

        self.priv = None

    def check_signature(self, content: bytes, signature: str, hash_type: Optional[crypto.SupportedHashes] = None) -> bool:
        """Check a generic signature for some bytes of data with these keys
        Args:
            content: content bytes that were signed
            signature: Base64 encoded signature (as a string)
            hash_type: SupportedHashes enum of hash type used to check signature (defaults to keys hash type if not provided)
        Returns:
            boolean if signature is valid or not
        """
        if hash_type is None:
            hash_type = self.hash
        return crypto.check_generic_signature(self.encryption, hash_type, self.pub, content, base64.b64decode(signature))

    def make_signature(self, content: bytes, hash_type: Optional[crypto.SupportedHashes] = None) -> str:
        """Make a generic signature for some bytes of data
        Args:
            content: content bytes to sign
            hash_type: Hash type to use for this signature. Defaults to chain's hash type if not explicitly provided
        Returns:
            base64 encoded signature string
        Raises:
            RuntimeError when no private key is set
        """
        if self.priv is None:
            raise RuntimeError("No private key has been set for signing")
        if hash_type is None:
            hash_type = self.hash
        return crypto.make_generic_signature(self.encryption, hash_type, self.priv, content)

    def make_binance_signature(self, content: bytes) -> str:
        """Make a generic signature for some bytes of data
        Args:
            content: json bytes to sign
        Returns:
            base64 encoded signature string
        Raises:
            RuntimeError when no private key is set
        """
        if self.priv is None:
            raise RuntimeError("No private key has been set for signing")
        message = crypto.hash_bytes(hash_type=crypto.SupportedHashes.sha256, bytes_param=content)
        return crypto.encrypt_secp256k1_message_compact(self.priv, message)

    def sign_block(self, signable_block: "model.BlockModel") -> str:
        """Sign a block with this class' keys
        Args:
            block: BlockModel to sign
        Returns:
            Base64 encoded string of the block signature
        Raises:
            exceptions.InvalidNodeLevel when invalid level on self
            RuntimeError when no private key is set on self
        """
        if self.priv is None:
            raise RuntimeError("No private key has been set for signing")
        if self.level == 1:
            return crypto.sign_l1_block(self.hash, self.encryption, self.priv, signable_block)
        elif self.level == 2:
            return crypto.sign_l2_block(self.hash, self.encryption, self.priv, signable_block)
        elif self.level == 3:
            return crypto.sign_l3_block(self.hash, self.encryption, self.priv, signable_block)
        elif self.level == 4:
            return crypto.sign_l4_block(self.hash, self.encryption, self.priv, signable_block)
        elif self.level == 5:
            return crypto.sign_l5_block(self.hash, self.encryption, self.priv, signable_block)
        else:
            raise exceptions.InvalidNodeLevel(f"Node level {self.level} not implemented yet")

    def hash_l5_for_public_broadcast(self, signable_block: "l5_block_model.L5BlockModel") -> str:
        """Hash a block with this class' keys
        Args:
            block: BlockModel to sign
        Returns:
            Base64 encoded string of the block hash
        """
        return base64.b64encode(crypto.hash_l5_block(self.hash, signable_block)).decode("ascii")

    def pow_block(self, signable_block: "model.BlockModel") -> Tuple[str, int]:
        """Do proof of work on a block
        Args:
            block: BlockModel to do proof of work on
        Returns:
            Tuple where index 0 is a Base64 encoded string of the generated hash and index 1 is the nonce
        Raises:
            exceptions.InvalidNodeLevel when invalid level on self
        """
        if self.level == 1:
            return crypto.pow_l1_block(self.hash, cast("l1_block_model.L1BlockModel", signable_block))
        elif self.level == 2:
            return crypto.pow_l2_block(self.hash, cast("l2_block_model.L2BlockModel", signable_block))
        elif self.level == 3:
            return crypto.pow_l3_block(self.hash, cast("l3_block_model.L3BlockModel", signable_block))
        elif self.level == 4:
            return crypto.pow_l4_block(self.hash, cast("l4_block_model.L4BlockModel", signable_block))
        else:
            raise exceptions.InvalidNodeLevel(f"Node level {self.level} not implemented yet")

    def verify_block(self, block: "model.BlockModel") -> bool:  # noqa: C901
        """Verify a block with this class' keys
        Args:
            block: BlockModel to verify
        Returns:
            Boolean if valid block (according to these keys)
        Raises:
            NotImplementedError when invalid scheme on self
        """
        if self.scheme == SupportedSchemes.trust and self.pub is None:
            RuntimeError("No public key has been set for verifying block signature")
        if self.level == 1:
            if self.scheme == SupportedSchemes.trust:
                return crypto.verify_l1_block_trust(self.hash, self.encryption, self.pub, cast("l1_block_model.L1BlockModel", block))
            elif self.scheme == SupportedSchemes.work:
                return crypto.verify_l1_block_pow(self.hash, cast("l1_block_model.L1BlockModel", block))
            else:
                raise NotImplementedError(f"Proof scheme {self.scheme} not implemented")
        elif self.level == 2:
            if self.scheme == SupportedSchemes.trust:
                return crypto.verify_l2_block_trust(self.hash, self.encryption, self.pub, cast("l2_block_model.L2BlockModel", block))
            elif self.scheme == SupportedSchemes.work:
                return crypto.verify_l2_block_pow(self.hash, cast("l2_block_model.L2BlockModel", block))
            else:
                raise NotImplementedError(f"Proof scheme {self.scheme} not implemented")
        elif self.level == 3:
            if self.scheme == SupportedSchemes.trust:
                return crypto.verify_l3_block_trust(self.hash, self.encryption, self.pub, cast("l3_block_model.L3BlockModel", block))
            elif self.scheme == SupportedSchemes.work:
                return crypto.verify_l3_block_pow(self.hash, cast("l3_block_model.L3BlockModel", block))
            else:
                raise NotImplementedError(f"Proof scheme {self.scheme} not implemented")
        elif self.level == 4:
            if self.scheme == SupportedSchemes.trust:
                return crypto.verify_l4_block_trust(self.hash, self.encryption, self.pub, cast("l4_block_model.L4BlockModel", block))
            elif self.scheme == SupportedSchemes.work:
                return crypto.verify_l4_block_pow(self.hash, cast("l4_block_model.L4BlockModel", block))
            else:
                raise NotImplementedError(f"Proof scheme {self.scheme} not implemented")
        elif self.level == 5:
            if self.scheme == SupportedSchemes.trust:
                return crypto.verify_l5_block_trust(self.hash, self.encryption, self.pub, cast("l5_block_model.L5BlockModel", block))
            else:
                raise NotImplementedError(f"Proof scheme {self.scheme} not implemented")
        else:
            raise exceptions.InvalidNodeLevel(f"Node level {self.level} nonexistent")

    def sign_transaction(self, signable_tx: "transaction_model.TransactionModel") -> Tuple[str, str]:
        """Sign a transaction
        Args:
            signable_tx: TransactionModel to sign
        Returns:
            Tuple of strings where index 0 is the base64 encoded full hash and index 1 is the base64 encoded stripped signature
        Raises:
            exceptions.InvalidNodeLevel when self is not level 1
            RuntimeError when no private key is set on self
        """
        if self.level != 1:
            raise exceptions.InvalidNodeLevel("Transactions only exist on level 1 nodes")
        if self.priv is None:
            raise RuntimeError("No private key has been set for signing")

        return crypto.sign_transaction(self.hash, self.encryption, self.priv, signable_tx)

    def verify_stripped_transaction(self, stripped_tx: "transaction_model.TransactionModel") -> bool:
        """Verify a stripped signature on a transaction
        Args:
            stripped_tx: TransactionModel to verify (just the stripped signature)
        Returns:
            Boolean if valid signed transaction
        Raises:
            exceptions.InvalidNodeLevel when self is not level 1
            RuntimeError when no public key is set on self
        """
        if self.level != 1:
            raise exceptions.InvalidNodeLevel("Transactions only exist on level 1 nodes")
        if self.pub is None:
            raise RuntimeError("No public key has been set for verifying")

        return crypto.verify_stripped_transaction(self.hash, self.encryption, self.pub, stripped_tx)

    def verify_full_transaction(self, full_tx: "transaction_model.TransactionModel") -> bool:
        """Verify a full transaction
        Args:
            full_tx: TransactionModel to verify
        Returns:
            Boolean if valid signed and hashed transaction
        Raises:
            exceptions.InvalidNodeLevel when self is not level 1
            RuntimeError when no public key is set on self
        """
        if self.level != 1:
            raise exceptions.InvalidNodeLevel("Transactions only exist on level 1 nodes")
        if self.pub is None:
            raise RuntimeError("No public key has been set for verifying")

        return crypto.verify_full_transaction(self.hash, self.encryption, self.pub, full_tx)