localstack/localstack

View on GitHub
localstack/utils/container_utils/container_client.py

Summary

Maintainability
F
1 wk
Test Coverage
import dataclasses
import io
import ipaddress
import logging
import os
import re
import shlex
import sys
import tarfile
import tempfile
from abc import ABCMeta, abstractmethod
from enum import Enum, unique
from pathlib import Path
from typing import Dict, List, Literal, NamedTuple, Optional, Protocol, Tuple, Union, get_args

from localstack import config
from localstack.utils.collections import HashableList, ensure_list
from localstack.utils.files import TMP_FILES, rm_rf, save_file
from localstack.utils.no_exit_argument_parser import NoExitArgumentParser
from localstack.utils.strings import short_uid

# module-level logger, named after this module
LOG = logging.getLogger(__name__)

# list of well-known image repo prefixes that should be stripped off to canonicalize image names
# (per the ``strip_wellknown_repo_prefixes`` docs in this file, such prefixes are added by the
# Podman API, but not by Docker)
WELL_KNOWN_IMAGE_REPO_PREFIXES = ("localhost/", "docker.io/library/")


@unique
class DockerContainerStatus(Enum):
    """Possible container states, as returned by ``ContainerClient.get_container_status``.

    ``@unique`` guarantees that no two members share the same value.
    """

    # NOTE(review): presumably "exists but is not running" — confirm against the client implementations
    DOWN = -1
    # NOTE(review): presumably "no container with that name exists" — confirm
    NON_EXISTENT = 0
    UP = 1
    PAUSED = 2


class ContainerException(Exception):
    """Base class of the container client errors defined in this module.

    :param message: human-readable description; a generic default is used when empty or None
    :param stdout: optional stdout value supplied by the raiser, stored as-is
    :param stderr: optional stderr value supplied by the raiser, stored as-is
    """

    def __init__(self, message=None, stdout=None, stderr=None) -> None:
        # any falsy message (None, "") is replaced by the generic default
        if not message:
            message = "Error during the communication with the docker daemon"
        self.message = message
        self.stderr = stderr
        self.stdout = stdout


class NoSuchObject(ContainerException):
    """Signals that a Docker object was not found; the identifier is kept in ``object_id``."""

    def __init__(self, object_id: str, message=None, stdout=None, stderr=None) -> None:
        self.object_id = object_id
        # fall back to a default message naming the missing object
        super().__init__(message or f"Docker object {object_id} not found", stdout, stderr)


class NoSuchContainer(ContainerException):
    """Signals that a container was not found; the name or id is kept in ``container_name_or_id``."""

    def __init__(self, container_name_or_id: str, message=None, stdout=None, stderr=None) -> None:
        self.container_name_or_id = container_name_or_id
        # fall back to a default message naming the missing container
        super().__init__(
            message or f"Docker container {container_name_or_id} not found", stdout, stderr
        )


class NoSuchImage(ContainerException):
    """Signals that an image was not found; the image name is kept in ``image_name``."""

    def __init__(self, image_name: str, message=None, stdout=None, stderr=None) -> None:
        self.image_name = image_name
        # fall back to a default message naming the missing image
        super().__init__(message or f"Docker image {image_name} not found", stdout, stderr)


class NoSuchNetwork(ContainerException):
    """Signals that a network was not found; the network name is kept in ``network_name``."""

    def __init__(self, network_name: str, message=None, stdout=None, stderr=None) -> None:
        self.network_name = network_name
        # fall back to a default message naming the missing network
        super().__init__(message or f"Docker network {network_name} not found", stdout, stderr)


class RegistryConnectionError(ContainerException):
    """Signals a connection error; the caller-supplied description is kept in ``details``."""

    def __init__(self, details: str, message=None, stdout=None, stderr=None) -> None:
        self.details = details
        # fall back to a default message that embeds the details
        super().__init__(message or f"Connection error: {details}", stdout, stderr)


class DockerNotAvailable(ContainerException):
    """Signals that Docker is not available."""

    def __init__(self, message=None, stdout=None, stderr=None) -> None:
        # replace the base class default message with a more specific one
        super().__init__(message or "Docker not available", stdout, stderr)


class AccessDenied(ContainerException):
    """Signals that access to an object was denied; its name is kept in ``object_name``."""

    def __init__(self, object_name: str, message=None, stdout=None, stderr=None) -> None:
        self.object_name = object_name
        # fall back to a default message naming the object
        super().__init__(message or f"Access denied to {object_name}", stdout, stderr)


class CancellableStream(Protocol):
    """Describes a generator that can be closed. Borrowed from ``docker.types.daemon``.

    Structural type: any object providing ``__iter__``, ``__next__`` and ``close`` matches it.
    The method bodies below are placeholders that raise ``NotImplementedError``.
    """

    def __iter__(self):
        raise NotImplementedError

    def __next__(self):
        raise NotImplementedError

    def close(self):
        raise NotImplementedError


class DockerPlatform(str):
    """Platform in the format ``os[/arch[/variant]]``"""

    # predefined platform identifiers; note that these class attributes are plain ``str``
    # literals, not ``DockerPlatform`` instances
    linux_amd64 = "linux/amd64"
    linux_arm64 = "linux/arm64"


@dataclasses.dataclass
class Ulimit:
    """The ``ulimit`` settings for the container.
    See https://www.tutorialspoint.com/setting-ulimit-values-on-docker-containers

    ``name`` is the limit type, ``soft_limit`` the soft limit, and ``hard_limit`` the optional
    hard limit. The ``repr`` of an instance is the ``<type>=<soft limit>[:<hard limit>]`` string.
    """

    name: str
    soft_limit: int
    hard_limit: Optional[int] = None

    def __repr__(self):
        """Format: <type>=<soft limit>[:<hard limit>]"""
        ulimit_string = f"{self.name}={self.soft_limit}"
        # compare against None rather than truthiness, so an explicit hard limit of 0 is kept
        if self.hard_limit is not None:
            ulimit_string += f":{self.hard_limit}"
        return ulimit_string


# defines the type for port mappings (source->target port range); PortMappings below treats a
# range as a two-element ``[start, end]`` sequence with inclusive bounds
PortRange = Union[List, HashableList]
# defines the protocol for a port range ("tcp" or "udp")
PortProtocol = str


def isinstance_union(obj, class_or_tuple):
    """``isinstance`` variant that also accepts union types on every supported Python version.

    Plain ``isinstance`` only understands ``typing.Union[...]`` from Python 3.10 on. Unions are
    therefore unpacked into a tuple of their member types before the check.

    :param obj: the object to check
    :param class_or_tuple: a class, a tuple of classes, or a union of classes
    :return: True if ``obj`` is an instance of (one of) the given type(s)
    """
    # function-scope imports: the module only imports ``Union`` and ``get_args`` at file level
    import types
    from typing import get_origin

    origin = get_origin(class_or_tuple)
    # ``X | Y`` unions (Python 3.10+) report ``types.UnionType`` as their origin
    pep604_union = getattr(types, "UnionType", None)
    if origin is Union or (pep604_union is not None and origin is pep604_union):
        return isinstance(obj, get_args(class_or_tuple))
    return isinstance(obj, class_or_tuple)


