runperf/tests.py
#!/usr/bin/env python3
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See LICENSE for more details.
#
# Copyright: Red Hat Inc. 2018
# Author: Lukas Doktor <ldoktor@redhat.com>
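"""Test classes used by runperf to run benchmarks and collect results."""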
import json
import os
import re
import shlex
import tempfile
import time
from . import exceptions
from . import utils
from .utils import pbench
class BaseTest:
"""Base implementation of a Test class"""
name = ""
min_groups = 1
def __init__(self, host, workers, base_output_path, metadata, extra):
name = extra.pop('__NAME__', None)
if not name:
name = self.name
self.name = utils.string_to_safe_path(name)
self.host = host
self.workers = workers
base_output_path = os.path.join(base_output_path, self.name)
        os.makedirs(base_output_path, exist_ok=True)
self.output = tempfile.mkdtemp(prefix="tmp", dir=base_output_path)
self.metadata = metadata
def setup(self):
"""
Allow extra steps before test execution
"""
def _all_machines_kmsg(self, msg):
"""
Log a message on all workers' as well as host's kmsg
"""
sessions = []
try:
sessions.append(self.host.get_session())
for workers in self.workers:
for worker in workers:
sessions.append(worker.get_session(hop=self.host))
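            # Compose a marker with the controller's timestamp ("C<ts>") and
            # profile name; each machine adds its own "W<ts>" timestamp when
            # echoing the marker to /dev/kmsg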
msg = f"C{time.time():.0f}: {self.host.profile.name}: {msg}"
cmd = f"echo runperf: W$(date +%s): {pipes.quote(msg)} > /dev/kmsg"
for session in sessions:
session.sendline(cmd)
for session in sessions:
# Ensure all commands finished
session.cmd("true")
finally:
for session in sessions:
session.close()
def run(self):
"""Run the testing"""
if len(self.workers) < self.min_groups:
msg = (f"Not enough groups of workers ({len(self.workers)} < "
"{self.min_groups})")
with open(os.path.join(self.output, "SKIP"), 'w',
encoding="utf-8") as skip:
skip.write(msg)
raise exceptions.TestSkip("msg")
self._all_machines_kmsg(f"Starting test {self.name}")
return self._run()
def _run(self):
"""
Deploy, run and fetch results to self.output
"""
raise NotImplementedError
def inject_metadata(self, session, path):
"""
Add our "RUNPERF_METADATA.json" to the dirname($path) in order to
preserve our extended data (especially profile, workers and such...)
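
        The base metadata are extended with "distro", "profile" and
        "workers" entries before being written out.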
:param session: Session to the worker
:param path: Path where the results should be located
"""
        meta = dict(self.metadata)
meta['distro'] = self.host.distro
meta['profile'] = self.host.profile.name
str_workers = {}
for i, workers in enumerate(self.workers):
str_workers[i] = {worker.name: worker.get_info()
for worker in workers}
meta['workers'] = str_workers
dir_path = os.path.dirname(path)
if session.cmd_status(f"[ -d '{dir_path}' ]") == 0:
result_path = os.path.join(dir_path, "RUNPERF_METADATA.json")
results_json = json.dumps(meta, indent=4, sort_keys=True)
utils.shell_write_content(
lambda cmd: session.cmd(cmd, timeout=600, print_func='mute'),
result_path, results_json)
def cleanup(self):
"""
Cleanup the environment; is **always** executed even for SKIP tests
"""
class DummyTest(BaseTest):
"""
Dummy test intended for selftesting
"""
name = "DummyTest"
def _run(self):
result_path = os.path.join(self.output, "result.json")
with open(result_path, 'w', encoding="utf-8") as result:
with open(os.path.join(os.path.dirname(__file__), "assets",
"tests", "DummyTest",
"result.json"),
encoding="utf-8") as src:
result.write(src.read() % {"hostname": self.host.get_addr()})
with self.host.get_session_cont() as session:
self.inject_metadata(session, result_path)
class PBenchTest(BaseTest):
"""
Pbench test
Metadata: pbench_server - set the pbench-server-url
Metadata: pbench_server_publish - publish results to pbench server
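
    All remaining "extra" key=value pairs (except "__"-prefixed keys) are
    appended to the pbench command as "--key=value"; the special value
    "__PER_WORKER_CPUS__" is replaced by guest_cpus divided by the number
    of workers in a group.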
"""
test = ""
args = ""
default_args = ()
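    # overall benchmark timeout; a watchdog_timeout of 0 disables the
    # output-stall detection in "_run_with_watchdog"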
timeout = 172800
watchdog_timeout = 0
def __init__(self, host, workers, base_output_path,
metadata, extra):
super().__init__(host, workers, base_output_path, metadata, extra)
if "pbench_server_publish" in self.metadata:
self.pbench_publish = True
else:
self.pbench_publish = False
pbench_tools = extra.pop("pbench_tools", None)
if not pbench_tools:
pbench_tools = metadata.get("pbench_tools", None)
if pbench_tools:
pbench_tools = json.loads(pbench_tools)
else:
pbench_tools = ["sar:--interval=3", "iostat:--interval=3",
"mpstat:--interval=3",
"proc-interrupts:--interval=3",
"proc-vmstat:--interval=3"]
self.pbench_tools = pbench_tools
for key, value in self.default_args:
if key not in extra:
extra[key] = value
# Using sorted to always use the same cmdline
for key, value in sorted(extra.items()):
# Replace special values
# Skip "__*" keys
if key.startswith("__"):
continue
if key == "runtime":
                # Allow 3-times the runtime for output stalls, or at least
                # 360s, to avoid failing on sysinfo collection
self.watchdog_timeout = max(360, int(value) * 3)
            # __PER_WORKER_CPUS__ == number of cpus per worker
if value == "__PER_WORKER_CPUS__":
for _workers in self.workers:
if len(_workers):
value = int(int(self.host.params["guest_cpus"]) /
len(_workers))
break
else:
raise RuntimeError("Unable to get number of workers from "
f" {self.workers!r}")
self.args += f" --{key}={value}"
self._cmd = (f"pbench-{self.test} {self.args} "
f"--clients={','.join(_.get_addr() for _ in self.workers[0])}")
def setup(self):
def install_pbench(host, metadata, test):
with host.get_session_cont() as session:
session.runperf_stage("Setup pbench")
pbench.install_on(session, metadata, test=test)
threads = []
remotes = set()
for host_workers in self.workers:
if self.host in host_workers:
# When host is also in workers, perform install first on host
install_pbench(self.host, self.metadata, self.test)
break
else:
name = f"host {self.host.name}"
remotes.add(self.host)
threads.append(utils.ThreadWithStatus(target=install_pbench,
name=name,
args=(self.host,
self.metadata,
self.test)))
for workers in self.workers:
for worker in workers:
remotes.add(worker)
name = f"worker {worker.name}"
threads.append(utils.ThreadWithStatus(target=install_pbench,
name=name,
args=(worker,
self.metadata,
self.test)))
for thread in threads:
thread.start()
for thread in threads:
thread.join()
failed = [thread for thread in threads if thread.completed is not True]
if failed:
for thread in failed:
if thread.exc:
raise RuntimeError("Failed to install pbench on "
f"{failed}") from thread.exc
raise RuntimeError(f"Failed to install pbench on {failed}")
# Register the tools for all workers
with self.host.get_session_cont() as session:
pbench.register_tools(session, self.pbench_tools, remotes)
self._wait_for_workers_calm_down()
def _wait_for_workers_calm_down(self):
"""
Wait for the machines to calm down before the testing and use
hop=self.host as the host will be executing the ssh commands.
"""
for workers in self.workers:
for worker in workers:
with worker.get_session_cont(hop=self.host) as session:
if not utils.wait_for_machine_calms_down(session,
timeout=1800):
worker.log.warning("Worker did not stabilize in 1800s,"
" proceeding on a loaded machine!")
with self.host.get_session_cont(hop=self.host) as session:
if not utils.wait_for_machine_calms_down(session, timeout=1800):
self.host.log.warning("Host did not stabilize in 1800s,"
" proceeding on a loaded machine!")
def _pbench_destructive_cleanup_on_failure(self, session):
"""
In case of a failure pbench might leave some processes behind,
use this drastic method to ensure they won't spoil future testing.
Note the list might not be complete but it should prevent hangs
related to multiple TMs running.
https://github.com/distributed-system-analysis/pbench/issues/2625
"""
nuke_cmd = ("for NAME in pbench-tool-meister-start redis-server "
"gpg-agent scdaemon fio uperf linpack; do "
"killall -9 $NAME; done")
session.cmd_status(nuke_cmd)
for workers in self.workers:
for worker in workers:
with worker.get_session_cont(hop=self.host) as wsession:
wsession.cmd_status(nuke_cmd)
@staticmethod
def _run_with_watchdog(cmd, session, timeout, watchdog_timeout):
"""
Run a command while reading it's output ensuring it keeps producing
output at least every `watchdog_timeout`s
"""
session.read_nonblocking(0.2, 10)
session.sendline(cmd)
end_time = time.time() + timeout
watchdog_next = time.time() + watchdog_timeout
re_prompt = re.compile(session.prompt)
output = ""
while time.time() < end_time:
out = session.read_nonblocking(1, min(1, end_time - time.time()))
if not out:
if watchdog_next <= time.time():
raise RuntimeError(f"Output of {cmd} stalled for more "
f"than {watchdog_timeout}s. Output so "
"far:\n\n" + output)
continue
output += out
nonempty_lines = [_ for _ in output.splitlines() if _.strip()]
if nonempty_lines and re_prompt.search(nonempty_lines[-1]):
return output
watchdog_next = time.time() + watchdog_timeout
raise RuntimeError(f"{cmd} execution took longer than the {timeout}s. "
"output so far:\n\n" + output)
def _run(self):
# We only need one group of workers
src = None
try:
with self.host.get_session_cont() as session:
session.cmd("true")
session.runperf_stage("Run pbench")
benchmark_bin = utils.shell_find_command(session, self.test)
if benchmark_bin:
prefix = f"benchmark_bin={benchmark_bin} "
else:
prefix = ""
                # FIXME: Remove this workaround once
                # https://github.com/distributed-system-analysis/pbench/issues/1743
                # is resolved
                session.cmd(". /opt/pbench-agent/base")
if self.watchdog_timeout:
# Run the test while checking the output for stalls
cmd = prefix + self._cmd
self.host.log.debug("Sending command: %s", cmd)
self._run_with_watchdog(cmd, session,
self.timeout,
self.watchdog_timeout)
else:
session.cmd_output(prefix + self._cmd,
timeout=self.timeout)
                # Let the system rest a bit after the heavy load
session.read_nonblocking(5)
ret = session.cmd_output(session.status_test_command, 10)
digit_lines = [line for line in ret.splitlines()
if line.strip().isdigit()]
if digit_lines:
if int(digit_lines[0].strip()) != 0:
self._pbench_destructive_cleanup_on_failure(session)
raise RuntimeError(f"Execution failed {digit_lines} ("
"redis and pbench TM were "
"forcefully destroyed, ensure "
"your workloads were not affected)")
else:
raise RuntimeError("Failed to get status")
session.runperf_stage("Postprocess pbench")
src = session.cmd_output("echo $(ls -dt /var/lib/pbench-agent/"
f"{self.test}__*/ | "
"head -n 1)").strip()
self.inject_metadata(session, os.path.join(src, "result.json"))
if self.pbench_publish:
extra_args = []
user = self.metadata.get("project")
if user:
extra_args.append(f"--user {user}")
prefix = self.metadata.get("build")
if prefix:
extra_args.append(f"--prefix {prefix}")
session.cmd(f"pbench-copy-results {' '.join(extra_args)}",
timeout=600)
self.host.copy_from(src, self.output)
except Exception:
if src:
self.host.copy_from(src, self.output)
raise
class PBenchFio(PBenchTest):
"""Default fio benchmark (read)"""
name = "fio"
test = "fio"
default_args = (("test-types", "read,write,rw"),
("ramptime", 10),
("runtime", 180),
("samples", 3))
class Linpack(PBenchTest):
"""linpack test"""
name = "linpack"
test = "linpack"
default_args = (("samples", 3),)
    def __init__(self, host, workers, base_output_path, metadata, extra):
        # Only auto-detect the linpack binary when the user did not supply it
        self._detect_linpack_bin = "linpack-binary" not in extra
if "threads" not in extra:
# We want 2*cpus to stress the scheduler
extra["threads"] = utils.list_of_threads(
host.params["guest_cpus"] * 2)
PBenchTest.__init__(self, host, workers, base_output_path, metadata,
extra)
def _run(self):
# For pbench-agent<=0.69 use pbench-run-benchmark to support clients
with self.host.get_session_cont() as session:
pbench_help = session.cmd_output("pbench-linpack -h")
            linpack_bin = None
            if self._detect_linpack_bin:
                # When linpack is not specified by the user we need to
                # detect and append it now as it was probably installed
                # during `setup()`
                for name in ("linpack", "xlinpack_xeon64"):
                    linpack_bin = utils.shell_find_command(session, name)
                    if linpack_bin:
                        break
                if not linpack_bin:
                    linpack_bin = session.cmd_output(
                        "ls /usr/local/*/benchmarks/linpack/"
                        "xlinpack_xeon64 2>/dev/null").strip()
                    if not linpack_bin:
                        raise exceptions.TestSkip("No linpack binary found"
                                                  " on host")
                    linpack_bin = linpack_bin.splitlines()[0]
            if '--clients' in pbench_help:
                if linpack_bin:
                    linpack_dir = os.path.dirname(linpack_bin)
                    self._cmd = f"linpack_dir={linpack_dir} {self._cmd}"
            else:
                pbench_args = (self._cmd.split(' ', 1)[1]
                               .replace('--samples=', '--run-samples='))
                self._cmd = ("ANSIBLE_HOST_KEY_CHECKING=false "
                             "ANSIBLE_PYTHON_INTERPRETER=/usr/bin/python3 "
                             f"pbench-run-benchmark {self.test} "
                             f"{pbench_args}")
            if linpack_bin:
                self._cmd += f" --linpack-binary='{linpack_bin}'"
PBenchTest._run(self)
class UPerf(PBenchTest):
"""
Uperf test
    By default it executes a tcp stream test. When testing udp we strongly
    suggest also setting test-types=rr, otherwise there is no guarantee that
    the packets are not simply dropped.
"""
name = "uperf"
test = "uperf"
default_args = (("test-types", "stream"),
("runtime", 60),
("samples", 3),
("protocols", "tcp"),
("message-sizes", "1,64,16384"))
def __init__(self, host, workers, base_output_path, metadata, extra):
super().__init__(host, workers, base_output_path, metadata, extra)
# FIXME: Workaround missing perl paths
self._cmd = ("PERL5LIB=/opt/pbench-agent/tool-scripts/postprocess/:"
"/opt/pbench-agent/bench-scripts/postprocess/ "
f"{self._cmd}")
# FIXME: Ugly IPv4-libvirt-bridge-only hack to use main host
addrs = []
for worker in self.workers[0]:
addr = worker.get_host_addr()
utils.ssh_copy_id(self.host.log, addr, host.default_passwords,
self.host)
addrs.append(addr)
self._cmd += (f" --servers {','.join(addrs)}")
class PBenchNBD(PBenchFio):
"""
Executes PBenchFio with a custom job to test nbd
    By default it creates and distributes the job-file using "nbd-check.fio"
    from assets, but you can override the job-file path and distribute your
    own version. In that case make sure to use the right paths and format.
"""
name = "fio-nbd"
default_args = (("numjobs", 4),
("job-file", "/var/lib/runperf/runperf-nbd/nbd.fio"))
base_path = "/var/lib/runperf/runperf-nbd/"
def __init__(self, host, workers, base_output_path, metadata, extra):
self.fio_job_file = extra.get("job-file", self.base_path + "nbd.fio")
super().__init__(host, workers, base_output_path, metadata, extra)
def setup(self):
PBenchFio.setup(self)
with open(os.path.join(os.path.dirname(__file__), "assets", "pbench",
"nbd-check.fio"),
encoding="utf-8") as fio_check:
fio_check_tpl = utils.shell_write_content_cmd(self.base_path +
"nbd-check.fio",
fio_check.read())
with open(os.path.join(os.path.dirname(__file__), "assets", "pbench",
"nbd.fio"),
encoding="utf-8") as fio:
fio_tpl = utils.shell_write_content_cmd(self.fio_job_file,
fio.read())
for workers in self.workers:
for worker in workers:
with worker.get_session_cont() as session:
session.runperf_stage("Start NBD listener")
session.cmd("mkdir -p " + self.base_path)
session.cmd(fio_check_tpl)
ret = session.cmd_status(
f"fio --parse-only {self.base_path}/nbd-check.fio")
if ret:
raise exceptions.TestSkip(
f"Fio {session.cmd('which fio')} does not support "
f"ioengine=nbd on worker {worker}")
session.cmd("dd bs=1M count=256 if=/dev/urandom "
f"of='{self.base_path}/disk.img'")
session.cmd(f"nohup qemu-nbd -t -k {self.base_path}/socket"
f" -f raw {self.base_path}/disk.img &> "
f"$(mktemp {self.base_path}/qemu_nbd_XXXX.log)"
f" & echo $! >> {self.base_path}/kill_pids")
# Sometimes nohup is not enough, use disown
session.cmd(f"for PID in $(cat {self.base_path}/kill_pids)"
"; do disown -h $PID; done")
with self.host.get_session_cont(hop=self.host) as session:
session.cmd("mkdir -p " + self.base_path)
session.cmd(fio_tpl)
def cleanup(self):
for workers in self.workers:
for worker in workers:
with worker.get_session_cont() as session:
pids = session.cmd(f"cat {self.base_path}/kill_pids "
"2>/dev/null || true")
for pid in pids.splitlines():
session.cmd_status(f"kill -9 '{pid}'")
session.cmd("rm -Rf " + self.base_path)
with self.host.get_session_cont(hop=self.host) as session:
session.cmd(f"rm -Rf {self.base_path}")
PBenchFio.cleanup(self)
class PBenchLibblkio(PBenchFio):
"""
Executes PBenchFio with a custom job to test libblkio
    By default it creates and distributes the job-file using "libblkio.fio"
    from assets, but you can override the job-file path and distribute your
    own version. In that case make sure to use the right paths and format.
On top of the default params it also supports:
* hipri - sets corresponding fio option in the file (single value 0 or 1)
* target - changes the exported target path (by default /dev/ram0)
* __SETUP_RAMDISK__ - whether to setup ramdisk by "modprobe brd ..."
* __STORAGE_DAEMON_CMD__ - override the usual "qemu-storage-daemon" cmd
(you have to use %s as the target filename to be replaced here)
"""
name = "fio-libblkio"
default_args = (("numjobs", 1),
("job-file", "/var/lib/runperf/runperf-libblkio/"
"libblkio.fio"))
base_path = "/var/lib/runperf/runperf-libblkio/"
def __init__(self, host, workers, base_output_path, metadata, extra):
self._params = {}
self._params["hipri"] = extra.pop("hipri", 0)
self._params["target"] = extra.pop("target", "/dev/ram0")
self._params["setup_ramdisk"] = extra.pop("__SETUP_RAMDISK__", True)
socket_path = os.path.join(self.base_path, "vhost-user-blk.sock")
storage_daemon_cmd = extra.pop(
"__STORAGE_DAEMON_CMD__", "qemu-storage-daemon --blockdev "
"driver=host_device,node-name=file,filename=%s,cache.direct=on "
"--object iothread,id=iothread0 "
"--export type=vhost-user-blk,iothread=iothread0,id=export,"
"node-name=file,addr.type=unix,addr.path=%s,"
"writable=on")
try:
self._params["storage_daemon_cmd"] = (storage_daemon_cmd
% (self._params["target"],
socket_path))
        except TypeError as details:
            raise exceptions.TestSkip(
                "Failed to format __STORAGE_DAEMON_CMD__; expecting 2 '%s' "
                f"placeholders (target and socket): {details}") from details
self.fio_job_file = extra.get("job-file", self.base_path +
"libblkio.fio")
super().__init__(host, workers, base_output_path, metadata, extra)
def setup(self):
PBenchFio.setup(self)
with open(os.path.join(os.path.dirname(__file__), "assets", "pbench",
"libblkio.fio"),
encoding="utf-8") as fio:
fio_tpl = utils.shell_write_content_cmd(
self.fio_job_file, fio.read() % self._params["hipri"])
for workers in self.workers:
for worker in workers:
with worker.get_session_cont() as session:
session.runperf_stage("Start libblkio export")
if self._params.get("setup_ramdisk"):
session.cmd("modprobe brd rd_nr=1 rd_size=1048576 "
"max_part=0")
session.cmd("mkdir -p " + self.base_path)
session.cmd(f"nohup {self._params['storage_daemon_cmd']} "
"&> "
f"$(mktemp {self.base_path}/libblkio_XXXX.log)"
f" & echo $! >> {self.base_path}/kill_pids")
# Sometimes nohup is not enough, use disown
session.cmd(f"for PID in $(cat {self.base_path}/kill_pids)"
"; do disown -h $PID; done")
with self.host.get_session_cont(hop=self.host) as session:
session.cmd("mkdir -p " + self.base_path)
session.cmd(fio_tpl)
def cleanup(self):
for workers in self.workers:
for worker in workers:
with worker.get_session_cont() as session:
pids = session.cmd(f"cat {self.base_path}/kill_pids "
"2>/dev/null || true")
for pid in pids.splitlines():
session.cmd_status(f"kill -9 '{pid}'")
if self._params.get("setup_ramdisk"):
session.cmd_status("rmmod brd")
session.cmd("rm -Rf " + self.base_path)
with self.host.get_session_cont(hop=self.host) as session:
session.cmd(f"rm -Rf {self.base_path}")
PBenchFio.cleanup(self)
def get(name, extra):
"""
    Get the test class and extra params based on the test name

    :param name: Test name optionally followed by ':' and extra params
    :param extra: Extra test params
    :return: tuple of the test class (resolved entry point) and the extra
             params
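
    Example (illustrative; ``host``, ``workers`` and the other arguments
    are normally supplied by the runperf machinery)::

        klass, params = get("DummyTest", {})
        test = klass(host, workers, output_path, metadata, params)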
"""
return (utils.named_entry_point('runperf.tests', name), extra)