Defelo/PyCrypCli

View on GitHub
PyCrypCli/commands/service.py

Summary

Maintainability
D
1 day
Test Coverage
import os
import time
from typing import Any, cast

from .command import command, CommandError
from .help import print_help
from .morphcoin import get_wallet_from_file
from ..context import DeviceContext, MainContext
from ..exceptions import (
    ServiceNotFoundError,
    AttackNotRunningError,
    AlreadyOwnThisServiceError,
    WalletNotFoundError,
    CannotDeleteEnforcedServiceError,
    CannotToggleDirectlyError,
    CouldNotStartServiceError,
    ServiceNotRunningError,
)
from ..models import Device, Service, PortscanService, BruteforceService, PublicService
from ..util import is_uuid


def get_service(context: DeviceContext, name: str) -> Service:
    try:
        return context.host.get_service_by_name(name)
    except ServiceNotFoundError:
        raise CommandError(f"The service '{name}' could not be found on this device")


def stop_bruteforce(context: DeviceContext, service: BruteforceService) -> None:
    try:
        access, _, target_device = service.stop()
    except AttackNotRunningError:
        raise CommandError("Bruteforce attack is not running.")
    if access:
        if context.ask("Access granted. Do you want to connect to the device? [yes|no] ", ["yes", "no"]) == "yes":
            handle_remote_connect(context, [target_device])
        else:
            print(f"To connect to the device type `remote connect {target_device}`")
    else:
        print("Access denied. The bruteforce attack was not successful")


@command("service", [DeviceContext])
def handle_service(context: DeviceContext, args: list[str]) -> None:
    """
    Create or use a service
    """

    if args:
        raise CommandError("Unknown subcommand.")
    print_help(context, handle_service)


@handle_service.subcommand("create")
def handle_service_create(context: DeviceContext, args: list[str]) -> None:
    """
    Create a new service
    """

    if len(args) not in (1, 2) or args[0] not in ("bruteforce", "portscan", "telnet", "ssh", "miner"):
        raise CommandError("usage: service create bruteforce|portscan|telnet|ssh|miner")

    extra: dict[str, Any] = {}
    if args[0] == "miner":
        if len(args) != 2:
            raise CommandError("usage: service create miner <wallet>")

        try:
            wallet_uuid: str = get_wallet_from_file(context, args[1]).uuid
        except CommandError:
            if is_uuid(args[1]):
                wallet_uuid = args[1]
            else:
                raise CommandError("Invalid wallet uuid")

        extra["wallet_uuid"] = wallet_uuid

    try:
        context.host.create_service(args[0], **extra)
        print("Service has been created")
    except AlreadyOwnThisServiceError:
        raise CommandError("You already created this service")
    except WalletNotFoundError:
        raise CommandError("Wallet does not exist.")


@handle_service.subcommand("list")
def handle_service_list(context: DeviceContext, _: Any) -> None:
    """
    List all services installed on this device
    """

    services: list[Service] = context.host.get_services()
    if not services:
        print("There are no services on this device.")
    else:
        print("Services:")
    for service in services:
        line: str = f" - [{['stopped', 'running'][service.running]}] {service.name}"
        if service.running_port is not None:
            line += f" on port {service.running_port}"
        print(line)


@handle_service.subcommand("delete")
def handle_service_delete(context: DeviceContext, args: list[str]) -> None:
    """
    Delete a service
    """

    if len(args) != 1 or args[0] not in ("bruteforce", "portscan", "telnet", "ssh", "miner"):
        raise CommandError("usage: service delete bruteforce|portscan|telnet|ssh|miner")

    service: Service = get_service(context, args[0])

    try:
        service.delete()
    except CannotDeleteEnforcedServiceError:
        raise CommandError("The service could not be deleted.")


@handle_service.subcommand("start")
def handle_service_start(context: DeviceContext, args: list[str]) -> None:
    """
    Start a service
    """

    if len(args) != 1 or args[0] not in ("telnet", "ssh"):
        raise CommandError("usage: service start telnet|ssh")

    service: Service = get_service(context, args[0])
    if service.running:
        raise CommandError("This service is already running.")

    try:
        service.toggle()
    except (CannotToggleDirectlyError, CouldNotStartServiceError):
        raise CommandError("The service could not be started.")


