salt/cli/batch.py
# -*- coding: utf-8 -*-
'''
Execute batch runs
'''
# Import python libs
from __future__ import absolute_import, print_function, unicode_literals
import math
import time
import copy
from datetime import datetime, timedelta
# Import salt libs
import salt.utils.stringutils
import salt.client
import salt.output
import salt.exceptions
# Import 3rd-party libs
# pylint: disable=import-error,no-name-in-module,redefined-builtin
from salt.ext import six
from salt.ext.six.moves import range
# pylint: enable=import-error,no-name-in-module,redefined-builtin
import logging
# Module-level logger named after this module.
log = logging.getLogger(__name__)
def get_bnum(opts, minions, quiet):
    '''
    Return the active number of minions to maintain

    :param opts: options dict; only ``opts['batch']`` is read. It may be an
        absolute count (``3`` or ``'3'``) or a percentage of ``minions``
        (``'10%'`` or ``'%10'``).
    :param minions: the targeted minions; only its length is used, and only
        for percentage values.
    :param quiet: when True, do not print an error for invalid batch data.
    :return: the batch size as an int, or ``None`` when ``opts['batch']``
        cannot be parsed.
    '''
    batch = opts['batch']
    try:
        # Numeric values (e.g. passed programmatically instead of from the
        # CLI) are an absolute count; the ``'%' in batch`` test below would
        # raise TypeError on them.
        if isinstance(batch, (int, float)):
            return int(batch)
        if '%' in batch:
            res = float(batch.strip('%')) / 100.0 * len(minions)
            # Never round a non-zero percentage down to zero minions.
            if res < 1:
                return int(math.ceil(res))
            return int(res)
        return int(batch)
    except (TypeError, ValueError):
        if not quiet:
            salt.utils.stringutils.print_cli('Invalid batch data sent: {0}\nData must be in the '
                                             'form of %10, 10% or 3'.format(batch))
        return None
def batch_get_opts(
        tgt,
        fun,
        batch,
        parent_opts,
        arg=(),
        tgt_type='glob',
        ret='',
        kwarg=None,
        **kwargs):
    '''
    Assemble the opts dictionary for a batch run.

    Values given explicitly take precedence; every key not set here is
    inherited from ``parent_opts``.

    :param tgt: target expression
    :param fun: function to run on the minions
    :param batch: batch size, stored unchanged
    :param parent_opts: opts dict supplying all remaining keys
    :param arg: positional arguments for ``fun``; combined with ``kwarg``
        through ``salt.utils.args.condition_input``
    :param tgt_type: targeting type
    :param ret: returner to use
    :param kwarg: keyword arguments for ``fun``
    :param kwargs: recognised keys are ``failhard``, ``raw``, ``timeout``,
        ``gather_job_timeout`` and ``batch_wait`` (converted to int)
    :return: the new opts dict
    '''
    # salt.utils.args is deliberately imported here even though it may
    # already be loaded: when cmd_batch is called via the NetAPI the module
    # is otherwise unavailable.
    import salt.utils.args

    batch_opts = {
        'tgt': tgt,
        'fun': fun,
        'arg': salt.utils.args.condition_input(arg, kwarg),
        'tgt_type': tgt_type,
        'ret': ret,
        'batch': batch,
        'failhard': kwargs.get('failhard', False),
        'raw': kwargs.get('raw', False),
    }

    # Timeouts are copied verbatim, but only when the caller supplied them.
    for passthrough_key in ('timeout', 'gather_job_timeout'):
        if passthrough_key in kwargs:
            batch_opts[passthrough_key] = kwargs[passthrough_key]
    if 'batch_wait' in kwargs:
        batch_opts['batch_wait'] = int(kwargs['batch_wait'])

    # Inherit everything else from the parent opts without overriding.
    for opt_name, opt_value in six.iteritems(parent_opts):
        batch_opts.setdefault(opt_name, opt_value)
    return batch_opts
def batch_get_eauth(kwargs):
    '''
    Extract external-auth credentials from ``kwargs``.

    The keys ``eauth``, ``username``, ``password`` and ``token`` are removed
    from ``kwargs`` (in that order) when present and returned in a new dict.

    :param kwargs: mutable mapping of keyword arguments; modified in place.
    :return: dict holding only the auth-related keys that were found.
    '''
    credentials = {}
    for field in ('eauth', 'username', 'password', 'token'):
        if field not in kwargs:
            continue
        credentials[field] = kwargs.pop(field)
    return credentials
