# selftests/functional/utils/process.py
import os
import stat
import sys
import time
from avocado.utils import process
from selftests.utils import TestCaseTmpDir, skipOnLevelsInferiorThan
# Permission bits applied to the generated helper scripts: read/write/execute
# for owner and group, read/execute for others -- i.e. octal "0775",
# or "u=rwx,g=rwx,o=rx" in symbolic notation.
DEFAULT_MODE = stat.S_IRWXU | stat.S_IRWXG | stat.S_IROTH | stat.S_IXOTH
# Source text of a stand-in "vmstat" executable.  The shebang points at the
# interpreter running this module.  The script takes two positional
# arguments: the interval (seconds) between printed sample lines, and the
# delay (seconds) it sleeps inside its SIGINT/SIGTERM handler before exiting
# with status 0.
#
# NOTE: the embedded script must keep valid Python indentation, otherwise the
# generated executable dies with an IndentationError as soon as it is run.
FAKE_VMSTAT_CONTENTS = f"""#!{sys.executable}
import time
import random
import signal
import sys
class FakeVMStat:
    def __init__(self, interval, interrupt_delay=0):
        self.interval = interval
        self._sysrand = random.SystemRandom()
        def interrupt_handler(signum, frame):
            time.sleep(interrupt_delay)
            sys.exit(0)
        signal.signal(signal.SIGINT, interrupt_handler)
        signal.signal(signal.SIGTERM, interrupt_handler)
    def get_r(self):
        return self._sysrand.randint(0, 2)
    def get_b(self):
        return 0
    def get_swpd(self):
        return 0
    def get_free(self):
        return self._sysrand.randint(1500000, 1600000)
    def get_buff(self):
        return self._sysrand.randint(290000, 300000)
    def get_cache(self):
        return self._sysrand.randint(2900000, 3000000)
    def get_si(self):
        return 0
    def get_so(self):
        return 0
    def get_bi(self):
        return self._sysrand.randint(0, 50)
    def get_bo(self):
        return self._sysrand.randint(0, 500)
    def get_in(self):
        return self._sysrand.randint(200, 3000)
    def get_cs(self):
        return self._sysrand.randint(1000, 4000)
    def get_us(self):
        return self._sysrand.randint(0, 40)
    def get_sy(self):
        return self._sysrand.randint(1, 5)
    def get_id(self):
        return self._sysrand.randint(50, 100)
    def get_wa(self):
        return 0
    def get_st(self):
        return 0
    def start(self):
        print("procs -----------memory---------- ---swap-- -----io---- -system-- ------cpu-----")
        print(" r b swpd free buff cache si so bi bo in cs us sy id wa st")
        while True:
            r = self.get_r()
            b = self.get_b()
            swpd = self.get_swpd()
            free = self.get_free()
            buff = self.get_buff()
            cache = self.get_cache()
            si = self.get_si()
            so = self.get_so()
            bi = self.get_bi()
            bo = self.get_bo()
            m_in = self.get_in()
            cs = self.get_cs()
            us = self.get_us()
            sy = self.get_sy()
            m_id = self.get_id()
            wa = self.get_wa()
            st = self.get_st()
            print("%2d %2d %2d %7d %6d %7d %1d %1d %2d %3d %4d %2d %2d %1d %3d %1d %1d" %
                  (r, b, swpd, free, buff, cache, si, so, bi, bo, m_in, cs,
                   us, sy, m_id, wa, st))
            time.sleep(self.interval)
if __name__ == '__main__':
    vmstat = FakeVMStat(interval=float(sys.argv[1]), interrupt_delay=float(sys.argv[2]))
    vmstat.start()
"""
# Source text of a stand-in "uptime" executable: when run as a script it
# prints one fixed uptime-style line (including "load average") and ends.
# The shebang points at the interpreter running this module.
#
# NOTE: the body of the ``if`` must stay indented, otherwise the generated
# executable fails with an IndentationError instead of printing anything.
FAKE_UPTIME_CONTENTS = f"""#!{sys.executable}
if __name__ == '__main__':
    print("17:56:34 up 8:06, 7 users, load average: 0.26, 0.20, 0.21")
"""
class ProcessTest(TestCaseTmpDir):
    """Functional tests for ``avocado.utils.process.SubProcess``.

    Every test runs against fake ``vmstat`` / ``uptime`` executables that
    :meth:`setUp` writes into the per-test temporary directory.
    """

    def _install_script(self, name, contents):
        """Create an executable script inside the test temporary directory.

        :param name: file name of the script to create.
        :param contents: full text of the script, shebang line included.
        :returns: path of the created script.
        """
        path = os.path.join(self.tmpdir.name, name)
        with open(path, "w", encoding="utf-8") as script_obj:
            script_obj.write(contents)
        # The file is executed directly by the tests, so it needs the
        # execute bits set.
        os.chmod(path, DEFAULT_MODE)
        return path

    def setUp(self):
        """Write the fake ``vmstat`` and ``uptime`` scripts for this test."""
        super().setUp()
        self.fake_vmstat = self._install_script("vmstat", FAKE_VMSTAT_CONTENTS)
        self.fake_uptime = self._install_script("uptime", FAKE_UPTIME_CONTENTS)

    @skipOnLevelsInferiorThan(2)
    def test_process_start(self):
        """
        Start the fake vmstat, terminate it and check the captured stdout.

        :avocado: tags=parallel:1
        """
        # Arguments: 1 second between samples, no delay when signalled.
        proc = process.SubProcess(f"{self.fake_vmstat} 1 0")
        proc.start()
        # Give the script time to print the header and a few sample lines.
        time.sleep(3)
        proc.terminate()
        proc.wait(timeout=1)
        stdout = proc.get_stdout().decode()
        self.assertIn("memory", stdout, f"result: {stdout}")
        self.assertRegex(stdout, "[0-9]+")

    @skipOnLevelsInferiorThan(2)
    def test_process_stop_interrupted(self):
        """
        Stop a process that takes longer to exit than the stop() argument.

        :avocado: tags=parallel:1
        """
        # The fake vmstat sleeps 3 seconds in its signal handler, which is
        # longer than the 2 passed to stop(), so the result is expected to
        # be flagged as interrupted by a timeout.
        proc = process.SubProcess(f"{self.fake_vmstat} 1 3")
        proc.start()
        time.sleep(3)
        proc.stop(2)
        result = proc.result
        self.assertIn("timeout after", result.interrupted, "Process wasn't interrupted")

    @skipOnLevelsInferiorThan(2)
    def test_process_stop_uninterrupted(self):
        """
        Stop a process that exits on its own within the stop() argument.

        :avocado: tags=parallel:1
        """
        # The fake vmstat sleeps 3 seconds in its signal handler, which is
        # shorter than the 4 passed to stop(), so the result must not be
        # flagged as interrupted.
        proc = process.SubProcess(f"{self.fake_vmstat} 1 3")
        proc.start()
        time.sleep(3)
        proc.stop(4)
        result = proc.result
        self.assertFalse(result.interrupted, "Process was interrupted too early")

    def test_process_run(self):
        """Run the fake uptime to completion and check status and stdout."""
        proc = process.SubProcess(self.fake_uptime)
        result = proc.run()
        self.assertEqual(result.exit_status, 0, f"result: {result}")
        self.assertIn(b"load average", result.stdout)