@handle_service.subcommand("stop")
def handle_service_stop(context: DeviceContext, args: list[str]) -> None:
    """
    Stop a service
    """

    if len(args) != 1 or args[0] not in ("telnet", "ssh"):
        raise CommandError("usage: service stop telnet|ssh")

    service: Service = get_service(context, args[0])
    if not service.running:
        raise CommandError("This service is not running.")

    try:
        service.toggle()
    except CannotToggleDirectlyError:
        raise CommandError("The service could not be stopped.")


@handle_service.subcommand("portscan")
def handle_portscan(context: DeviceContext, args: list[str]) -> None:
    """
    Perform a portscan
    """

    if len(args) != 1:
        raise CommandError("usage: service portscan <device>")

    target: str = args[0]
    if not is_uuid(target):
        raise CommandError("Invalid target")

    try:
        service: PortscanService = PortscanService.get_portscan_service(context.client, context.host.uuid)
    except ServiceNotFoundError:
        raise CommandError("You have to create a portscan service before you can use it.")

    services: list[PublicService] = service.scan(target)
    context.last_portscan = target, services
    if not services:
        print("That device doesn't have any running services")
    for s in services:
        print(f" - {s.name} on port {s.running_port} (UUID: {s.uuid})")


@handle_service.subcommand("bruteforce")
def handle_bruteforce(context: DeviceContext, args: list[str]) -> None:
    """
    Start a bruteforce attack
    """

    duration_arg: str = "100%"
    duration: int = 0
    chance: float | None = None
    if len(args) in (1, 2) and args[0] in ("ssh", "telnet"):
        if context.last_portscan is None:
            raise CommandError("You have to portscan your target first to find open ports.")

        target_device, services = context.last_portscan
        for service in services:
            if service.name == args[0]:
                target_service: str = service.uuid
                break
        else:
            raise CommandError(f"Service '{args[0]}' is not running on target device.")
        if len(args) == 2:
            duration_arg = args[1]
    elif len(args) in (2, 3):
        target_device = args[0]
        target_service = args[1]
        if not is_uuid(target_device):
            raise CommandError("Invalid target device")
        if not is_uuid(target_service):
            raise CommandError("Invalid target service")

        if len(args) == 3:
            duration_arg = args[2]
    else:
        raise CommandError(
            "usage: service bruteforce <target-device> <target-service> [duration|success_chance]\n"
            "       service bruteforce ssh|telnet [duration|success_chance]"
        )

    if duration_arg.endswith("%"):
        error = "Success chance has to be a positive number between 0 and 100"
        try:
            chance = float(duration_arg[:-1]) / 100
        except ValueError:
            raise CommandError(error)
        if chance < 0 or chance > 1:
            raise CommandError(error)
    else:
        if not duration_arg.isnumeric():
            raise CommandError("Duration has to be a positive integer")
        duration = int(duration_arg)

    try:
        bruteforce_service: BruteforceService = BruteforceService.get_bruteforce_service(
            context.client, context.host.uuid
        )
    except ServiceNotFoundError:
        raise CommandError("You have to create a bruteforce service before you can use it.")

    if bruteforce_service.running:
        print("You are already attacking a device.")
        print(f"Target device: {bruteforce_service.target_device_uuid}")
        if context.ask("Do you want to stop this attack? [yes|no] ", ["yes", "no"]) == "yes":
            stop_bruteforce(context, bruteforce_service)
        return

    try:
        bruteforce_service.attack(target_device, target_service)
        if chance is not None:
            bruteforce_service.update()
            duration = round((chance + 0.1) * 20 / bruteforce_service.speed)
    except ServiceNotFoundError:
        raise CommandError("The target service does not exist.")
    except ServiceNotRunningError:
        raise CommandError("The target service is not running and cannot be exploited.")

    print("You started a bruteforce attack")
    width = os.get_terminal_size().columns - 31
    steps = 17
    d = duration * steps
    i = 0
    last_check: float = 0
    try:
        context.update_presence(
            state=f"Logged in: {context.username}@{context.root_context.host}",
            details="Hacking Remote Device",
            end=int(time.time()) + duration,
            large_image="cryptic",
            large_text="Cryptic",
        )
        for i in range(d):
            if time.time() - last_check > 1:
                last_check = time.time()
                bruteforce_service.update()
                if not bruteforce_service.running:
                    print("\rBruteforce attack has been aborted.")
                    return

            progress: int = int(i / d * width)
            j = i // steps
            progress_bar = "[" + "=" * progress + ">" + " " * (width - progress) + "]"
            text = f"\rBruteforcing {j // 60:02d}:{j % 60:02d} {progress_bar} ({i / d * 100:.1f}%) "
            print(end=text, flush=True)
            time.sleep(1 / steps)
        i = (i + 1) // steps
        print(f"\rBruteforcing {i // 60:02d}:{i % 60:02d} [" + "=" * width + ">] (100%) ")
    except KeyboardInterrupt:
        print()
    context.main_loop_presence()
    stop_bruteforce(context, bruteforce_service)