class PortMappings:
    """Maps source to target port ranges for Docker port mappings.

    ``mappings`` is keyed by ``(host port range, protocol)`` and stores the container port range
    as value. Ranges are two-element ``[start, end]`` sequences with inclusive bounds. Adjacent
    ports are merged into ranges by ``add``.
    """

    # bind host to be used for defining port mappings
    bind_host: str
    # maps `from` port range to `to` port range for port mappings
    mappings: Dict[Tuple[PortRange, PortProtocol], List]

    def __init__(self, bind_host: str = None):
        self.bind_host = bind_host if bind_host else ""
        self.mappings = {}

    def add(
        self,
        port: Union[int, PortRange],
        mapped: Union[int, PortRange] = None,
        protocol: PortProtocol = "tcp",
    ):
        """
        Add a mapping from host ``port`` to container port ``mapped`` for the given protocol.

        :param port: host port, or a ``[start, end]`` range of host ports (added one by one)
        :param mapped: container port or range; defaults to ``port``
        :param protocol: port protocol; lower-cased, a falsy value means "tcp"
        :raises Exception: if the port is None or negative
        """
        # normalize first, so that lookups below use the same spelling as the stored keys
        protocol = str(protocol or "tcp").lower()
        mapped = mapped or port
        if isinstance_union(port, PortRange):
            for i in range(port[1] - port[0] + 1):
                if isinstance_union(mapped, PortRange):
                    self.add(port[0] + i, mapped[0] + i, protocol)
                else:
                    self.add(port[0] + i, mapped, protocol)
            return
        if port is None or int(port) < 0:
            raise Exception(f"Unable to add mapping for invalid port: {port}")
        if self.contains(port, protocol):
            return
        bisected_host_port = None
        # look for an existing range that the new mapping is adjacent to (or falls into);
        # the loop is always left via return/break right after self.mappings has been modified
        for (from_range, from_protocol), to_range in self.mappings.items():
            if not from_protocol == protocol:
                continue
            if not self.in_expanded_range(port, from_range):
                continue
            if not self.in_expanded_range(mapped, to_range):
                continue
            from_range_len = from_range[1] - from_range[0]
            to_range_len = to_range[1] - to_range[0]
            is_uniform = from_range_len == to_range_len
            if is_uniform:
                self.expand_range(port, from_range, protocol=protocol, remap=True)
                self.expand_range(mapped, to_range, protocol=protocol)
            else:
                if not self.in_range(mapped, to_range):
                    continue
                # extending a 1 to 1 mapping to be many to 1
                elif from_range_len == 1:
                    self.expand_range(port, from_range, protocol=protocol, remap=True)
                # splitting a uniform mapping
                else:
                    bisected_port_index = mapped - to_range[0]
                    bisected_host_port = from_range[0] + bisected_port_index
                    self.bisect_range(mapped, to_range, protocol=protocol)
                    self.bisect_range(bisected_host_port, from_range, protocol=protocol, remap=True)
                    break
            return
        if bisected_host_port is None:
            port_range = [port, port]
        elif bisected_host_port < port:
            port_range = [bisected_host_port, port]
        else:
            port_range = [port, bisected_host_port]
        self.mappings[(HashableList(port_range), protocol)] = [mapped, mapped]

    def to_str(self) -> str:
        """Render all mappings as a single string of space-separated ``-p ...`` arguments."""
        bind_address = f"{self.bind_host}:" if self.bind_host else ""

        def entry(k, v):
            from_range, protocol = k
            to_range = v
            # use /<protocol> suffix if the protocol is not "tcp"
            protocol_suffix = f"/{protocol}" if protocol != "tcp" else ""
            if from_range[0] == from_range[1] and to_range[0] == to_range[1]:
                return f"-p {bind_address}{from_range[0]}:{to_range[0]}{protocol_suffix}"
            if from_range[0] != from_range[1] and to_range[0] == to_range[1]:
                return f"-p {bind_address}{from_range[0]}-{from_range[1]}:{to_range[0]}{protocol_suffix}"
            return f"-p {bind_address}{from_range[0]}-{from_range[1]}:{to_range[0]}-{to_range[1]}{protocol_suffix}"

        return " ".join([entry(k, v) for k, v in self.mappings.items()])

    def to_list(self) -> List[str]:  # TODO test
        """Render all mappings as a flat argument list: ``["-p", "<spec>", "-p", "<spec>", ...]``."""
        bind_address = f"{self.bind_host}:" if self.bind_host else ""

        def entry(k, v):
            from_range, protocol = k
            to_range = v
            protocol_suffix = f"/{protocol}" if protocol != "tcp" else ""
            if from_range[0] == from_range[1] and to_range[0] == to_range[1]:
                return ["-p", f"{bind_address}{from_range[0]}:{to_range[0]}{protocol_suffix}"]
            return [
                "-p",
                f"{bind_address}{from_range[0]}-{from_range[1]}:{to_range[0]}-{to_range[1]}{protocol_suffix}",
            ]

        return [item for k, v in self.mappings.items() for item in entry(k, v)]

    def to_dict(self) -> Dict[str, Union[Tuple[str, Union[int, List[int]]], int]]:
        """
        Render the mappings as a dict keyed by ``"<container port>/<protocol>"``.

        Values are the host port, a ``(bind host, host port)`` tuple if a bind host is set, or None
        for host port 0. A many-to-one mapping yields the list of host ports instead.
        """
        bind_address = self.bind_host or ""

        def bind_port(bind_address, host_port):
            if host_port == 0:
                return None
            elif bind_address:
                return (bind_address, host_port)
            else:
                return host_port

        def entry(k, v):
            from_range, protocol = k
            to_range = v
            protocol_suffix = f"/{protocol}"
            if from_range[0] != from_range[1] and to_range[0] == to_range[1]:
                # several host ports mapped to a single container port
                container_port = to_range[0]
                host_ports = list(range(from_range[0], from_range[1] + 1))
                return [
                    (
                        f"{container_port}{protocol_suffix}",
                        (bind_address, host_ports) if bind_address else host_ports,
                    )
                ]
            return [
                (
                    f"{container_port}{protocol_suffix}",
                    bind_port(bind_address, host_port),
                )
                for container_port, host_port in zip(
                    range(to_range[0], to_range[1] + 1), range(from_range[0], from_range[1] + 1)
                )
            ]

        items = [item for k, v in self.mappings.items() for item in entry(k, v)]
        return dict(items)

    def contains(self, port: int, protocol: PortProtocol = "tcp") -> bool:
        """Return True if the host ``port`` is already part of a mapping for the given protocol."""
        # same normalization as in add(), so that e.g. "TCP" matches the stored "tcp" keys
        protocol = str(protocol or "tcp").lower()
        return any(
            from_protocol == protocol and self.in_range(port, from_range)
            for from_range, from_protocol in self.mappings
        )

    def in_range(self, port: int, range: PortRange) -> bool:
        """Return whether ``port`` lies within the inclusive bounds of ``range``."""
        return port >= range[0] and port <= range[1]

    def in_expanded_range(self, port: int, range: PortRange):
        """Return whether ``port`` lies within ``range`` widened by one port on either side."""
        return port >= range[0] - 1 and port <= range[1] + 1

    def expand_range(
        self, port: int, range: PortRange, protocol: PortProtocol = "tcp", remap: bool = False
    ):
        """
        Expand the given port range by the given port. If remap==True, put the updated range into self.mappings

        Without remap, ``range`` itself is modified in place.

        :raises Exception: if the port is not directly adjacent to the range
        """
        if self.in_range(port, range):
            return
        new_range = list(range) if remap else range
        if port == range[0] - 1:
            new_range[0] = port
        elif port == range[1] + 1:
            new_range[1] = port
        else:
            raise Exception(f"Unable to add port {port} to existing range {range}")
        if remap:
            self._remap_range(range, new_range, protocol=protocol)

    def bisect_range(
        self, port: int, range: PortRange, protocol: PortProtocol = "tcp", remap: bool = False
    ):
        """
        Bisect a port range, at the provided port. This is needed in some cases when adding a
        non-uniform host to port mapping adjacent to an existing port range.
        If remap==True, put the updated range into self.mappings

        Without remap, ``range`` itself is modified in place.
        """
        if not self.in_range(port, range):
            return
        new_range = list(range) if remap else range
        if port == range[0]:
            new_range[0] = port + 1
        else:
            new_range[1] = port - 1
        if remap:
            self._remap_range(range, new_range, protocol)

    def _remap_range(self, old_key: PortRange, new_key: PortRange, protocol: PortProtocol):
        """Move the value stored under ``(old_key, protocol)`` to ``(new_key, protocol)``."""
        self.mappings[(HashableList(new_key), protocol)] = self.mappings.pop(
            (HashableList(old_key), protocol)
        )

    def __repr__(self):
        return f"<PortMappings: {self.to_dict()}>"


