runperf/machine.py
#!/bin/env python3
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See LICENSE for more details.
#
# Copyright: Red Hat Inc. 2018
# Author: Lukas Doktor <ldoktor@redhat.com>
import contextlib
import json
import logging
import os
import re
import time
import uuid
import aexpect
import yaml
from . import exceptions, profiles, utils
from .utils import MutableShellSession as ShellSession
from .utils import CONTEXT
LOG = logging.getLogger(__name__)
# : Path to yaml files with host configurations
HOSTS_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), 'hosts'))
# : Minimal set of required keys for host definition
HOST_KEYS = {'hugepage_kb', 'numa_nodes', 'host_cpus',
'guest_cpus', 'guest_mem_m', 'arch'}
def get_distro_info(machine):
"""Various basic sysinfo"""
out = {"general": f"Name:{machine.name}\nDistro:{machine.distro}"}
with machine.get_session_cont() as session:
# Get basic kernel info
kernel = session.cmd("uname -r; uname -v; uname -m; uname -o",
print_func='mute', ignore_all_errors=True)
kernel_ver = kernel.split('\n', 1)[0].strip()
# Do not include kernel_version in the cmdline
kernel_cmd = session.cmd("cat /proc/cmdline", print_func='mute',
ignore_all_errors=True)
out["kernel_raw"] = kernel + '\n' + kernel_cmd
kernel_cmd = kernel_cmd.replace(kernel_ver, "FILTERED")
# Sort the kernel_cmdline parts as the order does not matter
out["kernel"] = (kernel + '\n' +
" ".join(sorted(_.strip()
for _ in kernel_cmd.split(' ')
if _.strip())))
out["mitigations"] = session.cmd("grep --color=never . "
"/sys/devices/system/cpu/"
"vulnerabilities/*",
print_func='mute',
ignore_all_errors=True)
if session.cmd_status("which rpm", print_func='mute') == 0:
out["rpm"] = session.cmd("rpm -qa | sort", print_func='mute',
ignore_all_errors=True)
out["systemctl"] = session.cmd("systemctl | "
"grep -v 'session-[0-9]*\\.scope'"
" | tr -s ' ' | uniq | sort",
print_func='mute',
ignore_all_errors=True)
out["runperf_sysinfo"] = session.cmd("cat /var/lib/runperf/sysinfo "
"| uniq | sort",
print_func='mute',
ignore_all_errors=True)
return out
class BaseMachine:
"""
Basic machine interaction
"""
def __init__(self, log, name, distro, default_passwords=None):
self.log = log # worker log
self.name = name # human readable name
self.distro = distro # distribution running/to-be-provisioned
self.default_passwords = default_passwords # default ssh passwords
self.log_fetcher = utils.LogFetcher()
# For the first time collect everything
self.log_fetcher.params["since"] = 0
def __str__(self):
return self.name
def __repr__(self):
return f"{self.__class__.__name__}({self.name}, {self.distro})"
def get_fullname(self):
"""
Return full host name
"""
return self.name
def get_addr(self):
"""
Get addr/hostname
"""
raise NotImplementedError
def get_host_addr(self):
"""
Get addr/hostname of the host (or self)
"""
raise NotImplementedError
def get_ssh_cmd(self, hop=None):
"""
Get session
:param hop: Use hop as ssh proxy
"""
if hop:
return (hop.get_ssh_cmd() +
" -A -t ssh -o BatchMode=yes "
"-o StrictHostKeyChecking=no "
"-o UserKnownHostsFile=/dev/null -o ControlMaster=auto "
"-o ControlPath='/var/tmp/%r@%h-%p' "
f"-o ControlPersist=60 root@{self.get_addr()}")
return ("ssh -o BatchMode=yes -o StrictHostKeyChecking=no"
" -o UserKnownHostsFile=/dev/null -o ControlMaster=auto "
"-o ControlPath='/var/tmp/%r@%h-%p' "
f"-o ControlPersist=60 root@{self.get_addr()}")
def ssh_copy_id(self, hop=None):
"""
Copy default id to remote host
"""
return utils.ssh_copy_id(self.log, self.get_addr(),
self.default_passwords, hop)
def get_session(self, timeout=60, hop=None):
"""
Get session to this machine
:param timeout: timeout
:param hop: ssh proxy machine
:type hop: BaseMachine
:return: aexpect shell session
"""
end = time.time() + timeout
session = None
rp_path = os.path.join("__sessions__",
utils.string_to_safe_path(self.name))
try:
while time.time() < end:
try:
session = ShellSession(None, self.get_ssh_cmd(hop))
session.read_up_to_prompt()
session.close()
session = None
session = ShellSession(rp_path, self.get_ssh_cmd(hop),
output_func=self.log.debug,
output_prefix=">> ")
session.read_up_to_prompt()
session.sendline("export TERM=xterm-256color")
for _ in range(3):
try:
session.cmd("true")
return session
except (aexpect.ExpectError, aexpect.ShellError):
pass
# Session not ready, let's try it again
session.close()
self.log.debug("Session is not responsive, trying another "
"round")
continue
except (aexpect.ExpectError, aexpect.ShellError) as err:
if session:
session.close()
session = None
if "Permission denied" in str(err):
if not self.default_passwords:
raise RuntimeError("Permission denied and no "
"default passwords specified:\n"
f"{err}") from err
self.ssh_copy_id(hop)
time.sleep(1)
except Exception as err:
if session:
session.close()
raise RuntimeError(f"Unable to get ssh session: {err}") from err
raise RuntimeError("Timeout while getting ssh session "
f"({self.get_ssh_cmd(hop)})")
@contextlib.contextmanager
def get_session_cont(self, timeout=60, hop=None):
"""
Get session to this machine suitable for "with" usage
:param timeout: timeout
:param hop: ssh proxy machine
:type hop: BaseMachine
:return: aexpect shell session
"""
session = None
try:
session = self.get_session(timeout, hop)
yield session
finally:
if session:
session.close()
def copy_from(self, src, dst):
"""
Copy file from the machine
:warning: This won't check/setup keys
"""
cmd = ["rsync", "-amrh", "-e", "ssh -o StrictHostKeyChecking=no " +
"-o UserKnownHostsFile=/dev/null -o ControlMaster=auto " +
"-o ControlPath='/var/tmp/%r@%h-%p' -o ControlPersist=60" +
" -o BatchMode=yes", f"root@{self.get_addr()}:{src}",
dst]
utils.check_output(cmd)
def copy_to(self, src, dst):
"""
Copy file(s) to the machine
:warning: This won't check/setup keys
"""
cmd = ["rsync", "-amrh", "-e", "ssh -o StrictHostKeyChecking=no " +
"-o UserKnownHostsFile=/dev/null -o ControlMaster=auto " +
"-o ControlPath='/var/tmp/%r@%h-%p' -o ControlPersist=60" +
" -o BatchMode=yes", src, f"root@{self.get_addr()}:{dst}"]
utils.check_output(cmd)
def get_info(self):
"""
Report basic info about this machine
"""
output = {}
for entry in utils.sorted_entry_points('runperf.machine.distro_info'):
out = entry.load()(self)
if out:
output.update(out)
return output
def fetch_logs(self, path):
"""
Fetch logs from this machine
"""
self.log_fetcher.collect(path, self)
class Controller:
"""
Object allowing to interact with multiple hosts
"""
def __init__(self, args, log):
self.log = log
self._args = args
self._provisioner = args.provisioner
self.default_passwords = args.default_passwords
self.paths = args.paths
self.profile = None
main_host = args.hosts[0]
root_log = logging.getLogger('')
self.main_host = Host(root_log, main_host[0], main_host[1],
args.distro, args)
hosts = [self.main_host]
for host in args.hosts[1:]:
hosts.append(Host(root_log, host[0], host[1], args.distro, args,
self.main_host))
self.hosts = hosts
self.metadata = args.metadata
@staticmethod
def for_each_host(hosts, method, args=tuple(), kwargs=None):
"""
Perform action in parallel on each host, signal RebootRequest if
necessary.
:param hosts: List of hosts to run the tasks on
:param method: host.$method to be performed per each host
:param args: positional arguments forwarded to the called methods
:param kwargs: key word arguments forwarded to the called methods
:raise exceptions.RebootRequest: When any of the actions report
non-zero return.
"""
if kwargs is None:
kwargs = {}
threads = [utils.ThreadWithStatus(target=getattr(host, method),
name=f"{host.name}-{method}",
args=args, kwargs=kwargs)
for host in hosts]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
for thread in threads:
if thread.completed is not True:
if thread.exc:
raise RuntimeError(f"Thread {thread} "
"failed") from thread.exc
raise RuntimeError(f"Thread {thread} failed")
reboot_request = [host for host in hosts if host.reboot_request]
if reboot_request:
raise exceptions.RebootRequest(reboot_request, method)
def for_each_host_retry(self, attempts, hosts, method, args=tuple(),
kwargs=None):
"""
Perform action in parallel on each host while allowing re-try if
available.
This is useful for tasks that might fail/require reboot.
:param attempts: How many attempts per-host
:param hosts: List of hosts to run the tasks on
:param method: host.$method to be performed per each host
:param args: positional arguments forwarded to the called methods
:param kwargs: key word arguments forwarded to the called methods
:raise exceptions.RebootRequest: When any of the actions report
non-zero return.
"""
if kwargs is None:
kwargs = {}
i = 0
all_hosts = hosts
while True:
try:
self.for_each_host(hosts, method, args, kwargs)
return
except exceptions.RebootRequest as exc:
# Retry only with hosts that requested retry
hosts = exc.hosts
for host in hosts:
host.reboot()
i += 1
if i >= attempts:
raise RuntimeError(f"Failed to {method} on "
f"{','.join(str(_) for _ in hosts)} "
f"({','.join(str(_) for _ in all_hosts)}) "
f"in {attempts} attempts")
def setup(self):
"""Basic setup like ssh keys, pbench installation and such"""
CONTEXT.msg(f"SETUP hosts {','.join(str(_) for _ in self.hosts)}")
if self._provisioner:
self.log.info("PROVISION %s", self.hosts)
plugin = utils.named_entry_point('runperf.provisioners',
self._provisioner[0])
provisioner = plugin(self, self._provisioner[1])
self.for_each_host(self.hosts, 'provision', (provisioner,))
# Run per-host setup
self.for_each_host_retry(2, self.hosts, "setup")
# Allow to customize host
if self._args.host_setup_script:
with open(self._args.host_setup_script,
encoding="utf-8") as script:
self.for_each_host(self.hosts, 'run_script', [script.read()])
if self._args.host_rpms:
cmd = utils.shell_dnf_install_cmd(self._args.host_rpms)
self.for_each_host(self.hosts, 'run_script', [cmd])
if self._args.host_setup_script_reboot:
self.for_each_host(self.hosts, "reboot")
shared_pub_key = self.main_host.generate_ssh_key()
world_versions = []
for host in self.hosts:
host.shared_pub_key = shared_pub_key
world_versions.append(host.get_info())
self.write_metadata("environment_world", json.dumps(world_versions))
def write_metadata(self, key, value):
"""Append the key:value to the RUNPERF_METADATA file"""
with open(os.path.join(self._args.output, "RUNPERF_METADATA"),
'a', encoding="utf-8") as out:
out.write(f"\n{key}:")
out.write(value)
def fetch_logs(self, path):
"""
Fetch logs from all hosts
"""
self.for_each_host(self.hosts, 'fetch_logs', (path, ))
def _step(self):
"""
Decorator to record failures in our outputdir
"""
def inner(func):
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as exc:
err_path = utils.record_failure(self._args.output, exc)
try:
self.fetch_logs(err_path)
except Exception: # pylint: disable=W0703
pass
raise exceptions.StepFailed from exc
return wrapper
return inner
def _apply_profile(self, profile, extra):
CONTEXT.msg(f"APPLY profile {profile} {extra}")
# Allow 5 attempts, one to revert previous profile, one to
# apply and 3 extra in case one boot fails to get resources
# (eg. hugepages)
setup_script = None
if self._args.worker_setup_script:
with open(self._args.worker_setup_script,
encoding="utf-8") as setup_script_fd:
setup_script = setup_script_fd.read()
if self._args.worker_rpms:
if not setup_script:
setup_script = '#!/bin/bash\n'
setup_script += '\n\nInstall rpms specified by --worker-rpms\n'
setup_script += utils.shell_dnf_install_cmd(self._args.worker_rpms)
self.for_each_host_retry(5, self.hosts, 'apply_profile',
(profile, extra, setup_script,
self.paths))
self.profile = self.main_host.profile.name
return [host.workers for host in self.hosts]
def apply_profile(self, profile, extra):
"""Apply profile on each host, report list of lists of workers"""
return self._step()(self._apply_profile)(profile, extra)
def _revert_profile(self):
CONTEXT.msg(f"REVERT profile {self.profile}")
# Collect information about the profile in case it was applied
if self.profile is not None:
env = []
for host in self.hosts:
try:
env.append(host.profile.get_info())
except Exception as details: # pylint: disable=W0703
env.append({"failure": f"Failed to get info: {details}"})
self.write_metadata(f"environment_profile_{self.profile}",
json.dumps(env))
# Allow 3 attempts, one to revert previous profile, one to apply
# and one extra in case one boot fails to get resources (eg. hugepages)
self.for_each_host_retry(3, self.hosts, 'revert_profile')
self.profile = None
def revert_profile(self):
"""Revert profile"""
return self._step()(self._revert_profile)()
@staticmethod
def _move_results(tmp_path):
base_path = os.path.dirname(tmp_path)
for i in range(10000):
try:
path = os.path.join(base_path, f"{i:04d}")
os.rename(tmp_path, path)
return path
except IOError:
pass
raise RuntimeError(f"Failed to create test output dir in {base_path} "
"in 10000 iterations.")
def run_test(self, test_class, workers, extra):
"""
Run a test
:param test_class: class to be instantiated and executed via this
controller
:param workers: list of workers to be made available for execution
"""
test = test_class(self.main_host, workers,
os.path.join(self._args.output, self.profile),
self.metadata, extra.copy())
name = test.name
CONTEXT.set(1, test.output, "Running test")
try:
test.setup()
test.run()
path = self._move_results(test.output)
CONTEXT.set(1, path, f"{name} FINISHED")
except exceptions.TestSkip as exc:
CONTEXT.msg(f"{name} SKIPPED: {exc}")
except Exception as exc:
CONTEXT.msg(f"{name} INTERRUPTED: {exc}")
raise
finally:
test.cleanup()
def cleanup(self):
"""Post-testing cleanup"""
CONTEXT.msg(f"CLEANUP hosts {self.hosts}")
self.for_each_host_retry(2, self.hosts, 'cleanup')
class Host(BaseMachine):
"""
Base object to leverage a machine
"""
def __init__(self, parent_log, name, addr, distro, args, hop=None):
super().__init__(parent_log.getChild(name), name, distro,
args.default_passwords)
self.addr = addr
self.hop = hop
self.shared_pub_key = None
self.reboot_request = False
self.profile = None
self._cleanup = []
self.workers = []
self.params = self._process_params(args)
self.guest_distro = args.guest_distro or args.distro
def setup(self):
"""
Prepare host
"""
if self.params.get("disable_smt"):
with self.get_session_cont() as session:
smt_control = session.cmd("cat /sys/devices/system/cpu/smt/"
"control").strip()
if smt_control != "forceoff":
session.cmd("echo 'off' > /sys/devices/system/cpu/smt/"
"control")
self.reboot_request = True
session.cmd("grubby --update-kernel=ALL "
"--args=nosmt=force")
def get_fullname(self):
if self.hop:
return self.hop.get_fullname() + '-' + self.addr
return self.addr
def get_addr(self):
"""Return addr as they are static"""
return self.addr
def get_host_addr(self):
"""Return our addr as we are the host"""
return self.addr
def get_ssh_cmd(self, hop=None):
"""By default use self.hop as the default hop"""
if hop is None and self.hop:
hop = self.hop
return BaseMachine.get_ssh_cmd(self, hop=hop)
def _process_params(self, args):
# Use args.paths to find yaml file for given machine
path_cfg = None
for path in args.paths:
path_cfg = os.path.join(path, 'hosts', self.addr + '.yaml')
if os.path.exists(path_cfg):
with open(path_cfg, encoding="utf-8") as cfg:
params = yaml.load(cfg, Loader=yaml.SafeLoader)
break
else:
params = {}
# Add --force-params overrides
if args.force_params and self.addr in args.force_params:
self.log.debug("Overriding params via --force-params")
params.update(args.force_params.get(self.addr))
if not HOST_KEYS.issubset(params):
self.log.warning("%s keys are not defined for %s. Define them "
"in %s or set via --force-params.",
", ".join(HOST_KEYS.difference(params)),
path_cfg, self.addr)
raise NotImplementedError("Implicit values for undefined hosts "
"not yet supported.")
# Add dynamic defaults
if 'arch' not in params:
params['arch'] = os.uname()[4]
return params
def __repr__(self):
return (f"{self.__class__.__name__}({self.name}, {self.addr}, "
f"{self.distro}, {self.profile})")
def generate_ssh_key(self):
"""
Generate/reuse ssh key in ~/.ssh/id_rsa
"""
with self.get_session_cont() as session:
if (session.cmd_status('[ -e ~/.ssh/id_rsa.pub ]') or
session.cmd_output('[ -e ~/.ssh/id_rsa ]')):
self._cleanup.append("ssh_keys")
session.cmd("rm -f ~/.ssh/id_rsa.pub "
"~/.ssh/id_rsa")
session.cmd('ssh-keygen -b 2048 -t rsa -f '
'~/.ssh/id_rsa -q -N ""')
return session.cmd_output('cat ~/.ssh/id_rsa.pub')
def run_script(self, script, timeout=3600):
"""
Runs a script on the machine
"""
with self.get_session_cont() as session:
tmp = session.cmd_output("mktemp").strip()
session.cmd(utils.shell_write_content_cmd(tmp, script, False))
session.cmd(f"sh -x {tmp}", timeout)
def reboot(self):
"""Gracefully reboot the machine"""
self.log.debug(" Rebooting...")
with self.get_session_cont() as session:
session.sendline("reboot")
time.sleep(30)
with self.get_session_cont(360):
# Just checking whether it's obtainable
pass
self.log.debug(" Reboot DONE")
self.reboot_request = False
def provision(self, provisioner):
"""Provision the machine"""
self.log.debug(" Provisioning using %s...", provisioner)
provisioner.provision(self)
self.log.debug(" Provisioning DONE")
def apply_profile(self, profile, extra, setup_script, rp_paths):
"""
Apply profile and set new workers
:param profile: name of the requested profile
:param setup_script: setup script to be executed on each worker setup
:param paths: paths to runperf assets
"""
self.profile = profiles.get(profile, extra, self, rp_paths)
CONTEXT.set(0, self.profile.name, f"Applying profile {profile}")
ret = self.profile.apply(setup_script)
if ret is True:
self.reboot_request = True
else:
self.workers = ret
def revert_profile(self):
"""Revert profile if any profile set"""
self.log.debug(" Reverting profile %s", self.profile)
if self.profile is None:
return
if self.profile.revert():
self.reboot_request = True
self.workers = []
self.profile = None
def cleanup(self):
"""Cleanup after testing"""
if self.profile is not None:
self.profile.revert()
if "ssh_key" in self._cleanup:
with self.get_session_cont() as session:
session.cmd_status("rm -f ~/.ssh/id_rsa.pub")
session.cmd_status("rm -f ~/.ssh/id_rsa")
self._cleanup = []
def get_info(self):
out = BaseMachine.get_info(self)
out["params"] = "\n".join(f"{_[0]}: {_[1]}"
for _ in sorted(self.params.items()))
return out
def fetch_logs(self, path):
"""Fetch important logs"""
if self.profile:
self.profile.fetch_logs(path)
self.log_fetcher.collect(path, self)
def __del__(self):
self.cleanup()
class LibvirtGuest(BaseMachine):
"""
Object representing libvirt guests
"""
_RE_IPADDR = re.compile(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}')
XML_FILTERS = ((re.compile(r"<uuid>[^<]+</uuid>"), "UUID"),
(re.compile(r"<mac address=[^/]+/>"), "MAC"),
(re.compile(r"[\"']/var/lib/libvirt/[^\"']+[\"']"), "PATH"),
(re.compile(r"<seclabel.*?</seclabel>", flags=re.DOTALL),
"SECLABEL"),
(re.compile(r"portid=[\"'][^\"']+[\"']"), "PORTID"),
(re.compile(r"[\"']/dev/pts[^\"']*[\"']"), "PTS"),
(re.compile(r"\sid=['\"]\d+['\"]"), " ID"))
def __init__(self, host, name, distro, base_image, smp, mem,
default_passwords=None, extra_params=None):
"""
:param host: Host on which to define the VM
:param name: Name of the VM
:param distro: OS version installed on the image
:param image: Path to guest image
:param smp: Number of CPUs to be used by VM
:param mem: Amount of memory to be used by VM
"""
if extra_params is None:
extra_params = {}
_name = f"{host.name}.{name}"
super().__init__(host.log.getChild(name), _name, distro,
default_passwords)
self.host = host
self._host_session = None
self.base_image = base_image
self.smp = smp
self.mem = mem
self.extra_params = extra_params
self._re_running = re.compile(fr'\d+ +{self.name} +running')
self._addr = None
self._started = False
self.image = None
def get_fullname(self):
return self.host.get_fullname() + '-' + self.get_addr()
def get_ssh_cmd(self, hop=None):
"""By default use self.hop as the default hop"""
if hop is None:
hop = self.host
return BaseMachine.get_ssh_cmd(self, hop=hop)
def get_host_session(self):
"""
Get and cache host session.
This session will be cleaned automatically on ".cleanup()"
"""
if self._host_session and self._host_session.is_responsive():
return self._host_session
self._host_session = self.host.get_session()
return self._host_session
@staticmethod
def _get_os_variant_rhel(lower, oss):
out = "".join("".join(lower).split('-', 2)[:-1])
while True:
if re.search(fr"{out}$", oss, re.MULTILINE):
return out
if '.' not in out:
break
out = out.rsplit('.', 1)[0]
return "rhel8.0" # This should be the safest option
def _get_os_variant(self, session):
if not self.distro:
raise ValueError(f"No distro specified {self.distro}")
lower = self.distro.lower()
oss = session.cmd("osinfo-query os -f short-id", print_func="mute")
if lower in oss:
return lower
if lower.startswith('rhel'):
return self._get_os_variant_rhel(lower, oss)
no_underscore = lower.replace('-', '')
if no_underscore in oss:
return no_underscore
raise NotImplementedError(f"Unknown os_variant: {self.distro}")
def get_info(self):
out = BaseMachine.get_info(self)
xml = self.get_host_session().cmd(f"virsh dumpxml '{self.name}'",
print_func="mute",
ignore_all_errors=True)
out["libvirt_xml_raw"] = xml
for reg, repl in self.XML_FILTERS:
xml = reg.sub(repl, xml)
out["libvirt_xml"] = xml
return out
def _log_path(self, suffix):
return f"/var/log/libvirt/{os.path.basename(self.image)}_{suffix}"
def start(self):
"""
Define and start the VM
"""
if self.is_defined():
raise RuntimeError(f"VM {self.name} already running")
self._started = True
# Always re-create image from base
session = self.get_host_session()
fmt = self.extra_params.get("image_format", "qcow2")
src_fmt = self.base_image.rsplit('.', 1)[-1]
image = f"{self.base_image[:-(len(src_fmt) + 1)]}-{self.name}.{fmt}"
if fmt == src_fmt:
session.cmd(f"\\cp -f {self.base_image} {image}",
timeout=600)
else:
session.cmd(f"qemu-img convert -f {src_fmt} -O {fmt} "
f"{self.base_image} {image}",
timeout=600)
# System might get a bit laggy after huge-file copy, use sync to
# avoid unresponsive system
session.cmd("sync", timeout=600)
self.image = image
xml = self.extra_params.get("xml", None)
if xml:
session.cmd("cat << \\EOF | "
f"virt-xml --edit --disk path={image} | "
"virt-xml --edit --disk driver_type=raw | "
f"virt-xml --edit --metadata name={self.name} | "
f"virt-xml --edit --metadata uuid={uuid.uuid1()} > "
f"'{self._log_path('.xml')}'\n{xml}\nEOF")
else:
session.cmd(f"virt-install --import --disk '{self.image}' "
f"--memory '{self.mem}' --name '{self.name}' "
f"--os-variant '{self._get_os_variant(session)}' "
f"--vcpus '{self.smp}' --serial "
f"file,path='{self._log_path('_serial.log')}' "
f"{self.extra_params.get('virt-install-extra', '')} "
f"--dry-run --print-xml > '{self._log_path('.xml')}'")
if "qemu_bin" in self.extra_params:
session.cmd(f"echo -e 'cd /domain/devices/emulator\nset {self.extra_params['qemu_bin']}\n"
f"save' | xmllint --shell '{self._log_path('.xml')}'")
# Finally start the machine
session.cmd("chown -R qemu:qemu /dev/hugepages/")
session.cmd(f"virsh create '{self._log_path('.xml')}'")
def is_running(self):
"""Whether VM is running"""
out = self.get_host_session().cmd_output("virsh list")
return bool(self._re_running.search(out))
def is_defined(self):
"""Whether VM is defined (not necessary running)"""
out = self.get_host_session().cmd_output("virsh list --all")
return f" {self.name} " in out
def get_addr(self):
if self._addr is not None:
return self._addr
end = time.time() + 240
session = self.get_host_session()
out = ""
while time.time() < end:
out = session.cmd_output(f"virsh domifaddr {self.name}",
print_func='mute')
addrs = self._RE_IPADDR.findall(out)
if addrs:
self.log.debug(out)
self._addr = addrs[-1]
return self._addr
raise RuntimeError(f"Failed to get {self.name} IP addr in 240s: {out}")
def get_host_addr(self):
return self.host.get_addr()
def cleanup(self):
"""Destroy the machine and close host connection"""
errs = []
if not self._started:
if self._host_session:
self._host_session.close()
self._host_session = None
return
session = None
try:
session = self.get_host_session()
# Try graceful shutdown first
if (self.is_defined() and
session.cmd_status(f"virsh destroy '{self.name}' "
"--graceful")):
time.sleep(5)
# Double-check it does not exists and nuke it
if (self.is_defined() and
session.cmd_status(f"virsh destroy '{self.name}'")):
errs.append("destroy")
if (self.is_defined() and
session.cmd_status(f"virsh undefine '{self.name}'")):
errs.append("undefine")
if self.image:
session.cmd_status(f"rm -f '{self.image}' '{self.image}.xml' "
"'/var/log/libvirt/"
f"{os.path.basename(self.image)}"
"_serial.log'")
self._started = False
if errs:
raise RuntimeError(f"Cleanup of {errs} failed")
finally:
if session:
session.close()
def __del__(self):
self.cleanup()