# noma/lnd.py
#
# Summary
#
# Maintainability
# A
# 3 hrs
# Test Coverage
"""
LND related functionality
"""
import pathlib
from subprocess import call, run
from os import path
from json import dumps
import base64
from requests import get, post
import noma.config as cfg


def check_wallet():
    """
    Create the LND wallet when the lnd directory exists but no wallet does.

    Prints an error when the lnd directory is missing. When a wallet file is
    already present at ``cfg.WALLET_PATH`` nothing is created and the user is
    told to back it up and move it away.

    :return: None
    """
    if not cfg.LND_PATH.exists():
        print("❌ Error: lnd directory does not exist!")
        return

    if cfg.WALLET_PATH.exists():
        # Never overwrite an existing wallet; leave the decision to the user
        for message in (
            "❌ Error: LND not initialized",
            "Wallet already exists!",
            "Please backup and move: " + str(cfg.WALLET_PATH),
            "and then restart lnd",
        ):
            print(message)
        return

    create_wallet()


def encodemacaroons(macaroonfile=cfg.MACAROON_PATH, tlsfile=cfg.TLS_CERT_PATH):
    """
    base64url encode macaroon and TLS certificate

    :param macaroonfile: path to the macaroon file (``~`` is expanded)
    :param tlsfile: path to the PEM encoded TLS certificate (``~`` is expanded)
    :return dict: ``{"status": "OK", "certificate": bytes, "macaroon": bytes}``
        when both files exist, otherwise ``{"status": "File Not Found"}``
    """
    # Expand "~" once so the existence check and open() look at the same path
    macaroon_path = path.expanduser(str(macaroonfile))
    tls_path = path.expanduser(str(tlsfile))
    if not (path.exists(macaroon_path) and path.exists(tls_path)):
        return {"status": "File Not Found"}

    with open(macaroon_path, "rb") as f:
        macaroon_bytes = f.read()
    with open(tls_path, "rb") as f:
        tls_bytes = f.read()

    macaroonencoded = base64.urlsafe_b64encode(macaroon_bytes)

    # The PEM body is already standard base64: drop the armor lines and line
    # breaks, then map it onto the unpadded URL-safe alphabet.
    tlstrim = (
        tls_bytes.decode("utf-8")
        .replace("\r", "")
        .replace("\n", "")
        .replace("-----BEGIN CERTIFICATE-----", "")
        .replace("-----END CERTIFICATE-----", "")
        .replace("+", "-")
        .replace("/", "_")
        .replace("=", "")
    )

    return {
        "status": "OK",
        "certificate": tlstrim.encode("utf-8"),
        "macaroon": macaroonencoded,
    }


def connectstring(
    hostname=cfg.URL_GRPC,
    macaroonfile=cfg.MACAROON_PATH,
    tlsfile=cfg.TLS_CERT_PATH,
):
    """
    Print an lndconnect:// string for remote wallets such as Zap

    :param hostname: host part placed right after ``lndconnect://``
    :param macaroonfile: macaroon file handed to ``encodemacaroons``
    :param tlsfile: TLS certificate file handed to ``encodemacaroons``
    :return: None; prints the string, or the failure status
    """
    encoded = encodemacaroons(macaroonfile, tlsfile)
    if encoded["status"] != "OK":
        print(encoded["status"])
        return

    macaroon_param = encoded["macaroon"].decode("utf-8")
    cert_param = encoded["certificate"].decode("utf-8")
    uri_parts = (
        "lndconnect://",
        hostname,
        "?cert=",
        cert_param,
        "&macaroon=",
        macaroon_param,
    )
    print("".join(uri_parts))


