python/cmdoplib/cmdoplib.cache.xsh
# Python module for commands that use the extension modules: tacklelib, yaml
# Load the tacklelib modules; the rest of this file reaches them through the `tkl.` prefix
# (`tkl.FileCache`, `tkl.DelayedSigInterrupt`, `tkl.TmpFileIO`, `tkl.compare_file_paths`).
# NOTE(review): presumably the arguments are (<directory>, <file name>, <alias>) and all four
# modules are merged under the single `tkl` alias - confirm against tacklelib.
tkl_import_module(TACKLELIB_ROOT, 'tacklelib.cache.py', 'tkl')
tkl_import_module(TACKLELIB_ROOT, 'tacklelib.sig.py', 'tkl')
tkl_import_module(TACKLELIB_ROOT, 'tacklelib.io.py', 'tkl')
tkl_import_module(TACKLELIB_ROOT, 'tacklelib.utils.py', 'tkl')
# Source the sibling cmdoplib scripts.
# NOTE(review): `call`, `get_default_call_cmd_expr_expander`, `getglobalvar` and
# `yaml_update_environ_vars` used below are not defined in this file, presumably they come
# from these scripts - confirm.
tkl_source_module(CMDOPLIB_ROOT, 'cmdoplib.std.xsh')
tkl_source_module(CMDOPLIB_ROOT, 'cmdoplib.yaml.xsh')
import os, psutil, time, re
class RootDirCache(tkl.FileCache):
    """`tkl.FileCache` whose name gets the `cmdoplib.` prefix and whose
    cache directory argument is `LOCAL_CACHE_ROOT`.
    """

    def __init__(self, cache_file):
        """Initialize the cache named `'cmdoplib.' + cache_file`.

        cache_file -- cache name, must not contain the dot (`.`) character.

        Raises `Exception` when `cache_file` contains a dot.
        """
        if '.' in cache_file:
            raise Exception('cache_file must not contain dot (`.`) characters')

        prefixed_cache_name = 'cmdoplib.' + cache_file
        tkl.FileCache.__init__(
            self,
            prefixed_cache_name,
            app_cache_dir = LOCAL_CACHE_ROOT,
            no_logger_warnings = True)
class ServiceProcCache(RootDirCache):
    """`RootDirCache` bound to the fixed cache name `service_proc`."""

    def __init__(self):
        """Initialize the base cache with the `service_proc` name."""
        super().__init__('service_proc')
def cache_print_proc_list_header(column_names, column_widths, fmt_str = '{:<{}} {:<{}}'):
    """Print a table header line followed by a line of `=` runs.

    column_names  -- header text of each column.
    column_widths -- width of each column; also the length of the `=` run
                     printed under it.
    fmt_str       -- `str.format` template which consumes the arguments as
                     `name, width` pairs, one pair per column.

    Both printed lines are prefixed with a single space.
    """
    # interleave the arguments in the order expected by `fmt_str`: name0, width0, name1, width1, ...
    fmt_args = []
    for name, width in zip(column_names, column_widths):
        fmt_args.append(name)
        fmt_args.append(width)

    print(' ' + fmt_str.format(*fmt_args))

    underline = ''
    for width in column_widths:
        # the separator is written only after something has been already written
        if underline:
            underline += ' '
        underline += width * '='

    print(' ' + underline)
def cache_print_proc_list_row(row_values, column_widths, fmt_str = '{:<{}} {:<{}}'):
    """Print a single table row prefixed with a single space.

    row_values    -- value of each column.
    column_widths -- width of each column.
    fmt_str       -- `str.format` template which consumes the arguments as
                     `value, width` pairs, one pair per column.
    """
    # interleave the arguments in the order expected by `fmt_str`: value0, width0, value1, width1, ...
    fmt_args = []
    for value, width in zip(row_values, column_widths):
        fmt_args += (value, width)

    row_text = fmt_str.format(*fmt_args)
    print(' ' + row_text)
def cache_print_proc_list_footer(column_widths):
    """Print the table closing line: one run of `-` per column.

    column_widths -- width of each column, i.e. the length of each `-` run.

    The runs are separated by a single space and the line is prefixed with
    a single space.
    """
    footer = ''
    for width in column_widths:
        # the separator is written only after something has been already written
        if footer:
            footer += ' '
        footer += width * '-'

    print(' ' + footer)
