xeroc/python-graphenelib

View on GitHub
graphenebase/ecdsa.py

Summary

Maintainability
D
1 day
Test Coverage
# -*- coding: utf-8 -*-
from __future__ import absolute_import

import time
import ecdsa
import hashlib
import struct
import logging

from binascii import hexlify

from .account import PrivateKey, PublicKey
from .utils import _bytes

log = logging.getLogger(__name__)

SECP256K1_MODULE = None
SECP256K1_AVAILABLE = False
CRYPTOGRAPHY_AVAILABLE = False
GMPY2_MODULE = False
if not SECP256K1_MODULE:  # pragma: no branch
    try:
        import secp256k1

        SECP256K1_MODULE = "secp256k1"
        SECP256K1_AVAILABLE = True
    except ImportError:
        try:
            import cryptography

            SECP256K1_MODULE = "cryptography"
            CRYPTOGRAPHY_AVAILABLE = True
        except ImportError:
            SECP256K1_MODULE = "ecdsa"

    try:  # pragma: no branch
        from cryptography.hazmat.backends import default_backend
        from cryptography.hazmat.primitives import hashes
        from cryptography.hazmat.primitives.asymmetric import ec
        from cryptography.hazmat.primitives.asymmetric.utils import (
            decode_dss_signature,
            encode_dss_signature,
        )
        from cryptography.exceptions import InvalidSignature

        CRYPTOGRAPHY_AVAILABLE = True
    except ImportError:
        CRYPTOGRAPHY_AVAILABLE = False
        log.debug("Cryptography not available")

log.debug("Using SECP256K1 module: %s" % SECP256K1_MODULE)


def _is_canonical(sig):
    sig = bytearray(sig)
    return (
        not (int(sig[0]) & 0x80)
        and not (sig[0] == 0 and not (int(sig[1]) & 0x80))
        and not (int(sig[32]) & 0x80)
        and not (sig[32] == 0 and not (int(sig[33]) & 0x80))
    )


def compressedPubkey(pk):
    if SECP256K1_MODULE == "cryptography" and not isinstance(
        pk, ecdsa.keys.VerifyingKey
    ):
        order = ecdsa.SECP256k1.order
        x = pk.public_numbers().x
        y = pk.public_numbers().y
    else:  # pragma: no cover
        order = pk.curve.generator.order()
        p = pk.pubkey.point
        x = p.x()
        y = p.y()
    x_str = ecdsa.util.number_to_string(x, order)
    return _bytes(chr(2 + (y & 1))) + x_str