# two-string tuple; VolumeMappings.find_target_mapping reads index 1 as the container directory
SimpleVolumeBind = Tuple[str, str]
"""Type alias for a simple version of VolumeBind"""


@dataclasses.dataclass
class VolumeBind:
    """Represents a --volume argument run/create command. When using VolumeBind to bind-mount a file or directory
    that does not yet exist on the Docker host, -v creates the endpoint for you. It is always created as a directory.
    """

    host_dir: str
    container_dir: str
    read_only: bool = False

    def to_str(self) -> str:
        """
        Render the bind as ``[<host_dir>:]<container_dir>[:ro]``.

        :raises ValueError: if no container dir is set
        """
        args = []

        if self.host_dir:
            args.append(self.host_dir)

        if not self.container_dir:
            raise ValueError("no container dir specified")

        args.append(self.container_dir)

        if self.read_only:
            args.append("ro")

        return ":".join(args)

    @classmethod
    def parse(cls, param: str) -> "VolumeBind":
        """
        Parse a ``<host_dir>:<container_dir>[:<options>]`` string into a VolumeBind.

        The bind is read-only if the comma-separated options contain ``ro``.

        :param param: the volume bind string
        :raises ValueError: if the string does not consist of two or three ``:``-separated parts
        """
        parts = param.split(":")
        # the host and container dir are both required, the options part is optional
        if not 2 <= len(parts) <= 3:
            raise ValueError(f"Cannot parse volume bind {param}")

        volume = cls(parts[0], parts[1])
        if len(parts) == 3 and "ro" in parts[2].split(","):
            volume.read_only = True
        return volume


class VolumeMappings:
    """List-backed collection of volume binds (``VolumeBind`` objects or simple tuples)."""

    mappings: List[Union[SimpleVolumeBind, VolumeBind]]

    def __init__(self, mappings: List[Union[SimpleVolumeBind, VolumeBind]] = None):
        # a passed list is used as-is (not copied); None starts a fresh empty list
        if mappings is None:
            mappings = []
        self.mappings = mappings

    def add(self, mapping: Union[SimpleVolumeBind, VolumeBind]):
        """Alias of ``append``."""
        self.append(mapping)

    def append(
        self,
        mapping: Union[
            SimpleVolumeBind,
            VolumeBind,
        ],
    ):
        """Add the given mapping to the end of the collection."""
        self.mappings.append(mapping)

    def find_target_mapping(
        self, container_dir: str
    ) -> Optional[Union[SimpleVolumeBind, VolumeBind]]:
        """
        Return the first volume whose container-side path equals ``container_dir``.

        :param container_dir: the target of the volume mapping, i.e., the path in the container
        :return: the matching volume mapping, or None if no volume targets that directory
        """

        def target_of(volume):
            # tuples carry the container dir at index 1, VolumeBind objects as an attribute
            if isinstance(volume, tuple):
                return volume[1]
            return volume.container_dir

        candidates = (volume for volume in self.mappings if container_dir == target_of(volume))
        return next(candidates, None)

    def __iter__(self):
        return iter(self.mappings)

    def __repr__(self):
        return repr(self.mappings)


# allowed values of the ``type`` field of VolumeInfo
VolumeType = Literal["bind", "volume"]


class VolumeInfo(NamedTuple):
    """Container volume information.

    ``ContainerClient.inspect_container_volumes`` builds instances from the entries of the
    ``"Mounts"`` list of a container inspection, using the lower-cased entry keys as field names.
    """

    type: VolumeType
    source: str
    destination: str
    mode: str
    rw: bool
    propagation: str
    # the last two fields are optional and default to None
    name: Optional[str] = None
    driver: Optional[str] = None


@dataclasses.dataclass
class ContainerConfiguration:
    """Bundles the settings for creating a container.

    Only ``image_name`` is required. ``ContainerClient.create_container_from_config`` forwards
    the fields to ``create_container`` (``volumes`` is passed as ``mount_volumes``).
    """

    image_name: str
    name: Optional[str] = None
    # container fields use default factories, so every configuration gets its own instances
    volumes: VolumeMappings = dataclasses.field(default_factory=VolumeMappings)
    ports: PortMappings = dataclasses.field(default_factory=PortMappings)
    exposed_ports: List[str] = dataclasses.field(default_factory=list)
    entrypoint: Optional[str] = None
    additional_flags: Optional[str] = None
    command: Optional[List[str]] = None
    env_vars: Dict[str, str] = dataclasses.field(default_factory=dict)

    privileged: bool = False
    remove: bool = False
    interactive: bool = False
    tty: bool = False
    # NOTE(review): ``detach`` is not forwarded by ContainerClient.create_container_from_config
    detach: bool = False

    # NOTE(review): ``stdin`` is not forwarded by ContainerClient.create_container_from_config
    stdin: Optional[str] = None
    user: Optional[str] = None
    cap_add: Optional[List[str]] = None
    cap_drop: Optional[List[str]] = None
    security_opt: Optional[List[str]] = None
    network: Optional[str] = None
    dns: Optional[str] = None
    workdir: Optional[str] = None
    platform: Optional[str] = None
    ulimits: Optional[List[Ulimit]] = None
    labels: Optional[Dict[str, str]] = None
    init: Optional[bool] = None


class ContainerConfigurator(Protocol):
    """Protocol for functional configurators. A ContainerConfigurator modifies, when called,
    a ContainerConfiguration in place.

    Being a ``Protocol``, any callable that accepts a ``configuration`` argument matches it.
    """

    def __call__(self, configuration: ContainerConfiguration):
        """
        Modify the given container configuration.

        :param configuration: the configuration to modify
        """
        ...