def cache_close_running_procs(procs, service_proc_cache, proc_sigterm_wait_timeout_sec = 5, proc_sigkill_wait_timeout_sec = 1):
    """Close the given processes and remove the closed ones from the cache.

    Each process is requested to terminate first (`psutil.Process.terminate`).
    The processes still alive after `proc_sigterm_wait_timeout_sec` seconds
    are killed (`psutil.Process.kill`) and awaited for
    `proc_sigkill_wait_timeout_sec` seconds more. One table row is printed
    per closed process and per process which survived the kill.

    procs              -- dictionary whose values are `psutil.Process`
                          objects (the keys are not used).
    service_proc_cache -- cache keyed by `str(pid)`; the record of each
                          closed process is deleted and the cache is synced
                          immediately.
    proc_sigterm_wait_timeout_sec -- wait timeout after the terminate request.
    proc_sigkill_wait_timeout_sec -- wait timeout after the kill request.

    Does nothing when `procs` is empty.
    """
    if not procs:
        return

    print('- Closing running processes with timeout={{SIGTERM={}, SIGKILL={}}} secs:'.format(proc_sigterm_wait_timeout_sec, proc_sigkill_wait_timeout_sec))

    proc_column_fmt = '{:<{}} {:<{}} {:<{}} {:<{}}'
    proc_column_names = ['<pid>', '<proc_name>', '<exit_code>', '<status>']
    proc_column_widths = [8, 24, 12, 32]

    cache_print_proc_list_header(proc_column_names, proc_column_widths, proc_column_fmt)

    # The names are collected before the processes get closed: a name query on
    # an already gone process can raise instead of returning a value.
    proc_names = {}
    for proc in procs.values():
        try:
            proc_names[proc.pid] = proc.name()
        except psutil.Error:
            proc_names[proc.pid] = '-'

    def print_proc_row(proc, exit_code, status):
        # `None` does not support the `<width` format spec, print a placeholder instead
        if exit_code is None:
            exit_code = '-'
        proc_row_values = [proc.pid, proc_names.get(proc.pid, '-'), exit_code, status]
        cache_print_proc_list_row(proc_row_values, proc_column_widths, proc_column_fmt)

    def on_proc_closed(proc, status):
        # `returncode` is the attribute set by `psutil.wait_procs` on a gone process
        print_proc_row(proc, getattr(proc, 'returncode', None), status)
        del service_proc_cache[str(proc.pid)]
        service_proc_cache.sync() # sync immediately

    def on_proc_sigterm(proc):
        on_proc_closed(proc, 'terminated by <SIGTERM>')

    def on_proc_sigkill(proc):
        on_proc_closed(proc, 'terminated by <SIGKILL>')

    def on_proc_sigkill_ignore(proc):
        print_proc_row(proc, '-', 'running, <SIGKILL> ignored')

    # send SIGTERM
    for proc in procs.values():
        try:
            proc.terminate()
        except psutil.NoSuchProcess:
            pass

    gone_procs, alive_procs = psutil.wait_procs(procs.values(), timeout = proc_sigterm_wait_timeout_sec, callback = on_proc_sigterm)

    if alive_procs:
        # send SIGKILL
        for proc in alive_procs:
            try:
                proc.kill()
            except psutil.NoSuchProcess:
                pass

        gone_procs, alive_procs = psutil.wait_procs(alive_procs, timeout = proc_sigkill_wait_timeout_sec, callback = on_proc_sigkill)

        # report ignored
        for proc in alive_procs:
            on_proc_sigkill_ignore(proc)

    cache_print_proc_list_footer(proc_column_widths)
