# src/pyff/repo.py
#
# Summary
#
# Maintainability
# A
# 1 hr
# Test Coverage
import random, time

from pyff.constants import NS, config
from pyff.logs import get_log
from pyff.resource import Resource, ResourceOpts
from pyff.samlmd import entitiesdescriptor, root
from pyff.store import make_icon_store_instance, make_store_instance
from pyff.utils import is_text, make_default_scheduler

log = get_log(__name__)


class MDRepository:
    """A class representing a set of SAML metadata and the resources from where this metadata was loaded."""

    def __init__(self, scheduler=None) -> None:
        """
        Create the root resource, the scheduler and the metadata/icon stores.

        :param scheduler: scheduler passed to the resource watchers. When ``None``, a default
            scheduler is created with ``make_default_scheduler()`` and started here; a scheduler
            supplied by the caller is used as-is and is not started by this constructor.
        """
        # NOTE: this re-seeds the process-wide ``random`` generator with the current time.
        random.seed(time.time())
        self.rm = Resource(url=None, opts=ResourceOpts())  # root
        if scheduler is None:
            scheduler = make_default_scheduler()
            scheduler.start()
        self.scheduler = scheduler
        self.store = make_store_instance()
        self.icon_store = make_icon_store_instance()
        # The metadata store always watches the root resource; the icon store
        # only does so when icon loading is enabled in the configuration.
        self.rm.add_watcher(self.store, scheduler=self.scheduler)
        if config.load_icons:
            self.rm.add_watcher(self.icon_store, scheduler=self.scheduler)

    @staticmethod
    def _split_selector(member):
        """
        Split a ``[sourceID] "!" xpath`` selector at its first ``!``.

        Only the first ``!`` separates the source from the xpath, so xpath expressions that
        themselves contain ``!`` (for instance the ``!=`` operator) are kept intact.

        :param member: a selector string containing at least one ``!``
        :return: a ``(source, xpath)`` tuple where source is ``None`` if nothing precedes the ``!``
        """
        src, _, xp = member.partition("!")
        return (src or None), xp

    def _lookup(self, member, store=None):
        """
        Resolve a single selector against a store.

        A text selector containing ``!`` is split into source and xpath and routed through
        :meth:`lookup` so the xpath filter is applied; everything else is handed to
        ``store.lookup`` unchanged.

        :param member: the selector; ``None`` is treated as ``"entities"``
        :param store: the store to operate on (defaults to ``self.store``)
        :return: whatever :meth:`lookup` or ``store.lookup`` returns for the selector
        """
        if store is None:
            store = self.store

        if member is None:
            member = "entities"

        if is_text(member) and '!' in member:
            src, xp = self._split_selector(member)
            return self.lookup(src, xp=xp, store=store)

        log.debug("calling store lookup %s" % member)
        return store.lookup(member)

    def lookup(self, member, xp=None, store=None):
        """
        Lookup elements in the working metadata repository

        :param member: A selector (cf below)
        :type member: basestring
        :param xp: An optional xpath filter
        :type xp: basestring
        :param store: the store to operate on (defaults to ``self.store``)
        :return: Without an xpath filter: the store result, wrapped in a list when it is a single
            element (an object with a ``tag`` attribute) and converted to a list when it is
            iterable. With an xpath filter: the result of evaluating the xpath over the looked-up
            entities, or an empty list if no tree could be built from them.


        **Selector Syntax**

            - selector "+" selector
            - [sourceID] "!" xpath
            - attribute=value or {attribute}value
            - entityID
            - source (typically @Name from an EntitiesDescriptor set but could also be an alias)

        The ``[sourceID] "!" xpath`` form is handled by this class: it results in the elements from
        the source (defaults to "entities" when the source is omitted) that match the xpath
        expression. Only the first "!" is treated as the separator, so the xpath itself may contain
        "!" (e.g. the ``!=`` operator). All other forms are passed to the store's ``lookup``, which
        is expected to interpret them: the first form as the intersection of the results of doing
        a lookup on the selectors, the attribute-value forms as the EntityDescriptors that contain
        the specified entity attribute pair, and if none of these forms apply, as either a source
        ID (normally @Name from the EntitiesDescriptor) or the entityID of a single
        EntityDescriptor.

        """
        if store is None:
            store = self.store

        found = self._lookup(member, store=store)
        if hasattr(found, 'tag'):
            # A single element was returned - normalize to a one-item list.
            found = [found]
        elif hasattr(found, '__iter__'):
            found = list(found)

        if xp is None:
            return found

        log.debug("filtering %d entities using xpath %s" % (len(found), xp))
        # Build a temporary aggregate named 'dummy' so the xpath can be evaluated
        # against all looked-up entities at once.
        tree = entitiesdescriptor(found, 'dummy', lookup_fn=self.lookup)
        if tree is None:
            return []
        matches = root(tree).xpath(xp, namespaces=NS, smart_strings=False)
        log.debug("got %d entities after filtering" % len(matches))
        return matches