leboncoin/concourse-fiaas-resource

View on GitHub
assets/resource.py

Summary

Maintainability
C
1 day
Test Coverage
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
Concourse base resource class
"""
# Python 2 and 3 compatibility
from __future__ import unicode_literals, print_function

import json
import logging
import os
import sys
import tempfile
import time

import argparse
import subprocess


__program__ = "concourse-resource-type"
__version__ = "v0.1.0"
__author__ = "Jose Riguera"
__year__ = "2017"
__email__ = "jose.riguera@springernature.com"
__license__ = "MIT"
__purpose__ = "Consourse resource"


class Resource(object):
    """Base resource implementation."""
    LOGFORMAT = '%(name)s: %(message)s'
    LOGGING = "logging.ini"
    LOGLEVEL = logging.DEBUG
    LOGENVCONF = "RESOURCE_CONFIGLOG"
    DEBUG = "RESOURCE_DEBUG"

    def __init__(self, arguments=None, logging_config=None):
        args = self._args(arguments)
        self._logging(logging_config)
        self.fdin = args.infile
        self.fdout = args.outfile
        self.workfolder = args.workfolder

    def _logging(self, config=None):
        logconf = False
        logpath = os.environ.get(self.LOGENVCONF, config)
        if logpath:
            logpath = os.path.expandvars(logpath)
            try:
                logging.config.fileConfig(logpath)
            except Exception as e:
                print("Error '%s': %s" % (logpath, e), file=sys.stderr)
                logging.basicConfig(level=self.LOGLEVEL, format=self.LOGFORMAT)
            else:
                logconf = True
        else:
            logfile = tempfile.NamedTemporaryFile(delete=False, prefix='log')
            logging.basicConfig(
                level=logging.DEBUG,
                format=self.LOGFORMAT,
                filename=logfile.name)
        if os.environ.get(self.DEBUG, "0").lower() in ['1', 'yes', 'true', 'y']:
            stderr = logging.StreamHandler()
            stderr.setLevel(level=logging.DEBUG)
            logging.getLogger().addHandler(stderr)
        self.logger = logging.getLogger(self.__class__.__name__)
        self.logger.info("Initializing " + str(self.__class__.__name__))
        if not logconf:
            self.logger.info("Using default logging settings")
        else:
            self.logger.info("Using logging settings from '%s'" % logpath)
        return self.logger

    def _args(self, arguments=None):
        epilog = __purpose__ + '\n'
        epilog += __version__ + ', ' + __year__ + ' '
        epilog += __author__ + ' ' + __email__
        parser = argparse.ArgumentParser(
            formatter_class=argparse.RawTextHelpFormatter,
            description=__doc__, epilog=epilog)
        g1 = parser.add_argument_group('Configuration options')
        g1.add_argument('workfolder', nargs='?')
        g1.add_argument('infile',
            nargs='?',
            type=argparse.FileType('r'),
            default=sys.stdin)
        g1.add_argument('outfile',
            nargs='?',
            type=argparse.FileType('w'),
            default=sys.stdout)
        args = parser.parse_args(arguments)
        return args


    def run(self, command):
        """Parse input/arguments, perform requested command return output."""
        try:
            input = json.loads(self.fdin.read())
        except ValueError as e:
            msg = "Input json data not well-formed: %s" % e
            self.logger.error(msg)
            raise ValueError(msg)
        # combine source and params
        source = input.get('source', {})
        params = input.get('params', {})
        version = input.get('version', {})
        # Debug
        if source.get('debug', False):
            self.logger.setLevel(logging.DEBUG)
        self.logger.debug('command: "%s"', command)
        self.logger.debug('input: "%s"', input)
        self.logger.debug('folder: "%s"', self.workfolder)
        self.logger.debug('environment: %s', os.environ)
        if command == 'check':
            try:
                rcode, response = self.check(source, version)
            except Exception as e:
                msg = "Exception running '%s': %s" % (command, str(e))
                self.logger.error(msg)
                raise
        else:
            if not self.workfolder:
                msg = "Workspace folder not provided"
                self.logger.error(msg)
                raise ValueError(msg)
            if command == 'in':
                try:
                    rcode, response = self.fetch(
                    self.workfolder, source, version, params)
                except Exception as e:
                    msg = "Exception running '%s': %s" % (command, str(e))
                    self.logger.error(msg)
                    raise
            elif command == 'out':
                try:
                    os.chdir(self.workfolder)
                except Exception as e:
                    self.logger.error(str(e))
                    raise
                try:
                    rcode, response = self.update(self.workfolder, source, params)
                except Exception as e:
                    msg = "Exception running '%s': %s" % (command, str(e))
                    self.logger.error(msg)
                    raise
            else:
                msg = "Invalid command: '%s'" % command
                self.logger.error(msg)
                raise ValueError(msg)
        self.logger.debug('response: "%s"', response)
        output = json.dumps(response, indent=4, separators=(',', ': '))
        self.fdout.write(str(output) + '\n')
        return rcode


    def metadata(self, result):
        metadata = []
        for k in result.keys():
            metadata.append({"name": str(k), "value": str(result[k]) })
        return metadata


    def process(self, cmd=[], input=None, timeout=None):
        proc = subprocess.Popen(cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
        self.logger.info('Running process, pid=%d: %s' % (proc.pid, str(cmd)))
        try:
            output, err = proc.communicate(input, timeout=timeout)
        except subprocess.TimeoutExpired as e:
            self.logger.warning(
                'Process %d killed with timeout %s' % (proc.pid, str(timeout)))
            proc.kill()
            output, err = proc.communicate()
        self.logger.debug("stdout: " + repr(output))
        self.logger.debug("stderr: " + repr(err))
        stdout = output.decode('utf-8')
        stderr = err.decode('utf-8')
        if proc.returncode != 0:
            self.logger.warning(
                'Process %d failed with rcode %d' % (proc.pid, int(proc.returncode)))
        else:
            self.logger.debug('Process %d finished with rcode 0' % (proc.pid))
        return int(proc.returncode), stdout, stderr


    def check(self, source, version):
        """`check` is invoked to detect new versions of the resource.

        `source` is an arbitrary JSON object which specifies the location of
        the resource, including any credentials. This is passed verbatim from
        the pipeline configuration.
        `version` is a JSON object with string fields, used to uniquely
        identify an instance of the resource.
        """
        timestamp = time.time()
        versions = { "timestamp": str(timestamp) }
        rvalue = [ versions ]
        rcode = 0
        return rcode, rvalue


    def fetch(self, dir, source, version, params):
        """`fetch` is passed a destination directory as $1, and is given on stdin
        the configured source and a precise version of the resource to fetch.

        `source` is the same value as passed to check.
        `version` is the same type of value passed to check, and specifies the
        version to fetch.

        `params` is an arbitrary JSON object passed along verbatim from params
        on a get.
        """
        metadata = []
        timestamp = time.time()
        ver = { "timestamp": str(timestamp) }
        rvalue = { "version": ver, "metadata": metadata }
        rcode = 0
        return rcode, rvalue


    def update(self, dir, source, params):
        """`update` is called with a path to the directory containing the
        build's full set of sources as the first argument, and is given on
        stdin the configured params and the resource's source configuration.

        `source` is the same value as passed to check.
        `params` is an arbitrary JSON object passed along verbatim from params
        on a put.
        """
        metadata = []
        timestamp = time.time()
        version = { "timestamp": str(timestamp) }
        rvalue = { "version": version, "metadata": metadata }
        rcode = 0
        return rcode, rvalue


if __name__ == '__main__':
    r = Resource()
    try:
        rcode = r.run(os.path.basename(__file__))
    except Exception as e:
        sys.stderr.write("ERROR: " + str(e) + "\n")
        sys.exit(1)
    sys.exit(rcode)