def autounlock():
    """
    Auto-unlock lnd using password.txt, tls.cert

    Reads the wallet password from ``cfg.PASSWORD_FILE_PATH`` and posts it,
    base64 encoded, to ``cfg.URL_UNLOCKWALLET``. Request failures and
    undecodable responses are deliberately ignored.

    :return: None
    """
    # Context manager so the password file handle is closed promptly
    with open(str(cfg.PASSWORD_FILE_PATH), "r") as password_file:
        password_str = password_file.read().rstrip()
    password_bytes = password_str.encode("utf-8")
    data = {"wallet_password": base64.b64encode(password_bytes).decode()}
    try:
        response = post(
            cfg.URL_UNLOCKWALLET,
            # str() to match the other requests calls in this module
            verify=str(cfg.TLS_CERT_PATH),
            data=dumps(data),
        )
    except Exception:
        # Silence connection errors when lnd is not running
        return

    try:
        print(response.json())
    except Exception:
        # JSON will fail to decode when unlocked already since response is
        # empty
        pass


def get_kv(key, section="", config_path=""):
    """
    Look up a single value in a key-value (INI style) config file

    :param key: left part of key value pair
    :param section: [section] of the kv pair, "Application Options" if empty
    :param config_path: path to config file, ``cfg.LND_CONF`` if empty
    :return: value of key
    """
    from configparser import ConfigParser

    ini_path = config_path if config_path else cfg.LND_CONF
    section_name = section if section else "Application Options"

    # strict=False tolerates duplicate sections/options in the file
    reader = ConfigParser(strict=False)
    with open(ini_path) as handle:
        reader.read_file(handle)
    return reader.get(section_name, key)


def set_kv(key, value, section="", config_path=""):
    """
    Parse key-value config files and write them out with a key-value change

    Note: comments are lost!

    :param key: left part of key value pair
    :param value: right part of key value pair
    :param section: optional name of section to set in, defaults to
        "Application Options"
    :param config_path: path to file, defaults to ``cfg.LND_CONF``

    :return: None
    """
    from configparser import ConfigParser

    if not section:
        section = "Application Options"
    if not config_path:
        config_path = cfg.LND_CONF

    parser = ConfigParser(strict=False)
    # Finish reading (and close the handle) before the file is reopened for
    # writing, so the two handles never overlap.
    with open(config_path) as lines:
        parser.read_file(lines)

    # Raises before the file is truncated if the section does not exist
    parser.set(section, key, value)

    with open(config_path, "w") as file:
        parser.write(file, space_around_delimiters=False)


def setup_tor(version=""):
    """
    Add tor hidden service to lnd

    Reads the hostname from ``/var/lib/tor/lnd-<version>/hostname`` and stores
    it as ``externalip`` in the "Application Options" section via ``set_kv``.
    Any error is printed rather than raised.

    :param version: hidden service version used in the path, "v3" if empty
    :return: None
    """
    if not version:
        version = "v3"
    hostname_path = "/var/lib/tor/lnd-{}/hostname".format(version)
    try:
        print("Adding externalip directive to lnd for tor")
        with open(hostname_path, "r") as hostname:
            # Strip the trailing newline so it is not written into the
            # config value
            onion_address = hostname.read().strip()
        set_kv("externalip", onion_address, "Application Options")
    except Exception as error:
        print(error.__class__.__name__, ":", error)


def set_bitcoind(password, user="", lnd_config=""):
    """
    Add bitcoind rpc username and password to lnd

    :param password: value stored as ``bitcoind.rpcpass``
    :param user: value stored as ``bitcoind.rpcuser``, "lncm" if empty
    :param lnd_config: config file to edit, ``cfg.LND_CONF`` if empty
    :return: None; nothing is written when the config file does not exist
    """
    rpc_user = user or "lncm"
    conf_path = lnd_config or cfg.LND_CONF
    if not pathlib.Path(conf_path).is_file():
        return

    credentials = (
        ("bitcoind.rpcuser", rpc_user),
        ("bitcoind.rpcpass", password),
    )
    for option, setting in credentials:
        set_kv(option, setting, "Bitcoind", conf_path)


