odin-detector/odin-data

View on GitHub
python/src/odin_data/control/frame_processor_adapter.py

Summary

Maintainability
C
1 day
Test Coverage
"""
Created on 6th September 2017

:author: Alan Greer
"""
import logging
import os

from odin.adapters.adapter import (
    ApiAdapterRequest,
    ApiAdapterResponse,
    request_types,
    response_types,
)
from tornado import escape

from odin_data.control.odin_data_adapter import OdinDataAdapter

FP_ADAPTER_KEY = 'fr_adapter_name'
PCT_BUFFER_FREE_KEY = 'buffer_threshold'

def bool_from_string(value):
    bool_value = False
    if value.lower() == 'true' or value.lower() == '1':
        bool_value = True
    return bool_value


class FrameProcessorAdapter(OdinDataAdapter):
    """
    OdinDataAdapter class

    This class provides the adapter interface between the ODIN server and the ODIN-DATA detector system,
    transforming the REST-like API HTTP verbs into the appropriate frameProcessor ZeroMQ control messages
    """
    VERSION_CHECK_CONFIG_ITEMS = ['plugin']

    def __init__(self, **kwargs):
        """
        Initialise the OdinDataAdapter object

        :param kwargs:
        """
        logging.debug("FPA init called")
        super(FrameProcessorAdapter, self).__init__(**kwargs)

        self._fr_adapter_name = kwargs.get(FP_ADAPTER_KEY, None)
        self._fr_adapter = None

        # Set the bufer threshold check, default to 0.0 (no check) if the option
        # does not exist or is badly formatted
        self._fr_pct_buffer_threshold = 0.0
        try:
            self._fr_pct_buffer_threshold = float(self.options.get(PCT_BUFFER_FREE_KEY, 0.0))
        except ValueError:
            logging.error("Could not set the buffer threshold to: {}".format(kwargs[PCT_BUFFER_FREE_KEY]))

        self._param = {
            'config/hdf/acquisition_id': '',
            'config/hdf/file/path': '',
            'config/hdf/file/name': '',
            'config/hdf/file/extension': 'h5',
            'config/hdf/frames': 0
        }
        self._command = 'config/hdf/write'
        self.setup_rank()

    def initialize(self, adapters):
        """Initialize the adapter after it has been loaded.
        Find and record the FR adapter for later error checks
        """
        if self._fr_adapter_name is None:
            return

        if self._fr_adapter_name not in adapters:
            raise ValueError(
                "Given FR adapter name '{}', but it is not in the loaded adapters {}".format(
                    self._fr_adapter_name, adapters
                )
            )

        self._fr_adapter = adapters[self._fr_adapter_name]
        logging.info("FP adapter initiated connection to FR adapter: {}".format(self._fr_adapter_name))

    @request_types('application/json', 'application/vnd.odin-native')
    @response_types('application/json', default='application/json')
    def get(self, path, request):

        """
        Implementation of the HTTP GET verb for OdinDataAdapter

        :param path: URI path of the GET request
        :param request: Tornado HTTP request object
        :return: ApiAdapterResponse object to be returned to the client
        """
        status_code = 200
        response = {}

        # First check if we are interested in the config items
        #
        # Store these parameters locally:
        # config/hdf/file/path
        # config/hdf/file/name
        # config/hdf/file/extension
        #
        # When this arrives write all params into a single IPC message
        # config/hdf/write
        if path in self._param:
            response['value'] = self._param[path]
        else:
            return super(FrameProcessorAdapter, self).get(path, request)

        return ApiAdapterResponse(response, status_code=status_code)

    @request_types('application/json', 'application/vnd.odin-native')
    @response_types('application/json', default='application/json')
    def put(self, path, request):  # pylint: disable=W0613

        """
        Implementation of the HTTP PUT verb for OdinDataAdapter

        :param path: URI path of the PUT request
        :param request: Tornado HTTP request object
        :return: ApiAdapterResponse object to be returned to the client
        """
        status_code = 200
        response = {}
        logging.debug("PUT path: %s", path)
        logging.debug("PUT request: %s", request)

        # First check if we are interested in the config items
        #
        # Store these parameters locally:
        # config/hdf/file/path
        # config/hdf/file/name
        # config/hdf/file/extension
        #
        # When this arrives write all params into a single IPC message
        # config/hdf/write
        try:
            self.clear_error()
            if path in self._param:
                logging.debug("Setting {} to {}".format(path, str(escape.url_unescape(request.body)).replace('"', '')))
                if path == 'config/hdf/frames':
                    self._param[path] = int(str(escape.url_unescape(request.body)).replace('"', ''))
                else:
                    self._param[path] = str(escape.url_unescape(request.body)).replace('"', '')
                # Merge with the configuration store

            elif path == self._command:
                write = bool_from_string(str(escape.url_unescape(request.body)))
                config = {'hdf': {'write': write}}
                logging.debug("Setting {} to {}".format(path, config))
                if write:
                    # Before attempting to write files, make some simple error checks

                    if self._fr_adapter is not None:
                        # Check if we have a valid buffer status from the FR adapter
                        valid, reason = self.check_fr_status()
                        if not valid:
                            raise RuntimeError(reason)

                    # Check the file path is valid
                    if not os.path.isdir(str(self._param['config/hdf/file/path'])):
                        raise RuntimeError("Invalid path specified [{}]".format(str(self._param['config/hdf/file/path'])))
                    # Check the filename exists
                    if str(self._param['config/hdf/file/name']) == '':
                        raise RuntimeError("File name must not be empty")

                    # First setup the rank for the frameProcessor applications
                    self.setup_rank()
                    rank = 0
                    for client in self._clients:
                        # Send the configuration required to setup the acquisition
                        # The file path is the same for all clients
                        parameters = {
                            'hdf': {
                                'frames': self._param['config/hdf/frames']
                            }
                        }
                        # Send the number of frames first
                        client.send_configuration(parameters)
                        parameters = {
                            'hdf': {
                                'acquisition_id': self._param['config/hdf/acquisition_id'],
                                'file': {
                                    'path': str(self._param['config/hdf/file/path']),
                                    'name': str(self._param['config/hdf/file/name']),
                                    'extension': str(self._param['config/hdf/file/extension'])
                                }
                            }
                        }
                        client.send_configuration(parameters)
                        rank += 1
                for client in self._clients:
                    # Send the configuration required to start the acquisition
                    client.send_configuration(config)

            else:
                return super(FrameProcessorAdapter, self).put(path, request)

        except Exception as ex:
            logging.error("Error: %s", ex)
            self.set_error(str(ex))
            status_code = 503
            response = {'error': str(ex)}

        return ApiAdapterResponse(response, status_code=status_code)

    def check_fr_status(self):
        valid_check = True
        reason = ''
        # We should have a valid connection to the FR adapter
        if self._fr_adapter is not None:
            # Create an inter adapter request
            req = ApiAdapterRequest(None, accept='application/json')
            status_dict = self._fr_adapter.get(path='status', request=req).data
            if 'value' in status_dict:
                frs = status_dict['value']
                for fr in frs:
                    try:
                        frames_dropped = fr['frames']['dropped']
                        empty_buffers = fr['buffers']['empty']
                        total_buffers = fr['buffers']['total']
                        if frames_dropped > 0:
                            valid_check = False
                            reason = "Frames dropped [{}] on at least one FR".format(frames_dropped)
                        pct_free = 0.0
                        if total_buffers > 0:
                            pct_free = float(empty_buffers) / float(total_buffers) * 100.0
                        if pct_free < self._fr_pct_buffer_threshold:
                            valid_check = False
                            reason = "There are only {}% free buffers left on at least one FR".format(pct_free)
                    except Exception as ex:
                        valid_check = False
                        reason = "Could not complete FR validity check, exception was thrown: {}".format(str(ex))
            else:
                valid_check = False
                reason = "No status returned from the FR adapter"
        else:
            valid_check = False
            reason = "No FR adapter has been registered with the FP adapter"

        return valid_check, reason

    def require_version_check(self, param):
        # If the parameter is in the version check list then request a version update
        if param in self.VERSION_CHECK_CONFIG_ITEMS:
            return True
        return False

    def setup_rank(self):
        # Attempt initialisation of the connected clients
        processes = len(self._clients)
        rank = 0
        for client in self._clients:
            try:
                # Setup the number of processes and the rank for each client
                parameters = {
                    'hdf': {
                        'process': {
                            'number': processes,
                            'rank': rank
                        }
                    }
                }
                client.send_configuration(parameters)
            except Exception as err:
                logging.debug(OdinDataAdapter.ERROR_FAILED_TO_SEND)
                logging.error("Error: %s", err)
            rank += 1