@dataclasses.dataclass
class DockerRunFlags:
    """Class to capture Docker run/create flags for a container.
    run: https://docs.docker.com/engine/reference/commandline/run/
    create: https://docs.docker.com/engine/reference/commandline/create/

    None of the fields has a default, so all of them must be passed (possibly as None).
    """

    env_vars: Optional[Dict[str, str]]
    extra_hosts: Optional[Dict[str, str]]
    labels: Optional[Dict[str, str]]
    mounts: Optional[List[SimpleVolumeBind]]
    network: Optional[str]
    platform: Optional[DockerPlatform]
    privileged: Optional[bool]
    ports: Optional[PortMappings]
    ulimits: Optional[List[Ulimit]]
    user: Optional[str]
    dns: Optional[List[str]]


# TODO: remove Docker/Podman compatibility switches (in particular strip_wellknown_repo_prefixes=...)
#  from the container client base interface and introduce derived Podman client implementations instead!
class ContainerClient(metaclass=ABCMeta):
    @abstractmethod
    def get_system_info(self) -> dict:
        """Returns the docker system-wide information as dictionary (``docker info``).

        :return: the system information; ``get_system_id`` reads its ``"ID"`` entry
        """

    def get_system_id(self) -> str:
        """Return the unique and stable ID of the docker daemon.

        :return: the ``"ID"`` entry of the system information
        """
        system_info = self.get_system_info()
        return system_info["ID"]

    @abstractmethod
    def get_container_status(self, container_name: str) -> DockerContainerStatus:
        """Returns the status of the container with the given name

        :param container_name: name of the container to check
        :return: one of the ``DockerContainerStatus`` members
        """
        pass

    def get_networks(self, container_name: str) -> List[str]:
        """Return the names of the networks listed in the container's network settings.

        :param container_name: name of the container to inspect
        :return: keys of ``NetworkSettings.Networks`` of the inspection (empty if the key is missing)
        """
        LOG.debug("Getting networks for container: %s", container_name)
        inspection = self.inspect_container(container_name_or_id=container_name)
        networks = inspection["NetworkSettings"].get("Networks", {})
        return list(networks.keys())

    def get_container_ipv4_for_network(
        self, container_name_or_id: str, container_network: str
    ) -> str:
        """
        Returns the IPv4 address for the container on the interface connected to the given network
        :param container_name_or_id: Container to inspect
        :param container_network: Network the IP address will belong to
        :return: IP address of the given container on the interface connected to the given network
        :raises ContainerException: if the container is not listed in the network, or if its
            IPv4 address cannot be determined
        """
        LOG.debug(
            "Getting ipv4 address for container %s in network %s.",
            container_name_or_id,
            container_network,
        )
        # we always need the ID for this
        container_id = self.get_container_id(container_name=container_name_or_id)
        network_attrs = self.inspect_network(container_network)
        containers = network_attrs.get("Containers") or {}
        if container_id not in containers:
            # best effort: log some diagnostics before raising
            LOG.debug("Network attributes: %s", network_attrs)
            try:
                inspection = self.inspect_container(container_name_or_id=container_name_or_id)
                LOG.debug("Container %s Attributes: %s", container_name_or_id, inspection)
                logs = self.get_container_logs(container_name_or_id=container_name_or_id)
                LOG.debug("Container %s Logs: %s", container_name_or_id, logs)
            except ContainerException as e:
                LOG.debug("Cannot inspect container %s: %s", container_name_or_id, e)
            # the exception does not do logging-style %-formatting, so build the message here;
            # extra positional arguments would end up as the exception's stdout/stderr
            raise ContainerException(
                f"Container {container_name_or_id} is not connected to target network {container_network}"
            )
        try:
            # the address is parsed as an interface (address plus optional prefix); only the
            # address part is returned
            ip = str(ipaddress.IPv4Interface(containers[container_id]["IPv4Address"]).ip)
        except Exception as e:
            raise ContainerException(
                f"Unable to detect IP address for container {container_name_or_id} in network {container_network}: {e}"
            ) from e
        return ip

    @abstractmethod
    def stop_container(self, container_name: str, timeout: int = 10):
        """Stops container with given name
        :param container_name: Container identifier (name or id) of the container to be stopped
        :param timeout: Timeout after which SIGKILL is sent to the container
            (presumably in seconds, as for ``restart_container`` — TODO confirm).
        """

    @abstractmethod
    def restart_container(self, container_name: str, timeout: int = 10):
        """Restarts a container with the given name.
        :param container_name: Container identifier
        :param timeout: Seconds to wait for stop before killing the container (default: 10)
        """

    @abstractmethod
    def pause_container(self, container_name: str):
        """Pauses a container with the given name.

        :param container_name: name of the container to pause
        """

    @abstractmethod
    def unpause_container(self, container_name: str):
        """Unpauses a container with the given name.

        :param container_name: name of the container to unpause
        """

    @abstractmethod
    def remove_container(
        self, container_name: str, force: bool = True, check_existence: bool = False
    ) -> None:
        """Removes container with given name

        :param container_name: name of the container to remove
        :param force: presumably forces the removal (e.g., of a running container) — TODO confirm
        :param check_existence: presumably checks whether the container exists before removing
            it — TODO confirm against the implementations
        """

    @abstractmethod
    def remove_image(self, image: str, force: bool = True) -> None:
        """Removes an image with given name

        :param image: Image name and tag
        :param force: Force removal (default: True)
        """

    @abstractmethod
    def list_containers(
        self, filter: Union[List[str], str, None] = None, all: bool = True
    ) -> List[dict]:
        """List all containers matching the given filters

        :param filter: a single filter or a list of filters; None to not filter
            (the filter syntax is defined by the implementations)
        :param all: ``get_running_container_names`` passes False to get running containers only
        :return: A list of dicts with keys id, image, name, labels, status
        """

    def get_running_container_names(self) -> List[str]:
        """Return the ``name`` of every container listed by ``list_containers(all=False)``."""
        return [container["name"] for container in self.list_containers(all=False)]

    def is_container_running(self, container_name: str) -> bool:
        """Tell whether ``container_name`` is among the names of the running containers."""
        running_names = self.get_running_container_names()
        return container_name in running_names

    @abstractmethod
    def copy_into_container(
        self, container_name: str, local_path: str, container_path: str
    ) -> None:
        """Copy contents of the given local path into the container

        :param container_name: name of the target container
        :param local_path: source path on the local machine
        :param container_path: destination path inside the container
        """

    @abstractmethod
    def copy_from_container(
        self, container_name: str, local_path: str, container_path: str
    ) -> None:
        """Copy contents of the given container to the host

        Note the parameter order: ``local_path`` (the destination) comes before
        ``container_path`` (the source), as in ``copy_into_container``.

        :param container_name: name of the source container
        :param local_path: destination path on the host
        :param container_path: source path inside the container
        """

    @abstractmethod
    def pull_image(self, docker_image: str, platform: Optional[DockerPlatform] = None) -> None:
        """Pulls an image with a given name from a Docker registry

        :param docker_image: name of the image to pull
        :param platform: optional platform of the image to pull, in the format
            ``os[/arch[/variant]]``
        """

    @abstractmethod
    def push_image(self, docker_image: str) -> None:
        """Pushes an image with a given name to a Docker registry

        :param docker_image: name of the image to push
        """

    @abstractmethod
    def build_image(
        self,
        dockerfile_path: str,
        image_name: str,
        context_path: Optional[str] = None,
        platform: Optional[DockerPlatform] = None,
    ) -> None:
        """Builds an image from the given Dockerfile

        :param dockerfile_path: Path to Dockerfile, or a directory that contains a Dockerfile
        :param image_name: Name of the image to be built
        :param context_path: Path for build context (defaults to dirname of Dockerfile)
        :param platform: Target platform for build (defaults to platform of Docker host),
            in the format ``os[/arch[/variant]]``
        """

    @abstractmethod
    def tag_image(self, source_ref: str, target_name: str) -> None:
        """Tags an image with a new name

        :param source_ref: Name or ID of the image to be tagged
        :param target_name: New name (tag) of the tagged image
        """

    @abstractmethod
    def get_docker_image_names(
        self,
        strip_latest: bool = True,
        include_tags: bool = True,
        strip_wellknown_repo_prefixes: bool = True,
    ) -> List[str]:
        """
        Get all names of docker images available to the container engine
        :param strip_latest: return images both with and without :latest tag
        :param include_tags: include tags of the images in the names
        :param strip_wellknown_repo_prefixes: whether to strip off well-known repo prefixes like
               "localhost/" or "docker.io/library/" which are added by the Podman API, but not by Docker
               (presumably the prefixes in ``WELL_KNOWN_IMAGE_REPO_PREFIXES`` — TODO confirm)
        :return: List of image names
        """

    @abstractmethod
    def get_container_logs(self, container_name_or_id: str, safe: bool = False) -> str:
        """Get all logs of a given container

        :param container_name_or_id: name or id of the container
        :param safe: NOTE(review): presumably suppresses errors instead of raising them —
            TODO confirm against the implementations
        :return: the logs as a string
        """

    @abstractmethod
    def stream_container_logs(self, container_name_or_id: str) -> CancellableStream:
        """Returns a blocking generator you can iterate over to retrieve log output as it happens.

        :param container_name_or_id: name or id of the container
        :return: a stream that can be iterated and closed (see ``CancellableStream``)
        """

    @abstractmethod
    def inspect_container(self, container_name_or_id: str) -> Dict[str, Union[Dict, str]]:
        """Get detailed attributes of a container.

        Other methods of this class read the keys ``Id``, ``Name``, ``Mounts`` and
        ``NetworkSettings`` from the result.

        :param container_name_or_id: name or id of the container
        :return: Dict containing docker attributes as returned by the daemon
        """

    def inspect_container_volumes(self, container_name_or_id) -> List[VolumeInfo]:
        """Describe the volumes mounted into the given container.

        Every entry of the ``Mounts`` list of the container inspection becomes one ``VolumeInfo``,
        with the entry's keys lower-cased to obtain the field names.

        :param container_name_or_id: the container name or id
        :return: a list of volumes
        """
        mounts = self.inspect_container(container_name_or_id)["Mounts"]
        return [
            VolumeInfo(**{key.lower(): value for key, value in mount.items()}) for mount in mounts
        ]

    @abstractmethod
    def inspect_image(
        self, image_name: str, pull: bool = True, strip_wellknown_repo_prefixes: bool = True
    ) -> Dict[str, Union[dict, list, str]]:
        """Get detailed attributes of an image.

        Other methods of this class read ``Config.Cmd`` and ``Config.Entrypoint`` from the result.

        :param image_name: Image name to inspect
        :param pull: Whether to pull image if not existent
        :param strip_wellknown_repo_prefixes: whether to strip off well-known repo prefixes like
               "localhost/" or "docker.io/library/" which are added by the Podman API, but not by Docker
        :return: Dict containing docker attributes as returned by the daemon
        """

    @abstractmethod
    def create_network(self, network_name: str) -> str:
        """
        Creates a network with the given name
        :param network_name: Name of the network
        :return: Network ID
        """

    @abstractmethod
    def delete_network(self, network_name: str) -> None:
        """
        Delete a network with the given name
        :param network_name: Name of the network to delete
        """

    @abstractmethod
    def inspect_network(self, network_name: str) -> Dict[str, Union[Dict, str]]:
        """Get detailed attributes of a network.

        ``get_container_ipv4_for_network`` reads the ``Containers`` entry of the result, keyed by
        container id, and the ``IPv4Address`` of the individual containers.

        :param network_name: name of the network to inspect
        :return: Dict containing docker attributes as returned by the daemon
        """

    @abstractmethod
    def connect_container_to_network(
        self,
        network_name: str,
        container_name_or_id: str,
        aliases: Optional[List] = None,
        link_local_ips: Optional[List[str]] = None,
    ) -> None:
        """
        Connects a container to a given network
        :param network_name: Network to connect the container to
        :param container_name_or_id: Container to connect to the network
        :param aliases: List of dns names the container should be available under in the network
        :param link_local_ips: List of link-local (IPv4 or IPv6) addresses
        """

    @abstractmethod
    def disconnect_container_from_network(
        self, network_name: str, container_name_or_id: str
    ) -> None:
        """
        Disconnects a container from a given network
        :param network_name: Network to disconnect the container from
        :param container_name_or_id: Container (name or id) to disconnect from the network
        """

    def get_container_name(self, container_id: str) -> str:
        """Look up the name of the container with the given identifier.

        :return: the ``Name`` of the container inspection without leading slashes
        """
        raw_name = self.inspect_container(container_id)["Name"]
        return raw_name.lstrip("/")

    def get_container_id(self, container_name: str) -> str:
        """Look up the id of the container with the given name.

        :return: the ``Id`` entry of the container inspection
        """
        inspection = self.inspect_container(container_name)
        return inspection["Id"]

    @abstractmethod
    def get_container_ip(self, container_name_or_id: str) -> str:
        """Get the IP address of a given container

        If container has multiple networks, it will return the IP of the first

        :param container_name_or_id: name or id of the container
        :return: the IP address as a string
        """

    def get_image_cmd(self, docker_image: str, pull: bool = True) -> List[str]:
        """Get the command for the given image
        :param docker_image: Docker image to inspect
        :param pull: Whether to pull if image is not present
        :return: Image command in its array form; an empty list if the image defines no command
        """
        image_config = self.inspect_image(docker_image, pull)["Config"]
        # tolerate a missing "Cmd" key as well as a null value, as done for "Entrypoint"
        return image_config.get("Cmd") or []

    def get_image_entrypoint(self, docker_image: str, pull: bool = True) -> str:
        """Determine the entry point of the given image.

        :param docker_image: Docker image to inspect
        :param pull: Whether to pull if image is not present
        :return: the entrypoint parts joined into one shell-escaped string; an empty string if
            the image has no entrypoint
        """
        LOG.debug("Getting the entrypoint for image: %s", docker_image)
        image_config = self.inspect_image(docker_image, pull)["Config"]
        entrypoint_parts = image_config.get("Entrypoint")
        if not entrypoint_parts:
            # covers both a missing key and a null/empty value
            entrypoint_parts = []
        return shlex.join(entrypoint_parts)

    @abstractmethod
    def has_docker(self) -> bool:
        """Check if system has docker available

        :return: True if docker is available, False otherwise
        """

    @abstractmethod
    def commit(
        self,
        container_name_or_id: str,
        image_name: str,
        image_tag: str,
    ):
        """Create an image from a running container.

        :param container_name_or_id: Source container (name or id)
        :param image_name: Destination image name
        :param image_tag: Destination image tag
        """

    def create_container_from_config(self, container_config: ContainerConfiguration) -> str:
        """
        Variant of create_container that takes all settings from a ContainerConfiguration.

        The ``detach`` and ``stdin`` fields of the configuration are not passed on.

        :param container_config: ContainerConfiguration how to start the container
        :return: Container ID
        """
        cfg = container_config
        # keyword arguments of create_container, taken from the equally named configuration
        # fields (except for mount_volumes, which is fed from ``volumes``)
        create_kwargs = {
            "image_name": cfg.image_name,
            "name": cfg.name,
            "entrypoint": cfg.entrypoint,
            "remove": cfg.remove,
            "interactive": cfg.interactive,
            "tty": cfg.tty,
            "command": cfg.command,
            "mount_volumes": cfg.volumes,
            "ports": cfg.ports,
            "exposed_ports": cfg.exposed_ports,
            "env_vars": cfg.env_vars,
            "user": cfg.user,
            "cap_add": cfg.cap_add,
            "cap_drop": cfg.cap_drop,
            "security_opt": cfg.security_opt,
            "network": cfg.network,
            "dns": cfg.dns,
            "additional_flags": cfg.additional_flags,
            "workdir": cfg.workdir,
            "privileged": cfg.privileged,
            "platform": cfg.platform,
            "labels": cfg.labels,
            "ulimits": cfg.ulimits,
            "init": cfg.init,
        }
        return self.create_container(**create_kwargs)

    @abstractmethod
    def create_container(
        self,
        image_name: str,
        *,
        name: Optional[str] = None,
        entrypoint: Optional[str] = None,
        remove: bool = False,
        interactive: bool = False,
        tty: bool = False,
        detach: bool = False,
        command: Optional[Union[List[str], str]] = None,
        mount_volumes: Optional[Union[VolumeMappings, List[SimpleVolumeBind]]] = None,
        ports: Optional[PortMappings] = None,
        exposed_ports: Optional[List[str]] = None,
        env_vars: Optional[Dict[str, str]] = None,
        user: Optional[str] = None,
        cap_add: Optional[List[str]] = None,
        cap_drop: Optional[List[str]] = None,
        security_opt: Optional[List[str]] = None,
        network: Optional[str] = None,
        dns: Optional[Union[str, List[str]]] = None,
        additional_flags: Optional[str] = None,
        workdir: Optional[str] = None,
        privileged: Optional[bool] = None,
        labels: Optional[Dict[str, str]] = None,
        platform: Optional[DockerPlatform] = None,
        ulimits: Optional[List[Ulimit]] = None,
        init: Optional[bool] = None,
    ) -> str:
        """Creates a container with the given image

        All parameters except ``image_name`` are keyword-only. Their exact semantics are defined
        by the implementations; the names presumably correspond to the respective
        ``docker create`` options — TODO confirm.

        :param image_name: name of the image to create the container from
        :return: Container ID
        """

    @abstractmethod
    def run_container(
        self,
        image_name: str,
        stdin: Optional[bytes] = None,
        *,
        name: Optional[str] = None,
        entrypoint: Optional[str] = None,
        remove: bool = False,
        interactive: bool = False,
        tty: bool = False,
        detach: bool = False,
        command: Optional[Union[List[str], str]] = None,
        mount_volumes: Optional[Union[VolumeMappings, List[SimpleVolumeBind]]] = None,
        ports: Optional[PortMappings] = None,
        exposed_ports: Optional[List[str]] = None,
        env_vars: Optional[Dict[str, str]] = None,
        user: Optional[str] = None,
        cap_add: Optional[List[str]] = None,
        cap_drop: Optional[List[str]] = None,
        security_opt: Optional[List[str]] = None,
        network: Optional[str] = None,
        dns: Optional[Union[str, List[str]]] = None,
        additional_flags: Optional[str] = None,
        workdir: Optional[str] = None,
        labels: Optional[Dict[str, str]] = None,
        platform: Optional[DockerPlatform] = None,
        privileged: Optional[bool] = None,
        ulimits: Optional[List[Ulimit]] = None,
        init: Optional[bool] = None,
    ) -> Tuple[bytes, bytes]:
        """Creates and runs a given docker container

        Abstract method. ``image_name`` and ``stdin`` may be passed positionally, all other
        arguments are keyword-only. ``run_container_from_config`` calls this method with the
        fields of a ``ContainerConfiguration``.

        NOTE(review): the option names mirror flags of the Docker CLI; the exact semantics of each
        option are defined by the implementing client -- confirm there.

        :param image_name: Image to run
        :param stdin: Optional bytes for the container (presumably written to its standard
                      input -- confirm against the implementations)
        :param name: Name of the container (optional)
        :param command: Command for the container, either a single string or a list of arguments
        :param mount_volumes: Volumes to mount, either a ``VolumeMappings`` object or a list of
                              ``SimpleVolumeBind`` entries
        :param ports: Port mappings for the container
        :param env_vars: Environment variables for the container
        :param dns: DNS server(s), either a single string or a list of strings
        :param additional_flags: Additional flags, given as one string
        :param labels: Labels for the container
        :return: A tuple (stdout, stderr)
        """

    def run_container_from_config(
        self, container_config: ContainerConfiguration
    ) -> Tuple[bytes, bytes]:
        """Like ``run_container`` but uses the parameters from the configuration.

        Every field of the configuration that ``run_container`` accepts is forwarded as a keyword
        argument, including ``labels`` (the ``volumes`` field is passed as ``mount_volumes``).

        :param container_config: ContainerConfiguration describing the container to run
        :return: A tuple (stdout, stderr), as returned by ``run_container``
        """

        return self.run_container(
            image_name=container_config.image_name,
            stdin=container_config.stdin,
            name=container_config.name,
            entrypoint=container_config.entrypoint,
            remove=container_config.remove,
            interactive=container_config.interactive,
            tty=container_config.tty,
            detach=container_config.detach,
            command=container_config.command,
            mount_volumes=container_config.volumes,
            ports=container_config.ports,
            exposed_ports=container_config.exposed_ports,
            env_vars=container_config.env_vars,
            user=container_config.user,
            cap_add=container_config.cap_add,
            cap_drop=container_config.cap_drop,
            security_opt=container_config.security_opt,
            network=container_config.network,
            dns=container_config.dns,
            additional_flags=container_config.additional_flags,
            workdir=container_config.workdir,
            # labels were previously dropped here although create_container_from_config forwards them
            labels=container_config.labels,
            platform=container_config.platform,
            privileged=container_config.privileged,
            ulimits=container_config.ulimits,
            init=container_config.init,
        )

    @abstractmethod
    def exec_in_container(
        self,
        container_name_or_id: str,
        command: Union[List[str], str],
        interactive: bool = False,
        detach: bool = False,
        env_vars: Optional[Dict[str, Optional[str]]] = None,
        stdin: Optional[bytes] = None,
        user: Optional[str] = None,
        workdir: Optional[str] = None,
    ) -> Tuple[bytes, bytes]:
        """Execute a given command in a container

        Abstract method; the behaviour of the individual options is defined by the implementing client.

        :param container_name_or_id: Name or ID of the container to execute the command in
        :param command: Command to execute, either a single string or a list of arguments
        :param interactive: Interactive flag for the execution (semantics defined by the implementation)
        :param detach: Detach flag for the execution (semantics defined by the implementation)
        :param env_vars: Environment variables for the command; values may be None
        :param stdin: Optional bytes for the command (presumably written to its standard input -- confirm)
        :param user: User for the command
        :param workdir: Working directory for the command
        :return: A tuple (stdout, stderr)
        """

    @abstractmethod
    def start_container(
        self,
        container_name_or_id: str,
        stdin: Optional[bytes] = None,
        interactive: bool = False,
        attach: bool = False,
        flags: Optional[str] = None,
    ) -> Tuple[bytes, bytes]:
        """Start a given, already created container

        Abstract method.

        :param container_name_or_id: Name or ID of the container to start
        :param stdin: Optional bytes for the container (presumably written to its standard input -- confirm)
        :param interactive: Influences the return value, see below
        :param attach: Influences the return value, see below
        :param flags: Optional flags, given as one string (format defined by the implementation)
        :return: A tuple (stdout, stderr) if attach or interactive is set, otherwise a tuple (b"container_name_or_id", b"")
        """

    @abstractmethod
    def attach_to_container(self, container_name_or_id: str):
        """
        Attach local standard input, output, and error streams to a running container

        Abstract method.

        :param container_name_or_id: Name or ID of the container to attach to
        """

    @abstractmethod
    def login(self, username: str, password: str, registry: Optional[str] = None) -> None:
        """
        Login into an OCI registry

        Abstract method.

        :param username: Username for the registry
        :param password: Password / token for the registry
        :param registry: Registry url (optional; what is used when it is omitted is up to the implementation)
        """