def recover_public_key(digest, signature, i, message=None):
    """Recover the public key from the the signature"""

    # See http: //www.secg.org/download/aid-780/sec1-v2.pdf section 4.1.6 primarily
    curve = ecdsa.SECP256k1.curve
    G = ecdsa.SECP256k1.generator
    order = ecdsa.SECP256k1.order
    yp = i % 2
    r, s = ecdsa.util.sigdecode_string(signature, order)
    # 1.1
    x = r + (i // 2) * order
    # 1.3. This actually calculates for either effectively 02||X or 03||X depending on 'k' instead of always for 02||X as specified.
    # This substitutes for the lack of reversing R later on. -R actually is defined to be just flipping the y-coordinate in the elliptic curve.
    alpha = ((x * x * x) + (curve.a() * x) + curve.b()) % curve.p()
    beta = ecdsa.numbertheory.square_root_mod_prime(alpha, curve.p())
    y = beta if (beta - yp) % 2 == 0 else curve.p() - beta
    # 1.4 Constructor of Point is supposed to check if nR is at infinity.
    R = ecdsa.ellipticcurve.Point(curve, x, y, order)
    # 1.5 Compute e
    e = ecdsa.util.string_to_number(digest)
    # 1.6 Compute Q = r^-1(sR - eG)
    Q = ecdsa.numbertheory.inverse_mod(r, order) * (s * R + (-e % order) * G)

    if SECP256K1_MODULE == "cryptography" and message is not None:
        if not isinstance(message, bytes):
            message = bytes(message, "utf-8")  # pragma: no cover
        sigder = encode_dss_signature(r, s)
        public_key = ec.EllipticCurvePublicNumbers(
            Q._Point__x, Q._Point__y, ec.SECP256K1()
        ).public_key(default_backend())
        public_key.verify(sigder, message, ec.ECDSA(hashes.SHA256()))
        return public_key
    else:
        # Not strictly necessary, but let's verify the message for paranoia's sake.
        if not ecdsa.VerifyingKey.from_public_point(
            Q, curve=ecdsa.SECP256k1
        ).verify_digest(
            signature, digest, sigdecode=ecdsa.util.sigdecode_string
        ):  # pragma: no cover
            return None  # pragma: no cover
        return ecdsa.VerifyingKey.from_public_point(
            Q, curve=ecdsa.SECP256k1
        )  # pragma: no cover


def recoverPubkeyParameter(message, digest, signature, pubkey):
    """Use to derive a number that allows to easily recover the
    public key from the signature
    """
    if not isinstance(message, bytes):
        message = bytes(message, "utf-8")  # pragma: no cover
    for i in range(0, 4):
        if SECP256K1_MODULE == "secp256k1":  # pragma: no cover
            sig = pubkey.ecdsa_recoverable_deserialize(signature, i)
            p = secp256k1.PublicKey(pubkey.ecdsa_recover(message, sig))
            if p.serialize() == pubkey.serialize():
                return i
        elif SECP256K1_MODULE == "cryptography" and not isinstance(pubkey, PublicKey):
            p = recover_public_key(digest, signature, i, message)
            p_comp = hexlify(compressedPubkey(p))
            pubkey_comp = hexlify(compressedPubkey(pubkey))
            if p_comp == pubkey_comp:
                return i
        else:  # pragma: no cover
            p = recover_public_key(digest, signature, i)
            p_comp = hexlify(compressedPubkey(p))
            p_string = hexlify(p.to_string())
            if isinstance(pubkey, PublicKey):  # pragma: no cover
                pubkey_string = bytes(repr(pubkey), "ascii")
            else:  # pragma: no cover
                pubkey_string = hexlify(pubkey.to_string())
            if p_string == pubkey_string or p_comp == pubkey_string:  # pragma: no cover
                return i


def sign_message(message, wif, hashfn=hashlib.sha256):
    """Sign a digest with a wif key

    :param str wif: Private key in
    """

    if not isinstance(message, bytes):
        message = bytes(message, "utf-8")

    digest = hashfn(message).digest()
    priv_key = PrivateKey(wif)
    p = bytes(priv_key)

    if SECP256K1_MODULE == "secp256k1":
        ndata = secp256k1.ffi.new("const int *ndata")
        ndata[0] = 0
        while True:
            ndata[0] += 1
            privkey = secp256k1.PrivateKey(p, raw=True)
            sig = secp256k1.ffi.new("secp256k1_ecdsa_recoverable_signature *")
            signed = secp256k1.lib.secp256k1_ecdsa_sign_recoverable(
                secp256k1.secp256k1_ctx,
                sig,
                digest,
                privkey.private_key,
                secp256k1.ffi.NULL,
                ndata,
            )
            if not signed == 1:  # pragma: no cover
                raise AssertionError()
            signature, i = privkey.ecdsa_recoverable_serialize(sig)
            if _is_canonical(signature):
                i += 4  # compressed
                i += 27  # compact
                break
    elif SECP256K1_MODULE == "cryptography":
        cnt = 0
        private_key = ec.derive_private_key(
            int(repr(priv_key), 16), ec.SECP256K1(), default_backend()
        )
        public_key = private_key.public_key()
        while True:
            cnt += 1
            if not cnt % 20:  # pragma: no cover
                log.info(
                    "Still searching for a canonical signature. Tried %d times already!"
                    % cnt
                )
            order = ecdsa.SECP256k1.order
            # signer = private_key.signer(ec.ECDSA(hashes.SHA256()))
            # signer.update(message)
            # sigder = signer.finalize()
            sigder = private_key.sign(message, ec.ECDSA(hashes.SHA256()))
            r, s = decode_dss_signature(sigder)
            signature = ecdsa.util.sigencode_string(r, s, order)
            # Make sure signature is canonical!
            #
            sigder = bytearray(sigder)
            lenR = sigder[3]
            lenS = sigder[5 + lenR]
            if lenR == 32 and lenS == 32:
                # Derive the recovery parameter
                #
                i = recoverPubkeyParameter(message, digest, signature, public_key)
                i += 4  # compressed
                i += 27  # compact
                break
    else:  # pragma: no branch # pragma: no cover
        cnt = 0
        sk = ecdsa.SigningKey.from_string(p, curve=ecdsa.SECP256k1)
        while 1:
            cnt += 1
            if not cnt % 20:  # pragma: no branch
                log.info(
                    "Still searching for a canonical signature. Tried %d times already!"
                    % cnt
                )

            # Deterministic k
            #
            k = ecdsa.rfc6979.generate_k(
                sk.curve.generator.order(),
                sk.privkey.secret_multiplier,
                hashlib.sha256,
                hashlib.sha256(
                    digest
                    + struct.pack(
                        "d", time.time()
                    )  # use the local time to randomize the signature
                ).digest(),
            )

            # Sign message
            #
            sigder = sk.sign_digest(digest, sigencode=ecdsa.util.sigencode_der, k=k)

            # Reformating of signature
            #
            r, s = ecdsa.util.sigdecode_der(sigder, sk.curve.generator.order())
            signature = ecdsa.util.sigencode_string(r, s, sk.curve.generator.order())

            # Make sure signature is canonical!
            #
            sigder = bytearray(sigder)
            lenR = sigder[3]
            lenS = sigder[5 + lenR]
            if lenR == 32 and lenS == 32:
                # Derive the recovery parameter
                #
                i = recoverPubkeyParameter(
                    message, digest, signature, sk.get_verifying_key()
                )
                i += 4  # compressed
                i += 27  # compact
                break

    # pack signature
    #
    sigstr = struct.pack("<B", i)
    sigstr += signature

    return sigstr


def verify_message(message, signature, hashfn=hashlib.sha256):
    if not isinstance(message, bytes):
        message = bytes(message, "utf-8")
    if not isinstance(signature, bytes):  # pragma: no cover
        signature = bytes(signature, "utf-8")
    if not isinstance(message, bytes):
        raise AssertionError()
    if not isinstance(signature, bytes):
        raise AssertionError()
    digest = hashfn(message).digest()
    sig = signature[1:]
    # TODO: 4 means we use compressed keys.
    # Grapehen uses compressed keys by default even though it would still allow
    # uncompressed keys to be used. This library so far expects compressed keys
    # due to this line:
    recoverParameter = bytearray(signature)[0] - 4 - 27  # recover parameter only

    if SECP256K1_MODULE == "secp256k1":
        # Placeholder
        pub = secp256k1.PublicKey()
        # Recover raw signature
        sig = pub.ecdsa_recoverable_deserialize(sig, recoverParameter)
        # Recover PublicKey
        verifyPub = secp256k1.PublicKey(pub.ecdsa_recover(message, sig))
        # Convert recoverable sig to normal sig
        normalSig = verifyPub.ecdsa_recoverable_convert(sig)
        # Verify
        verifyPub.ecdsa_verify(message, normalSig)
        phex = verifyPub.serialize(compressed=True)
    elif SECP256K1_MODULE == "cryptography":
        p = recover_public_key(digest, sig, recoverParameter, message)
        order = ecdsa.SECP256k1.order
        r, s = ecdsa.util.sigdecode_string(sig, order)
        sigder = encode_dss_signature(r, s)
        p.verify(sigder, message, ec.ECDSA(hashes.SHA256()))
        phex = compressedPubkey(p)
    else:  # pragma: no branch  # pragma: no cover
        p = recover_public_key(digest, sig, recoverParameter)
        # Will throw an exception of not valid
        p.verify_digest(sig, digest, sigdecode=ecdsa.util.sigdecode_string)
        phex = compressedPubkey(p)

    return phex


# def pointToPubkey(x, y, order=None):  # pragma: no cover
#     """ This code is untested und thus not commented in. Waiting for unit tests of
#         the original author.
#     """
#     order = order or ecdsa.SECP256k1.order
#     x_str = ecdsa.util.number_to_string(x, order)
#     return _bytes(chr(2 + (y & 1))) + x_str   # pragma: no cover
#


def tweakaddPubkey(pk, digest256, SECP256K1_MODULE=SECP256K1_MODULE):
    if SECP256K1_MODULE == "secp256k1":
        tmp_key = secp256k1.PublicKey(pubkey=bytes(pk), raw=True)
        new_key = tmp_key.tweak_add(digest256)  # <-- add
        raw_key = hexlify(new_key.serialize()).decode("ascii")
    else:
        raise Exception("Must have secp256k1 for `tweak_add`")
        # raw_key = ecmult(pk, 1, digest256, SECP256K1_MODULE)
    return PublicKey(raw_key, prefix=pk.prefix)


#
# def tweakmulPubkey(pk, digest256, SECP256K1_MODULE=SECP256K1_MODULE):
#     if SECP256K1_MODULE == "secp256k1":
#         tmp_key = secp256k1.PublicKey(pubkey=bytes(pk), raw=True)
#         new_key = tmp_key.tweak_mul(digest256)  # <-- mul
#         raw_key = hexlify(new_key.serialize()).decode('ascii')
#     else:
#         raw_key = ecmult(pk, digest256, 0, SECP256K1_MODULE)
#
#     return PublicKey(raw_key, prefix=pk.prefix)
#
#
# def ecmult(pk, scalarA, scalarB, SECP256K1_MODULE=SECP256K1_MODULE):
#     if SECP256K1_MODULE == "cryptography" and not isinstance(pk, ecdsa.keys.VerifyingKey):
#         order = ecdsa.SECP256k1.order
#         x = pk.public_numbers().x
#         y = pk.public_numbers().y
#     else:
#         p = pk.point()
#         order = ecdsa.SECP256k1.order
#         x = p.x()
#         y = p.y()
#
#     curve = ecdsa.SECP256k1.curve
#     if isinstance(scalarA, int):
#         scalar1 = scalarA
#     else:
#         scalar1 = ecdsa.ecdsa.string_to_int(scalarA)
#
#     if isinstance(scalarB, int):
#         scalar2 = scalarB
#     else:
#         scalar2 = ecdsa.ecdsa.string_to_int(scalarB)
#
#     acc = ecdsa.ellipticcurve.INFINITY
#     r = ecdsa.ellipticcurve.Point(curve, x, y, order)
#
#     i = 256
#     mask = 1
#     while i > 0:
#         if scalar1 & mask:
#             acc = acc + r
#
#         if scalar2 & mask:
#             acc = acc + r  # NO! Something else must happen!
#
#         r = r * 2
#         i -= 1
#         mask <<= 1
#
#     return hexlify(pointToPubkey(acc.x(), acc.y(), order)).decode('ascii')