class Batch(object):
    '''
    Manage the execution of batch runs
    '''
    def __init__(self, opts, eauth=None, quiet=False, parser=None):
        '''
        Set up the batch runner and ping the target to discover minions.

        :param opts: options dict; this class reads ``conf_file``, ``tgt``,
            ``timeout``, ``gather_job_timeout``, ``fun``, ``arg`` and
            ``batch``, and optionally ``tgt_type``,
            ``selected_target_option``, ``batch_wait``, ``raw``, ``return``
            and ``failhard``.
        :param eauth: optional dict of external-auth keyword arguments that
            are forwarded to the LocalClient calls.
        :param quiet: suppress CLI output when True.
        :param parser: optional parsed CLI options providing ``show_jid``
            and ``verbose``.
        '''
        self.opts = opts
        self.eauth = eauth if eauth else {}
        # Use a copy: __gather_minions adds 'yield_pub_data' to pub_kwargs,
        # and that flag must not leak into self.eauth (forwarded to every
        # batch job) or into the caller's dict.
        self.pub_kwargs = dict(eauth) if eauth else {}
        self.quiet = quiet
        self.local = salt.client.get_local_client(opts['conf_file'])
        self.minions, self.ping_gen, self.down_minions = self.__gather_minions()
        self.options = parser

    def __gather_minions(self):
        '''
        Return a list of minions to use for the batch run

        Pings the target with ``test.ping`` and returns a tuple
        ``(responding_minions, ping_generator, down_minions)``. Here
        ``down_minions`` is the set of minions listed in the publish data
        that did not answer the ping.
        '''
        args = [self.opts['tgt'],
                'test.ping',
                [],
                self.opts['timeout'],
                ]

        selected_target_option = self.opts.get('selected_target_option', None)
        if selected_target_option is not None:
            args.append(selected_target_option)
        else:
            args.append(self.opts.get('tgt_type', 'glob'))

        # Ask for the publish data too, so the expected minion list is known.
        self.pub_kwargs['yield_pub_data'] = True
        ping_gen = self.local.cmd_iter(*args,
                                       gather_job_timeout=self.opts['gather_job_timeout'],
                                       **self.pub_kwargs)

        # Broadcast to targets
        fret = set()
        nret = set()
        for ret in ping_gen:
            # Publish data carries both 'minions' and 'jid'. The previous
            # ``('minions' and 'jid') in ret`` only tested for 'jid'.
            if 'minions' in ret and 'jid' in ret:
                for minion in ret['minions']:
                    nret.add(minion)
                continue
            try:
                m = next(six.iterkeys(ret))
            except StopIteration:
                if not self.quiet:
                    salt.utils.stringutils.print_cli('No minions matched the target.')
                break
            if m is not None:
                fret.add(m)
        return (list(fret), ping_gen, nret.difference(fret))

    def get_bnum(self):
        '''
        Return the batch size computed from ``self.opts`` and the responding
        minions (see the module-level ``get_bnum``).
        '''
        return get_bnum(self.opts, self.minions, self.quiet)

    def __update_wait(self, wait):
        '''
        Drop expired timestamps from the front of ``wait`` in place.

        Entries are appended as ``now + batch_wait``, so they are in
        non-decreasing order and only a leading run can have expired.
        '''
        now = datetime.now()
        i = 0
        while i < len(wait) and wait[i] <= now:
            i += 1
        if i:
            del wait[:i]

    def run(self):
        '''
        Execute the batch run

        This is a generator. For every minion result collected, it yields
        the full return data when ``opts['raw']`` is set, and otherwise
        ``{minion_id: return_value}``. Minions that never answer get an empty
        ``{}`` return once their job iterator is exhausted. With
        ``opts['failhard']``, the generator logs an error and stops when a
        retcode copied from the top-level return data is greater than 0.
        '''
        args = [[],
                self.opts['fun'],
                self.opts['arg'],
                self.opts['timeout'],
                'list',
                ]
        bnum = self.get_bnum()
        # No targets to run
        if not self.minions:
            return
        to_run = copy.deepcopy(self.minions)
        active = []
        ret = {}
        iters = []
        # wait the specified time before deciding a job is actually done
        bwait = self.opts.get('batch_wait', 0)
        wait = []

        if self.options:
            show_jid = self.options.show_jid
            show_verbose = self.options.verbose
        else:
            show_jid = False
            show_verbose = False

        # the minion tracker keeps track of responses and iterators
        # - it removes finished iterators from iters[]
        # - if a previously detected minion does not respond, it is
        #   added with an empty answer to ret{} once the timeout is reached
        # - unresponsive minions are removed from active[] to make
        #   sure that the main while loop finishes even with unresp minions
        minion_tracker = {}

        if not self.quiet:
            # We already know some minions didn't respond to the ping, so inform
            # the user we won't be attempting to run a job on them
            for down_minion in self.down_minions:
                salt.utils.stringutils.print_cli('Minion {0} did not respond. No job will be sent.'.format(down_minion))

        # Iterate while we still have things to execute
        while len(ret) < len(self.minions):
            next_ = []
            if bwait and wait:
                self.__update_wait(wait)
            if len(to_run) <= bnum - len(wait) and not active:
                # last bit of them, add them all to next iterator
                while to_run:
                    next_.append(to_run.pop())
            else:
                for _ in range(bnum - len(active) - len(wait)):
                    if to_run:
                        minion_id = to_run.pop()
                        if isinstance(minion_id, dict):
                            # dict views are not indexable on Python 3
                            next_.append(next(six.iterkeys(minion_id)))
                        else:
                            next_.append(minion_id)

            active += next_
            args[0] = next_

            if next_:
                if not self.quiet:
                    salt.utils.stringutils.print_cli('\nExecuting run on {0}\n'.format(sorted(next_)))
                # create a new iterator for this batch of minions
                new_iter = self.local.cmd_iter_no_block(
                    *args,
                    raw=self.opts.get('raw', False),
                    ret=self.opts.get('return', ''),
                    show_jid=show_jid,
                    verbose=show_verbose,
                    gather_job_timeout=self.opts['gather_job_timeout'],
                    **self.eauth)
                # add it to our iterators and to the minion_tracker
                iters.append(new_iter)
                minion_tracker[new_iter] = {}
                # every iterator added is 'active' and has its set of minions
                minion_tracker[new_iter]['minions'] = next_
                minion_tracker[new_iter]['active'] = True
            else:
                time.sleep(0.02)
            parts = {}

            # see if we found more minions
            for ping_ret in self.ping_gen:
                if ping_ret is None:
                    break
                m = next(six.iterkeys(ping_ret))
                if m not in self.minions:
                    self.minions.append(m)
                    to_run.append(m)

            for queue in iters:
                try:
                    # Gather returns until we get to the bottom; give up on
                    # this iterator for now after more than 5 empty polls.
                    ncnt = 0
                    while True:
                        part = next(queue)
                        if part is None:
                            time.sleep(0.01)
                            ncnt += 1
                            if ncnt > 5:
                                break
                            continue
                        if self.opts.get('raw'):
                            # raw parts carry the minion id under 'data'
                            part_id = part['data']['id']
                            parts[part_id] = part
                            if part_id in minion_tracker[queue]['minions']:
                                minion_tracker[queue]['minions'].remove(part_id)
                            else:
                                salt.utils.stringutils.print_cli('minion {0} was already deleted from tracker, probably a duplicate key'.format(part_id))
                        else:
                            parts.update(part)
                            for part_id in part:
                                if part_id in minion_tracker[queue]['minions']:
                                    minion_tracker[queue]['minions'].remove(part_id)
                                else:
                                    salt.utils.stringutils.print_cli('minion {0} was already deleted from tracker, probably a duplicate key'.format(part_id))
                except StopIteration:
                    # if an iterator is done:
                    # - set it to inactive
                    # - add minions that have not responded to parts{}

                    # check if the tracker contains the iterator
                    if queue in minion_tracker:
                        minion_tracker[queue]['active'] = False

                        # add all minions that belong to this iterator and
                        # that have not responded to parts{} with an empty response
                        for minion in minion_tracker[queue]['minions']:
                            if minion not in parts:
                                parts[minion] = {}
                                parts[minion]['ret'] = {}

            for minion, data in six.iteritems(parts):
                if minion in active:
                    active.remove(minion)
                    if bwait:
                        wait.append(datetime.now() + timedelta(seconds=bwait))
                # Munge retcode into return data
                failhard = False
                if 'retcode' in data and isinstance(data['ret'], dict) and 'retcode' not in data['ret']:
                    data['ret']['retcode'] = data['retcode']
                    if self.opts.get('failhard') and data['ret']['retcode'] > 0:
                        failhard = True

                if self.opts.get('raw'):
                    ret[minion] = data
                    yield data
                else:
                    ret[minion] = data['ret']
                    yield {minion: data['ret']}
                if not self.quiet:
                    ret[minion] = data['ret']
                    data[minion] = data.pop('ret')
                    if 'out' in data:
                        out = data.pop('out')
                    else:
                        out = None
                    salt.output.display_output(
                        data,
                        out,
                        self.opts)
                if failhard:
                    log.error(
                        'Minion %s returned with non-zero exit code. '
                        'Batch run stopped due to failhard', minion
                    )
                    # A plain return ends the generator; raising
                    # StopIteration here becomes RuntimeError under PEP 479.
                    return

            # remove inactive iterators from the iters list
            for queue in minion_tracker:
                # only remove inactive queues
                if not minion_tracker[queue]['active'] and queue in iters:
                    iters.remove(queue)
                    # also remove the iterator's minions from the active list
                    for minion in minion_tracker[queue]['minions']:
                        if minion in active:
                            active.remove(minion)
                            if bwait:
                                wait.append(datetime.now() + timedelta(seconds=bwait))