class Util:
    MAX_ENV_ARGS_LENGTH = 20000

    @staticmethod
    def format_env_vars(key: str, value: Optional[str]):
        if value is None:
            return key
        return f"{key}={value}"

    @classmethod
    def create_env_vars_file_flag(cls, env_vars: Dict) -> Tuple[List[str], Optional[str]]:
        if not env_vars:
            return [], None
        result = []
        env_vars = dict(env_vars)
        env_file = None
        if len(str(env_vars)) > cls.MAX_ENV_ARGS_LENGTH:
            # default ARG_MAX=131072 in Docker - let's create an env var file if the string becomes too long...
            env_file = cls.mountable_tmp_file()
            env_content = ""
            for name, value in dict(env_vars).items():
                if len(value) > cls.MAX_ENV_ARGS_LENGTH:
                    # each line in the env file has a max size as well (error "bufio.Scanner: token too long")
                    continue
                env_vars.pop(name)
                value = value.replace("\n", "\\")
                env_content += f"{cls.format_env_vars(name, value)}\n"
            save_file(env_file, env_content)
            result += ["--env-file", env_file]

        env_vars_res = [
            item for k, v in env_vars.items() for item in ["-e", cls.format_env_vars(k, v)]
        ]
        result += env_vars_res
        return result, env_file

    @staticmethod
    def rm_env_vars_file(env_vars_file) -> None:
        if env_vars_file:
            return rm_rf(env_vars_file)

    @staticmethod
    def mountable_tmp_file():
        """Return a new file path with a ``short_uid()`` name inside ``config.dirs.mounted_tmp``.

        The path is only computed and appended to ``TMP_FILES``; no file is created here.
        :return: The file path
        """
        base_dir = config.dirs.mounted_tmp
        tmp_path = os.path.join(base_dir, short_uid())
        # NOTE(review): TMP_FILES presumably drives a later cleanup of temporary files -- confirm
        TMP_FILES.append(tmp_path)
        return tmp_path

    @staticmethod
    def append_without_latest(image_names: List[str]):
        suffix = ":latest"
        for image in list(image_names):
            if image.endswith(suffix):
                image_names.append(image[: -len(suffix)])

    @staticmethod
    def strip_wellknown_repo_prefixes(image_names: List[str]) -> List[str]:
        """
        Remove well-known repo prefixes like `localhost/` or `docker.io/library/` from the list of given
        image names. This is mostly to ensure compatibility of our Docker client with Podman API responses.
        :return: a copy of the list of image names, with well-known repo prefixes removed
        """
        result = []
        for image in image_names:
            for prefix in WELL_KNOWN_IMAGE_REPO_PREFIXES:
                if image.startswith(prefix):
                    image = image.removeprefix(prefix)
                    # strip only one of the matching prefixes (avoid multi-stripping)
                    break
            result.append(image)
        return result

    @staticmethod
    def tar_path(path: str, target_path: str, is_dir: bool):
        f = tempfile.NamedTemporaryFile()
        with tarfile.open(mode="w", fileobj=f) as t:
            abs_path = os.path.abspath(path)
            arcname = (
                os.path.basename(path)
                if is_dir
                else (os.path.basename(target_path) or os.path.basename(path))
            )
            t.add(abs_path, arcname=arcname)

        f.seek(0)
        return f

    @staticmethod
    def untar_to_path(tardata, target_path):
        target_path = Path(target_path)
        with tarfile.open(mode="r", fileobj=io.BytesIO(b"".join(b for b in tardata))) as t:
            if target_path.is_dir():
                t.extractall(path=target_path)
            else:
                member = t.next()
                if member:
                    member.name = target_path.name
                    t.extract(member, target_path.parent)
                else:
                    LOG.debug("File to copy empty, ignoring...")

    @staticmethod
    def parse_additional_flags(
        additional_flags: str,
        env_vars: Optional[Dict[str, str]] = None,
        labels: Optional[Dict[str, str]] = None,
        mounts: Optional[List[SimpleVolumeBind]] = None,
        network: Optional[str] = None,
        platform: Optional[DockerPlatform] = None,
        ports: Optional[PortMappings] = None,
        privileged: Optional[bool] = None,
        user: Optional[str] = None,
        ulimits: Optional[List[Ulimit]] = None,
        dns: Optional[Union[str, List[str]]] = None,
    ) -> DockerRunFlags:
        """Parses additional CLI-formatted Docker flags, which could overwrite provided defaults.
        :param additional_flags: String which contains the flag definitions inspired by the Docker CLI reference:
                                 https://docs.docker.com/engine/reference/commandline/run/
        :param env_vars: Dict with env vars. Will be modified in place.
        :param labels: Dict with labels. Will be modified in place.
        :param mounts: List of mount tuples (host_path, container_path). Will be modified in place.
        :param network: Existing network name (optional). Warning will be printed if network is overwritten in flags.
        :param platform: Platform to execute container. Warning will be printed if platform is overwritten in flags.
        :param ports: PortMapping object. Will be modified in place.
        :param privileged: Run the container in privileged mode. Warning will be printed if overwritten in flags.
        :param ulimits: ulimit options in the format <type>=<soft limit>[:<hard limit>]
        :param user: User to run first process. Warning will be printed if user is overwritten in flags.
        :param dns: List of DNS servers to configure the container with.
        :return: A DockerRunFlags object that will return new objects if respective parameters were None and
                additional flags contained a flag for that object or the same which are passed otherwise.
        :raises ValueError: if an ``--add-host`` value contains no ``:`` separator, if a ``--publish`` value
                does not have the form ``[host:]host_port:container_port``, or if a port / ulimit number
                is not an integer
        """
        # Argparse refactoring opportunity: custom argparse actions can be used to modularize parsing (e.g., key=value)
        # https://docs.python.org/3/library/argparse.html#action

        # Configure parser
        parser = NoExitArgumentParser(description="Docker run flags parser")
        parser.add_argument(
            "--add-host",
            help="Add a custom host-to-IP mapping (host:ip)",
            dest="add_hosts",
            action="append",
        )
        parser.add_argument(
            "--env", "-e", help="Set environment variables", dest="envs", action="append"
        )
        parser.add_argument(
            "--label", "-l", help="Add container meta data", dest="labels", action="append"
        )
        parser.add_argument("--network", help="Connect a container to a network")
        parser.add_argument(
            "--platform",
            type=DockerPlatform,
            help="Docker platform (e.g., linux/amd64 or linux/arm64)",
        )
        parser.add_argument(
            "--privileged",
            help="Give extended privileges to this container",
            action="store_true",
        )
        parser.add_argument(
            "--publish",
            "-p",
            help="Publish container port(s) to the host",
            dest="publish_ports",
            action="append",
        )
        parser.add_argument(
            "--ulimit", help="Container ulimit settings", dest="ulimits", action="append"
        )
        parser.add_argument("--user", "-u", help="Username or UID to execute first process")
        parser.add_argument(
            "--volume", "-v", help="Bind mount a volume", dest="volumes", action="append"
        )
        parser.add_argument("--dns", help="Set custom DNS servers", dest="dns", action="append")

        # Parse
        flags = shlex.split(additional_flags)
        args = parser.parse_args(flags)

        # Post-process parsed flags
        extra_hosts = None
        if args.add_hosts:
            extra_hosts = {}
            for add_host in args.add_hosts:
                # split at the first colon only, so that IPv6 addresses (which contain colons) stay intact
                host, separator, address = add_host.partition(":")
                if not separator:
                    raise ValueError(f"Invalid add-host string provided: {add_host}")
                extra_hosts[host] = address

        if args.envs:
            env_vars = env_vars if env_vars is not None else {}
            for env in args.envs:
                # a flag without "=" results in an empty string value
                lhs, _, rhs = env.partition("=")
                env_vars[lhs] = rhs

        if args.labels:
            labels = labels if labels is not None else {}
            for label in args.labels:
                key, _, value = label.partition("=")
                # Only consider non-empty labels
                if key:
                    labels[key] = value

        if args.network:
            LOG.warning(
                "Overwriting Docker container network '%s' with new value '%s'",
                network,
                args.network,
            )
            network = args.network

        if args.platform:
            LOG.warning(
                "Overwriting Docker platform '%s' with new value '%s'",
                platform,
                args.platform,
            )
            platform = args.platform

        # --privileged is a store_true flag, i.e. it can only switch privileged mode on
        if args.privileged:
            LOG.warning(
                "Overwriting Docker container privileged flag %s with new value %s",
                privileged,
                args.privileged,
            )
            privileged = args.privileged

        if args.publish_ports:
            for port_mapping in args.publish_ports:
                port_split = port_mapping.split(":")
                protocol = "tcp"
                if len(port_split) == 2:
                    host_port, container_port = port_split
                elif len(port_split) == 3:
                    LOG.warning(
                        "Host part of port mappings are ignored currently in additional flags"
                    )
                    _, host_port, container_port = port_split
                else:
                    raise ValueError(f"Invalid port string provided: {port_mapping}")
                # the host port is either a single port or a range "<start>-<end>"
                host_port_split = host_port.split("-")
                if len(host_port_split) == 2:
                    host_port = [int(host_port_split[0]), int(host_port_split[1])]
                elif len(host_port_split) == 1:
                    host_port = int(host_port)
                else:
                    raise ValueError(f"Invalid port string provided: {port_mapping}")
                # an optional "/<protocol>" suffix on the container port replaces the default "tcp"
                if "/" in container_port:
                    container_port, protocol = container_port.split("/")
                ports = ports if ports is not None else PortMappings()
                ports.add(host_port, int(container_port), protocol)

        if args.ulimits:
            ulimits = ulimits if ulimits is not None else []
            # keyed by name, so that a ulimit from the flags replaces a passed one of the same name
            ulimits_dict = {ul.name: ul for ul in ulimits}
            for ulimit in args.ulimits:
                name, _, rhs = ulimit.partition("=")
                soft, _, hard = rhs.partition(":")
                # without an explicit hard limit, the soft limit is used for both
                hard_limit = int(hard) if hard else int(soft)
                new_ulimit = Ulimit(name=name, soft_limit=int(soft), hard_limit=hard_limit)
                if ulimits_dict.get(name):
                    LOG.warning("Overwriting Docker ulimit %s", new_ulimit)
                ulimits_dict[name] = new_ulimit
            ulimits = list(ulimits_dict.values())

        if args.user:
            LOG.warning(
                "Overwriting Docker user '%s' with new value '%s'",
                user,
                args.user,
            )
            user = args.user

        if args.volumes:
            mounts = mounts if mounts is not None else []
            for volume in args.volumes:
                match = re.match(
                    r"(?P<host>[\w\s\\\/:\-.]+?):(?P<container>[\w\s\/\-.]+)(?::(?P<arg>ro|rw|z|Z))?",
                    volume,
                )
                if not match:
                    LOG.warning("Unable to parse volume mount Docker flags: %s", volume)
                    continue
                host_path = match.group("host")
                container_path = match.group("container")
                rw_args = match.group("arg")
                if rw_args:
                    LOG.info("Volume options like :ro or :rw are currently ignored.")
                mounts.append((host_path, container_path))

        # NOTE(review): if ensure_list returns a passed list unchanged, the caller's list is extended
        # in place below -- confirm against ensure_list
        dns = ensure_list(dns or [])
        if args.dns:
            LOG.info(
                "Extending Docker container DNS servers %s with additional values %s", dns, args.dns
            )
            dns.extend(args.dns)

        return DockerRunFlags(
            env_vars=env_vars,
            extra_hosts=extra_hosts,
            labels=labels,
            mounts=mounts,
            ports=ports,
            network=network,
            platform=platform,
            privileged=privileged,
            ulimits=ulimits,
            user=user,
            dns=dns,
        )

    @staticmethod
    def convert_mount_list_to_dict(
        mount_volumes: Union[List[SimpleVolumeBind], VolumeMappings],
    ) -> Dict[str, Dict[str, str]]:
        """Converts a List of (host_path, container_path) tuples to a Dict suitable as volume argument for docker sdk"""

        def _map_to_dict(paths: SimpleVolumeBind | VolumeBind):
            if isinstance(paths, VolumeBind):
                return str(paths.host_dir), {
                    "bind": paths.container_dir,
                    "mode": "ro" if paths.read_only else "rw",
                }
            else:
                return str(paths[0]), {"bind": paths[1], "mode": "rw"}

        return dict(
            map(
                _map_to_dict,
                mount_volumes,
            )
        )

    @staticmethod
    def resolve_dockerfile_path(dockerfile_path: str) -> str:
        """If the given path is a directory that contains a Dockerfile, then return the file path to it."""
        rel_path = os.path.join(dockerfile_path, "Dockerfile")
        if os.path.isdir(dockerfile_path) and os.path.exists(rel_path):
            return rel_path
        return dockerfile_path