ssm/brokers.py

Summary

Maintainability
A
0 mins
Test Coverage
"""
   Copyright (C) 2012 STFC.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.

   @author: Will Rogers

Class to interact with a BDII LDAP server to retrieve information about
the stomp brokers specified in a network.
"""
from __future__ import print_function

import ldap
import logging

# Module-level logger, named after this module
log = logging.getLogger(__name__)

# GlueServiceType values used to select plain / SSL STOMP brokers in LDAP queries
STOMP_SERVICE = 'msg.broker.stomp'
STOMP_SSL_SERVICE = 'msg.broker.stomp-ssl'

# URL schemes accepted by parse_stomp_url() (compared case-insensitively)
STOMP_PREFIX = 'stomp'
STOMP_SSL_PREFIX = 'stomp+ssl'


class StompBrokerGetter(object):
    """Class for searching a BDII for message brokers.

    Given the URL of a BDII, searches for all the STOMP
    brokers listed that are part of the specified network.
    """

    def __init__(self, bdii_url):
        """Set up the LDAP connection and strings which are re-used.

        bdii_url is passed unchanged to ldap.initialize().
        """
        # Log through the module logger: the module-level logging.warning()
        # writes to the root logger and implicitly calls logging.basicConfig()
        # when the root logger has no handlers yet.
        log.warning('LDAP is deprecated and will be removed in an upcoming version, '
                    'please set host locally in SSM config.')
        log.debug('Connecting to %s...', bdii_url)
        self._ldap_conn = ldap.initialize(bdii_url)

        # Search base and attribute names shared by the queries below
        self._base_dn = 'o=grid'
        self._service_id_key = 'GlueServiceUniqueID'
        self._endpoint_key = 'GlueServiceEndpoint'
        self._service_data_value_key = 'GlueServiceDataValue'

    @staticmethod
    def _to_text(value):
        """Return an LDAP attribute value as a native string.

        On Python 3 python-ldap returns attribute values as bytes; those are
        decoded as UTF-8 so they can be compared with, and formatted into,
        ordinary strings. On Python 2 (where bytes is str) and for values that
        are already text, the value is returned unchanged.
        """
        if isinstance(value, bytes) and not isinstance(value, str):
            return value.decode('utf-8')
        return value

    def get_broker_urls(self, service_type, network):
        """Get a list of stomp broker URLs in a specified network from a BDII.

        Checks them to see if they are part of the network. The network is
        supplied as a string. Returns a list of URLs.
        """
        return [
            broker_url
            for broker_id, broker_url in self._get_broker_details(service_type)
            if self._broker_in_network(broker_id, network)
        ]

    def get_broker_hosts_and_ports(self, service_type, network):
        """Get a list of stomp broker (host, port) tuples from a BDII.

        Gets the list of all the stomp brokers in the BDII, then checks them to
        see if they are part of the network. The network is supplied as a
        string. Returns a list of (host, port) tuples.

        Raises whatever parse_stomp_url() raises for a URL it cannot parse.
        """
        return [parse_stomp_url(url)
                for url in self.get_broker_urls(service_type, network)]

    def _get_broker_details(self, service_type):
        """Search the BDII for all STOMP message brokers.

        Returns a list of tuples: (<GlueServiceUniqueID>, <URL>), using the
        first value of each attribute, converted to native strings.
        """
        # NOTE(review): service_type is interpolated into the filter unescaped;
        # confirm callers only pass trusted values.
        ldap_filter = '(&(objectClass=GlueService)(GlueServiceType=%s))' % service_type
        wanted_attrs = [self._service_id_key, self._endpoint_key]

        brokers = self._ldap_conn.search_s(self._base_dn, ldap.SCOPE_SUBTREE,
                                           ldap_filter, wanted_attrs)

        broker_details = []
        for unused_dn, entry in brokers:
            broker_id = self._to_text(entry[self._service_id_key][0])
            broker_url = self._to_text(entry[self._endpoint_key][0])
            broker_details.append((broker_id, broker_url))

        return broker_details

    def _broker_in_network(self, broker_id, network):
        """Check that a GlueServiceUniqueID is part of a specified network.

        Only the first search result is examined. Returns False when the
        query yields no results or that result has no GlueServiceDataValue.
        """
        # NOTE(review): broker_id is interpolated into the filter unescaped.
        ldap_filter = '(&(GlueServiceDataKey=cluster)(GlueChunkKey=GlueServiceUniqueID=%s))' \
            % broker_id
        wanted_attrs = [self._service_data_value_key]
        results = self._ldap_conn.search_s(self._base_dn, ldap.SCOPE_SUBTREE,
                                           ldap_filter, wanted_attrs)

        if not results:  # no results from the query
            return False

        unused_dn, entry = results[0]
        values = entry.get(self._service_data_value_key, [])
        return network in [self._to_text(value) for value in values]


def parse_stomp_url(stomp_url):
    """Parse a stomp scheme URL.

    Given a URL of the form stomp://stomp.cern.ch:6262/,
    return a tuple containing (stomp.cern.ch, 6262).

    The scheme is compared case-insensitively with STOMP_PREFIX and
    STOMP_SSL_PREFIX. Raises ValueError if the scheme is neither of those,
    if the URL has no port part, or if the port is not an integer.
    """
    parts = stomp_url.split(':')

    if parts[0].lower() not in (STOMP_PREFIX, STOMP_SSL_PREFIX):
        raise ValueError("URL %s does not begin 'stomp:'." % stomp_url)

    # Scheme, host and port are all required; report a missing part as a
    # malformed URL rather than letting an IndexError escape.
    if len(parts) < 3:
        raise ValueError('URL %s does not have a host and port.' % stomp_url)

    host = parts[1].strip('/')
    port = parts[2].strip('/')
    if not port.isdigit():
        raise ValueError('URL %s does not have an integer as its third part.' % stomp_url)

    return host, int(port)


if __name__ == '__main__':
    # BDII URL queried when this module is run as a script
    BDII = 'ldap://lcg-bdii.cern.ch:2170'
    BG = StompBrokerGetter(BDII)

    def print_brokers(text, service, network):
        """Print a heading, then one 'host:port' line per matching broker."""
        # Query first so that nothing is printed if the lookup fails
        hosts_and_ports = BG.get_broker_hosts_and_ports(service, network)
        print('==', text, '==')
        for host, port in hosts_and_ports:
            print('%s:%i' % (host, port))
        # Blank line separates this section from the next
        print()

    # (heading, service type, network) for each section, in output order
    sections = (
        ('SSL production brokers', STOMP_SSL_SERVICE, 'PROD'),
        ('Production brokers', STOMP_SERVICE, 'PROD'),
        ('SSL test brokers', STOMP_SSL_SERVICE, 'TEST-NWOB'),
        ('Test brokers', STOMP_SERVICE, 'TEST-NWOB'),
    )
    for heading, service_type, network_name in sections:
        print_brokers(heading, service_type, network_name)