@handle_service_create.completer()
def service_create_completer(context: DeviceContext, args: list[str]) -> list[str]:
    if len(args) == 1:
        return ["bruteforce", "portscan", "ssh", "telnet", "miner"]
    if len(args) == 2 and args[0] == "miner":
        return context.file_path_completer(args[1])
    return []


@handle_service_delete.completer()
def service_delete_completer(_: Any, args: list[str]) -> list[str]:
    if len(args) == 1:
        return ["bruteforce", "portscan", "telnet", "miner"]
    return []


@handle_service_start.completer()
@handle_service_stop.completer()
@handle_bruteforce.completer()
def service_completer(_: Any, args: list[str]) -> list[str]:
    if len(args) == 1:
        return ["ssh", "telnet"]
    return []


@command("spot", [DeviceContext])
def handle_spot(context: DeviceContext, _: Any) -> None:
    """
    Find a random device in the network
    """

    device: Device = Device.spot(context.client)
    print(f"Name: '{device.name}'" + " [hacked]" * device.part_owner())
    print(f"UUID: {device.uuid}")
    handle_portscan(context, [device.uuid])


@command("remote", [MainContext, DeviceContext])
def handle_remote(context: MainContext, args: list[str]) -> None:
    """
    Manage and connect to the devices you hacked before
    """

    if args:
        raise CommandError("Unknown subcommand.")
    print_help(context, handle_remote)


@handle_remote.subcommand("list")
def handle_remote_list(context: MainContext, _: Any) -> None:
    """
    List remote devices
    """

    devices: list[Device] = context.get_hacked_devices()

    if not devices:
        print("You don't have access to any remote device.")
    else:
        print("Remote devices:")
    for device in devices:
        print(f" - [{['off', 'on'][device.powered_on]}] {device.name} (UUID: {device.uuid})")


@handle_remote.subcommand("connect")
def handle_remote_connect(context: MainContext, args: list[str]) -> None:
    """
    Connect to a remote device
    """

    if len(args) != 1:
        print("usage: remote connect <name|uuid>")
        return

    name: str = args[0]
    if is_uuid(name):
        device: Device = Device.get_device(context.client, name)
        if device is None:
            raise CommandError("This device does not exist or you have no permission to access it.")
    else:
        found_devices: list[Device] = []
        for device in context.get_hacked_devices():
            if device.name == name:
                found_devices.append(device)

        if not found_devices:
            raise CommandError(f"There is no device with the name '{name}'.")
        if len(found_devices) > 1:
            raise CommandError(f"There is more than one device with the name '{name}'. You need to specify its UUID.")

        device = found_devices[0]

    print(f"Connecting to {device.name} (UUID: {device.uuid})")
    if device.part_owner():
        context.open(DeviceContext(context.root_context, cast(str, context.session_token), device))
    else:
        raise CommandError("This device does not exist or you have no permission to access it.")


@handle_remote_connect.completer()
def remote_completer(context: MainContext, args: list[str]) -> list[str]:
    if len(args) == 1:
        device_names: list[str] = [device.name for device in context.get_hacked_devices()]
        return [name for name in device_names if device_names.count(name) == 1]
    return []