def autoconnect(list_path=""):
    """
    Auto-connect to a list of nodes in lnd/autoconnect.txt

    Each non-blank line of the list is passed to ``lncli connect`` inside the
    ``<cfg.LND_MODE>_lnd_1`` docker container.

    :param list_path: file with one node address per line, defaults to
        ``autoconnect.txt`` inside ``cfg.LND_PATH``
    :return: None
    """
    print("Connecting to:")
    if not list_path:
        list_path = pathlib.Path(cfg.LND_PATH / "autoconnect.txt")

    with open(list_path) as address_list:
        for line in address_list:
            address = line.strip()
            if not address:
                # Blank lines would otherwise run `lncli connect ""`
                continue
            print(address)
            call(
                [
                    "docker",
                    "exec",
                    cfg.LND_MODE + "_lnd_1",
                    "lncli",
                    "connect",
                    address,
                ]
            )


def check():
    """
    Check lnd filesystem structure

    Prints one status line for the lnd directory (``cfg.LND_PATH``) and one
    for the config file (``cfg.LND_CONF``).

    :return bool: True only when both the directory and the config exist
    """
    has_directory = cfg.LND_PATH.is_dir()
    print(
        "✅ lnd directory exists"
        if has_directory
        else "❌ lnd directory missing"
    )

    has_config = cfg.LND_CONF.is_file()
    print("✅ lnd.conf exists" if has_config else "❌ lnd.conf missing")

    return has_directory and has_config


def backup():
    """
    Copy the lnd channel backup file to a remote target via scp

    Copies ``cfg.CHANNEL_BACKUP`` to ``cfg.SSH_TARGET`` using the identity
    file ``cfg.SSH_IDENTITY`` and port ``cfg.SSH_PORT``.

    :return int: exit status of the scp process
    :raises SystemExit: with status 1 when the backup file does not exist
    """
    if not cfg.CHANNEL_BACKUP.is_file():
        print("Error: channel.backup not found")
        # Same exception exit(1) raised, without relying on the site helper
        raise SystemExit(1)

    # secure remote backups via scp
    # scp options:
    # -B for non-interactive batch mode
    # -p to preserve modification & access time, modes
    # Options and their values are separate argv entries; a fused "-i path"
    # entry hands scp a value with a leading space.
    complete = run(
        [
            "scp",
            "-B",
            "-i",
            str(cfg.SSH_IDENTITY),
            "-p",
            "-P",
            str(cfg.SSH_PORT),
            str(cfg.CHANNEL_BACKUP),
            str(cfg.SSH_TARGET),
        ]
    )
    return complete.returncode


def savepeers():
    """Placeholder for saving the list of peers to disk for reconnecting;
    currently only reports that it is not implemented"""
    # TODO: export list of peers to text file on disk
    notice = "Not implemented yet"
    print(notice)


def randompass(string_length=10):
    """
    Generate random password

    The result is used as a wallet password, so characters are drawn with the
    ``secrets`` module rather than the non-cryptographic ``random`` module.

    :param string_length: number of characters to generate
    :return str: password made of ASCII letters only
    """
    from secrets import choice
    from string import ascii_letters

    return "".join(choice(ascii_letters) for _ in range(string_length))


def _write_password(password_str):
    """Write a generated password to PASSWORD_FILE_PATH.

    The destination does not depend on SAVE_PASSWORD_CONTROL_FILE: the
    password is always written to ``cfg.PASSWORD_FILE_PATH``, which is also
    where ``_wallet_password`` reads it back from.

    NOTE(review): this helper previously had two identical branches keyed on
    SAVE_PASSWORD_CONTROL_FILE and was described as using a separate
    TEMP_PASSWORD_FILE_PATH when that control file is absent - confirm whether
    a distinct temporary location was intended.

    :param password_str: password text to store
    """
    # Context manager so the handle is closed even if the write fails
    with open(str(cfg.PASSWORD_FILE_PATH), "w") as password_file:
        password_file.write(password_str)