def cache_init_service_proc(service_proc_cache):
    """Clean up the service process cache and start the ssh agent process.

    Steps:
      1. Collect the ids of all running processes.
      2. Walk through the cache records (`str(pid)` ->
         `(<python_pid>, <proc_name>, <proc_exe>)`):
         * a record whose process does not exist is removed;
         * a running process whose executable matches the record and whose
           `<python_pid>` process does not exist is an orphan and is passed
           to `cache_close_running_procs`;
         * the other running processes are only reported.
      3. If `(SVN_SSH_ENABLED and GIT_SVN_ENABLED) or GIT_SSH_ENABLED`:
         start `${GIT_SVN_SSH_AGENT}` in the background, register the
         started process in the cache, read `SSH_AUTH_SOCK` from its stdout
         and pass it to `yaml_update_environ_vars`.

    service_proc_cache -- cache of the service process records; it is synced
                          immediately after each change.

    Returns the dictionary `pid -> psutil.Process` of the processes which
    were started and registered by this call (may be empty).

    Raises `Exception` when the agent was started but `SSH_AUTH_SOCK` was not
    found in its stdout.
    """
    # both tables printed below share the same layout
    proc_column_fmt = '{:<{}} {:<{}} {:<{}} {:<{}}'
    proc_column_names = ['<pid>', '<proc_name>', '<proc_exe>', '<status>']
    proc_column_widths = [8, 24, 64, 32]

    def print_proc_row(proc_id, proc_name, proc_exe, status):
        # `None` does not support the `<width` format spec, print a placeholder instead
        proc_row_values = ['-' if value is None else value for value in (proc_id, proc_name, proc_exe, status)]
        cache_print_proc_list_row(proc_row_values, proc_column_widths, proc_column_fmt)

    def remove_cache_record(record_key, proc_id, proc_name, proc_exe):
        print_proc_row(proc_id, proc_name, proc_exe, 'not found, removed')
        del service_proc_cache[record_key]
        service_proc_cache.sync() # sync immediately

    print('- Initializing service processes cache...')

    # `process_iter` stores the requested attributes in `proc.info`
    all_proc_ids = {int(proc.info['pid']) for proc in psutil.process_iter(attrs=['pid'])}

    # Iterate over process records to:
    #   1. Close processes which were created by a python process that no longer exists.
    #   2. Remove records of the processes which do not exist anymore.
    #
    print('- Reading service processes cache:')

    running_orphan_procs = {}

    # Format:
    #   Key:    service or background process pid
    #   Value:  (<python_pid>, <proc_name>, <proc_exe>)
    #
    cache_print_proc_list_header(proc_column_names, proc_column_widths, proc_column_fmt)

    # iterate over a copy, the records are deleted from the cache on the way
    for svc_proc_key, svc_proc_value in dict(service_proc_cache).items():
        svc_proc_id = int(svc_proc_key)
        svc_proc_name = svc_proc_value[1]
        svc_proc_exe = svc_proc_value[2]

        if svc_proc_id not in all_proc_ids:
            remove_cache_record(svc_proc_key, svc_proc_id, svc_proc_name, svc_proc_exe)
            continue

        try:
            running_svc_proc = psutil.Process(svc_proc_id)
            running_svc_proc_exe = running_svc_proc.exe()
            # when the executable paths do not compare equal, nothing is printed and the record is kept
            if tkl.compare_file_paths(running_svc_proc_exe, svc_proc_exe):
                python_proc_id = int(svc_proc_value[0])
                if python_proc_id not in all_proc_ids:
                    print_proc_row(svc_proc_id, running_svc_proc.name(), running_svc_proc_exe, 'running, orphan')
                    running_orphan_procs[svc_proc_id] = running_svc_proc
                else:
                    print_proc_row(svc_proc_id, running_svc_proc.name(), running_svc_proc_exe, 'running, controlled')
        except psutil.NoSuchProcess:
            remove_cache_record(svc_proc_key, svc_proc_id, svc_proc_name, svc_proc_exe)
        except psutil.AccessDenied:
            # the name is taken from the record: the process object can be unavailable here
            print_proc_row(svc_proc_id, svc_proc_name, '?' + svc_proc_exe, 'access denied, ignored')

    cache_print_proc_list_footer(proc_column_widths)

    cache_close_running_procs(running_orphan_procs, service_proc_cache)

    executed_procs = {}

    # `and` binds tighter than `or`, the parentheses only make the evaluation order explicit
    if not ((SVN_SSH_ENABLED and GIT_SVN_ENABLED) or GIT_SSH_ENABLED):
        return executed_procs

    current_proc_id = os.getpid()

    print('- Starting service processes:')

    cache_print_proc_list_header(proc_column_names, proc_column_widths, proc_column_fmt)

    # NOTE:
    #   Use `w+t` instead of `w+b` to enable implicit line endings conversion.
    #
    stdout_iostr = tkl.TmpFileIO('w+t')
    stderr_iostr = tkl.TmpFileIO('w+t')

    # use signals delayer to delay a user interruption in a critical code
    #
    # NOTE(review):
    #   The delayed section is assumed to span from the process start up to the process
    #   registration in the cache - confirm.
    #
    with tkl.DelayedSigInterrupt((tkl.signal.SIGINT, tkl.signal.SIGTERM)):
        ret = call('${GIT_SVN_SSH_AGENT}', [], stdout = stdout_iostr, stderr = stderr_iostr, in_bg = True)

        # WORKAROUND:
        #   In case of redirection the process id can be not a target process id but intermediate process, where the child is our target process.
        #   We must reread a process list and search the id as a parent process id to extract a child process as a target process.
        #

        # open immediately in case of intermediate process
        call_proc_id = ret.proc.pid # this is not a target process, but intermediate process
        call_proc = None
        try:
            call_proc = psutil.Process(call_proc_id)
        except psutil.NoSuchProcess:
            # too late, process is already closed
            pass

        cmd_expr_expanded = get_default_call_cmd_expr_expander()('${GIT_SVN_SSH_AGENT}')

        # scan for a child process until a timeout
        child_proc_search_timeout_sec = 3
        # monotonic clock: the timeout must not depend on the system time adjustments
        scan_begin_time = time.monotonic()
        is_child_proc_found = False

        while True:
            # (pid, name) of the processes whose parent is the started process;
            # a process with unavailable (`None`) parent id never matches
            child_candidates = [
                (int(proc.info['pid']), proc.info['name'])
                for proc in psutil.process_iter(attrs=['pid', 'ppid', 'name'])
                if proc.info['ppid'] == call_proc_id
            ]

            for child_proc_id, child_proc_name in child_candidates:
                # retest process on existence
                try:
                    child_proc = psutil.Process(child_proc_id)
                    child_proc_exe = child_proc.exe()
                    if not tkl.compare_file_paths(child_proc_exe, cmd_expr_expanded):
                        continue
                    child_proc_name = child_proc.name()
                except psutil.NoSuchProcess:
                    # the process object can not be queried here, use the name from the process list
                    print_proc_row(child_proc_id, child_proc_name, '-', 'child process, closed')
                    continue
                except psutil.AccessDenied:
                    print_proc_row(child_proc_id, child_proc_name, '-', 'child process, access denied')
                    continue

                print_proc_row(child_proc_id, child_proc_name, child_proc_exe, 'child process, running')

                # register a process in the cache
                service_proc_cache[str(child_proc_id)] = (current_proc_id, child_proc_name, child_proc_exe)
                service_proc_cache.sync() # sync immediately

                executed_procs[child_proc_id] = child_proc

                is_child_proc_found = True
                break

            if is_child_proc_found:
                break

            if time.monotonic() - scan_begin_time >= child_proc_search_timeout_sec:
                break

            # give to scheduler a break
            time.sleep(.05)

        if not is_child_proc_found and call_proc:
            # No child process has been found, fall back to the started process itself.
            # Retest process on existence and query the values to print and to register.
            try:
                call_proc = psutil.Process(call_proc_id)
                call_proc_name = call_proc.name()
                call_proc_exe = call_proc.exe()
            except psutil.NoSuchProcess:
                print_proc_row(call_proc_id, '-', '-', 'immediate process, closed')
            except psutil.AccessDenied:
                # the same as for a child process: report, but do not register
                print_proc_row(call_proc_id, '-', '-', 'immediate process, access denied')
            else:
                print_proc_row(call_proc_id, call_proc_name, call_proc_exe, 'immediate process, running')

                # register a process in the cache
                service_proc_cache[str(call_proc_id)] = (current_proc_id, call_proc_name, call_proc_exe)
                service_proc_cache.sync() # sync immediately

                executed_procs[call_proc_id] = call_proc

        cache_print_proc_list_footer(proc_column_widths)

    # reading stdout until no length change in timeout
    stdout_prev_len = stdout_iostr.tell()
    stderr_prev_len = stderr_iostr.tell()
    prev_time = time.monotonic()

    while True:
        time.sleep(.05)

        stdout_next_len = stdout_iostr.tell()
        stderr_next_len = stderr_iostr.tell()
        next_time = time.monotonic()

        if stdout_next_len != stdout_prev_len or stderr_next_len != stderr_prev_len:
            stdout_prev_len = stdout_next_len
            stderr_prev_len = stderr_next_len
            prev_time = next_time
        elif next_time - prev_time >= 0.1:
            break

    stdout_size = stdout_iostr.tell()
    stderr_size = stderr_iostr.tell()

    # rereading stdout to extract `SSH_AUTH_SOCK` environment variable;
    # the value ends at a quote, at a semicolon or at the end of the line
    ssh_auth_sock_regex = re.compile(r"SSH_AUTH_SOCK[ \t]*=[ \t]*'?([^';\r\n]+)'?")
    ssh_auth_sock_value = None

    stdout_iostr.seek(0)
    for line in stdout_iostr.readlines():
        ssh_auth_sock_match = ssh_auth_sock_regex.match(line)
        if ssh_auth_sock_match:
            ssh_auth_sock_value = ssh_auth_sock_match.group(1)
            break

    if stdout_size > 0:
        stdout_iostr.seek(0)
        print(str(stdout_iostr.read()).rstrip())
    if stderr_size > 0:
        stderr_iostr.seek(0)
        print(str(stderr_iostr.read()).rstrip())

    if stdout_size > 0 or stderr_size > 0:
        print('<') # end of a command output

    if ssh_auth_sock_value is None:
        raise Exception('SSH_AUTH_SOCK is not found in the stdout of the `GIT_SVN_SSH_AGENT` process')

    # register SSH_AUTH_SOCK environment variable
    yaml_update_environ_vars(
        { 'SSH_AUTH_SOCK' : {
                'if'    : '${SVN_SSH_ENABLED} or ${GIT_SSH_ENABLED}',
                'apps'  : ['${GIT}'],
                'value' : ssh_auth_sock_value
            }
        },
        search_by_pred_at_third = lambda var_name: getglobalvar(var_name))

    return executed_procs