bin/client.py

Summary

Maintainability
D
2 days
Test Coverage
#!/usr/bin/env python

#   Copyright (C) 2012 STFC
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

#   Main script for APEL client.
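#   The order of execution is as follows:
#    - apply any manual_spec values from the configuration file
#    - fetch benchmark information from the LDAP database (BDII)
#    - join EventRecords and BlahdRecords into JobRecords
#    - summarise jobs (always run; the summaries are needed for sync messages)
#    - unload JobRecords or SummaryRecords into the filesystem
#    - send data to the server using SSM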
'''
   @author: Konrad Jopek, Will Rogers
'''

from optparse import OptionParser
import sys
import os
import logging.config
import ldap

try:
    import ConfigParser
except ImportError:
    # Renamed in Python 3
    import configparser as ConfigParser

from apel import __version__
from apel.db import ApelDb, ApelDbException
from apel.db.unloader import DbUnloader
from apel.ldap import fetch_specint
from apel.common import set_up_logging
from apel.common.exceptions import install_exc_handler, default_handler
import ssm.agents


# Name of the logger shared by every stage of the client.
LOGGER_ID = 'client'
# Separator written to the log between workflow stages.
LOG_BREAK = '====================='
# Backend identifier handed to ApelDb when connecting.
DB_BACKEND = 'mysql'


class ClientConfigException(Exception):
    """Signals that the client configuration is invalid or incomplete."""


def run_ssm(scp):
    """
    Start the SSM sender using the settings held in ``scp``.

    The protocol, broker list, project and token are all worked out by
    helpers in ``ssm.agents``; this function only chains those calls.
    """
    logger = logging.getLogger(LOGGER_ID)

    chosen_protocol = ssm.agents.get_protocol(scp, logger)
    logger.info('Setting up SSM with protocol: %s', chosen_protocol)

    broker_list, project_name, auth_token = ssm.agents.get_ssm_args(
        chosen_protocol, scp, logger)
    ssm.agents.run_sender(chosen_protocol, broker_list, project_name,
                          auth_token, scp, logger)


def _read_manual_specs(ccp):
    '''
    Collect the raw ``manual_spec<N>`` values from the [spec_updater] section.

    Options are read as manual_spec1, manual_spec2, ... and reading stops at
    the first missing number, so the numbering must be contiguous.
    '''
    specs = []
    index = 1
    while True:
        try:
            specs.append(ccp.get('spec_updater', 'manual_spec' + str(index)))
        except ConfigParser.NoOptionError:
            return specs
        index += 1


def _run_manual_spec_update(ccp, db, log):
    '''
    Store every ``manual_spec<N> = <ce>,<spec type>,<value>`` entry in the DB.

    Exits the process with status 1 if the site name is not configured or an
    entry is malformed. Kept in its own function so that its locals cannot
    leak into (and overwrite) the caller's configured spec values.
    '''
    log.info('Running manual spec update.')
    specs = _read_manual_specs(ccp)

    if specs:
        try:
            site = ccp.get('spec_updater', 'site_name')
        except ConfigParser.NoOptionError:
            log.error('Site name must be configured '
                      'for manual_spec definitions.')
            sys.exit(1)
        for spec in specs:
            parts = spec.split(',')
            if len(parts) != 3:
                log.warning('Check manual_spec definitions.')
                if len(parts) < 3:
                    # Too few fields: there is no value to read, so indexing
                    # parts[2] would raise an uncaught IndexError.
                    log.error('manual_spec definitions must have the form '
                              '<ce>,<spec type>,<value>.')
                    sys.exit(1)
            try:
                value = float(parts[2])
            except ValueError:
                log.error('Service level must be a number '
                          'for manual_spec definitions.')
                sys.exit(1)
            db.update_spec(site, parts[0], parts[1], value)
    log.info('Manual spec update finished. %s updated.', len(specs))


def run_client(ccp):
    '''
    Run the client according to the configuration in the ConfigParser
    object.

    Stages, in order:
    - validate the configuration;
    - connect to the database;
    - apply manual_spec values;
    - run the LDAP spec updater and the joiner (each only if enabled);
    - always summarise;
    - unload records (if enabled).

    Exits the process with status 1 on configuration or database connection
    errors.
    '''
    log = logging.getLogger(LOGGER_ID)

    # Read and validate all options up front, so a misconfigured client fails
    # before it touches the database.
    try:
        spec_updater_enabled = ccp.getboolean('spec_updater', 'enabled')
        joiner_enabled = ccp.getboolean('joiner', 'enabled')

        if spec_updater_enabled or joiner_enabled:
            site_name = ccp.get('spec_updater', 'site_name')
            if site_name == '':
                raise ClientConfigException('Site name must be configured.')

        if spec_updater_enabled:
            ldap_host = ccp.get('spec_updater', 'ldap_host')
            ldap_port = int(ccp.get('spec_updater', 'ldap_port'))

        local_jobs = ccp.getboolean('joiner', 'local_jobs')
        if local_jobs:
            hostname = ccp.get('spec_updater', 'lrms_server')
            if hostname == '':
                raise ClientConfigException('LRMS server hostname must be '
                                            'configured if local jobs are '
                                            'enabled.')
            spec_type = ccp.get('spec_updater', 'spec_type')
            spec_value = ccp.getfloat('spec_updater', 'spec_value')

        unloader_enabled = ccp.getboolean('unloader', 'enabled')

        include_vos = None
        exclude_vos = None
        if unloader_enabled:
            unload_dir = ccp.get('unloader', 'dir_location')
            if ccp.getboolean('unloader', 'send_summaries'):
                table_name = 'VSuperSummaries'
            else:
                table_name = 'VJobRecords'
            send_ur = ccp.getboolean('unloader', 'send_ur')
            interval = ccp.get('unloader', 'interval')
            withhold_dns = ccp.getboolean('unloader', 'withhold_dns')
            if interval == 'gap':
                gap_start = ccp.get('unloader', 'gap_start')
                gap_end = ccp.get('unloader', 'gap_end')
            try:
                include = ccp.get('unloader', 'include_vos')
                include_vos = [vo.strip() for vo in include.split(',')]
            except ConfigParser.NoOptionError:
                # Only exclude VOs if we haven't specified the ones to include.
                try:
                    exclude = ccp.get('unloader', 'exclude_vos')
                    exclude_vos = [vo.strip() for vo in exclude.split(',')]
                except ConfigParser.NoOptionError:
                    exclude_vos = None

    # ValueError covers malformed numbers/booleans from int(), getfloat() and
    # getboolean().
    except (ClientConfigException, ConfigParser.Error, ValueError) as err:
        log.error('Error in configuration file: %s', err)
        sys.exit(1)

    log.info('Starting apel client version %s.%s.%s', *__version__)

    # Log into the database
    try:
        db_hostname = ccp.get('db', 'hostname')
        db_port = ccp.getint('db', 'port')
        db_name = ccp.get('db', 'name')
        db_username = ccp.get('db', 'username')
        db_password = ccp.get('db', 'password')

        log.info('Connecting to the database ... ')
        db = ApelDb(DB_BACKEND, db_hostname, db_port,
                    db_username, db_password, db_name)
        db.test_connection()
        log.info('Connected.')

    except (ConfigParser.Error, ValueError, ApelDbException) as err:
        log.error('Error during connecting to database: %s', err)
        log.info(LOG_BREAK)
        sys.exit(1)

    _run_manual_spec_update(ccp, db, log)

    if spec_updater_enabled:
        log.info(LOG_BREAK)
        log.info('Starting spec updater.')
        try:
            spec_values = fetch_specint(site_name, ldap_host, ldap_port)
            for value in spec_values:
                db.update_spec(site_name, value[0], 'si2k', value[1])
            log.info('Spec updater finished.')
        except ldap.SERVER_DOWN as e:
            log.warning('Failed to fetch spec info: %s', e)
            log.warning('Spec updater failed.')
        except ldap.NO_SUCH_OBJECT as e:
            log.warning('Found no spec values in BDII: %s', e)
            log.warning('Is the site name %s correct?', site_name)

        log.info(LOG_BREAK)

    if joiner_enabled:
        log.info(LOG_BREAK)
        log.info('Starting joiner.')
        # This contains all the joining logic, contained in ApelMysqlDb() and
        # the stored procedures.
        if local_jobs:
            # Uses the spec_type/spec_value from [spec_updater], not any
            # manual_spec entry.
            log.info('Updating benchmark information for local jobs:')
            log.info('%s, %s, %s, %s.',
                     site_name, hostname, spec_type, spec_value)
            db.update_spec(site_name, hostname, spec_type, spec_value)
            log.info('Creating local jobs.')
            db.create_local_jobs()

        db.join_records()
        log.info('Joining complete.')
        log.info(LOG_BREAK)

    # Always summarise - we need the summaries for the sync messages.
    log.info(LOG_BREAK)
    log.info('Starting summariser.')
    # This contains all the summarising logic, contained in ApelMysqlDb() and
    # the stored procedures.
    db.summarise_jobs()
    log.info('Summarising complete.')
    log.info(LOG_BREAK)

    if unloader_enabled:
        log.info(LOG_BREAK)
        log.info('Starting unloader.')

        log.info('Will unload from %s.', table_name)

        unloader = DbUnloader(db, unload_dir, include_vos, exclude_vos,
                              local_jobs, withhold_dns)
        try:
            if interval == 'latest':
                result = unloader.unload_latest(table_name, send_ur)
            elif interval == 'gap':
                result = unloader.unload_gap(table_name, gap_start, gap_end,
                                             send_ur)
            elif interval == 'all':
                result = unloader.unload_all(table_name, send_ur)
            else:
                log.warning('Unrecognised interval: %s', interval)
                log.warning('Will not start unloader.')
                # No unload ran, so there are no counts to report.
                result = None

            if result is not None:
                msgs, recs = result
                if recs > 0:
                    log.info('Unloaded %d records in %d messages.',
                             recs, msgs)
                else:
                    log.warning('No usage records unloaded. If this is '
                                'unexpected, please check your config.')

        except KeyError:
            log.warning('Invalid table name: %s, omitting', table_name)
        except ApelDbException as e:
            log.warning('Failed to unload records successfully: %s', e)

        # Always send sync messages
        msgs, recs = unloader.unload_sync()

        if recs > 0:
            log.info('Unloaded %d sync records in %d messages.', recs, msgs)
        else:
            log.warning('No sync records unloaded. If this is unexpected,'
                        ' please check your config.')

        log.info('Unloading complete.')
        log.info(LOG_BREAK)


def main():
    '''
    Parse command line arguments, set up logging and begin the client
    workflow.

    Logging is configured from the file given by --log_config when that
    file exists. Otherwise it comes from the [logging] section of the client
    config.

    After the client has run, SSM is started if the [ssm] option "enabled"
    is true.
    '''
    install_exc_handler(default_handler)
    ver = 'APEL client %s.%s.%s' % __version__
    opt_parser = OptionParser(version=ver, description=__doc__)

    opt_parser.add_option('-c', '--config',
                          help='main configuration file for APEL',
                          default='/etc/apel/client.cfg')

    opt_parser.add_option('-s', '--ssm_config',
                          help='location of SSM config file',
                          default='/etc/apel/sender.cfg')

    opt_parser.add_option('-l', '--log_config',
                          help='location of logging config file (optional)',
                          default='/etc/apel/logging.cfg')

    options, unused_args = opt_parser.parse_args()
    ccp = ConfigParser.ConfigParser()
    ccp.read(options.config)

    scp = ConfigParser.ConfigParser()
    scp.read(options.ssm_config)

    # set up logging
    try:
        if os.path.exists(options.log_config):
            logging.config.fileConfig(options.log_config)
        else:
            set_up_logging(ccp.get('logging', 'logfile'),
                           ccp.get('logging', 'level'),
                           ccp.getboolean('logging', 'console'))
        log = logging.getLogger(LOGGER_ID)
    except (ConfigParser.Error, ValueError, IOError) as err:
        # Logging is not available yet, so report straight to stdout.
        print('Error configuring logging: %s' % str(err))
        print('The system will exit.')
        sys.exit(1)

    run_client(ccp)

    try:
        ssm_enabled = ccp.getboolean('ssm', 'enabled')
    except (ConfigParser.Error, ValueError) as err:
        log.error('Error in configuration file: %s', err)
        sys.exit(1)

    if ssm_enabled:
        # Send unloaded messages
        log.info(LOG_BREAK)
        log.info('Starting SSM.')
        try:
            run_ssm(scp)
        except SystemExit as e:
            if e.code == 1:
                log.error('SSM failed to run.')
            else:
                log.critical('Unexpected SystemExit. See traceback below.')
                # A bare raise keeps the original traceback; on Python 2,
                # ``raise e`` would restart it from this line.
                raise
        else:
            log.info('SSM stopped.')
        log.info(LOG_BREAK)

    log.info('Client finished')


if __name__ == "__main__":
    # Run the client only when executed as a script, never on import.
    main()