def _wallet_password():
    """Either load the wallet password from PASSWORD_FILE_PATH, or generate a
    new 15 character password and save it via ``_write_password``.

    :return str: the wallet password
    """
    if not path.exists(str(cfg.PASSWORD_FILE_PATH)):
        # No password file yet: generate one and persist it
        password_str = randompass(string_length=15)
        _write_password(password_str)
        return password_str

    # Password file already exists: reuse it, without trailing whitespace.
    # Context manager so the handle is not leaked.
    with open(str(cfg.PASSWORD_FILE_PATH), "r") as password_file:
        return password_file.read().rstrip()


def _generate_and_save_seed():
    """Generate a wallet seed, save it to SEED_FILENAME, and return it

    Requests ``cfg.URL_GENSEED`` and, on HTTP 200, writes the words of
    ``cipher_seed_mnemonic`` to ``cfg.SEED_FILENAME``, one per line.

    :return: the list of mnemonic words, or None when the response status is
        not 200 (callers handle the missing seed)
    """
    return_data = get(cfg.URL_GENSEED, verify=str(cfg.TLS_CERT_PATH))
    if return_data.status_code != 200:
        # Seed could not be created; that is fine, it is handled later
        return None

    mnemonic = return_data.json()["cipher_seed_mnemonic"]
    # Context manager so the seed file is closed even if a write fails
    with open(str(cfg.SEED_FILENAME), "w") as seed_file:
        seed_file.writelines(word + "\n" for word in mnemonic)
    return mnemonic


def _load_seed():
    """Load the wallet seed from SEED_FILENAME and return it

    :return list: mnemonic words, one per non-blank line of
        ``cfg.SEED_FILENAME``, with surrounding whitespace removed
    """
    # Context manager so the seed file handle is always closed
    with open(str(cfg.SEED_FILENAME), "r") as seed_file:
        words = (line.strip() for line in seed_file)
        # Skip blank lines so they do not become empty mnemonic words
        return [word for word in words if word]


def _wallet_data(password_str):
    """Assemble the wallet `data` dict holding the mnemonic and the base64
    encoded wallet password.

    The mnemonic is loaded from SEED_FILENAME when that file exists, and
    generated (and saved) otherwise.

    :param password_str: wallet password in clear text
    :return dict: wallet data, or an empty dict when no mnemonic is available
    """
    raw_password = str(password_str).encode("utf-8")

    if path.exists(str(cfg.SEED_FILENAME)):
        seed_words = _load_seed()
    else:
        # No seed file yet: request a new seed and store it
        seed_words = _generate_and_save_seed()

    if not seed_words:
        return {}

    encoded_password = base64.b64encode(raw_password).decode()
    return {
        "cipher_seed_mnemonic": seed_words,
        "wallet_password": encoded_password,
    }


def create_wallet():
    """
    Initialise the LND wallet

    1. Obtain the wallet password via ``_wallet_password`` (read from
    PASSWORD_FILE_PATH, or generated and saved when that file is missing).
    2. Build the wallet data via ``_wallet_data`` (seed loaded from
    SEED_FILENAME, or generated by LND and saved when that file is missing).
    3. Post the data to ``cfg.URL_INITWALLET`` and print whether the call
    succeeded.

    :return: None
    """
    payload = _wallet_data(_wallet_password())

    if not payload:
        print("❌ Error: cannot proceed, wallet data is not defined")
        return

    response = post(
        cfg.URL_INITWALLET,
        verify=str(cfg.TLS_CERT_PATH),
        data=dumps(payload),
    )
    outcome = (
        "✅ Create wallet is successful"
        if response.status_code == 200
        else "❌ Create wallet is not successful"
    )
    print(outcome)


# Library module: when executed as a script it only prints a notice.
if __name__ == "__main__":
    print("This file is not meant to be run directly")