saltstack/salt
salt/utils/cache.py
# -*- coding: utf-8 -*-
'''
In-memory caching used by Salt
'''
# Import Python libs
from __future__ import absolute_import, print_function, unicode_literals
import os
import re
import time
import logging
try:
    import salt.utils.msgpack as msgpack
except ImportError:
    msgpack = None

# Import salt libs
import salt.config
import salt.payload
import salt.utils.data
import salt.utils.dictupdate
import salt.utils.files

# Import third party libs
from salt.ext.six.moves import range  # pylint: disable=import-error,redefined-builtin
from salt.utils.zeromq import zmq

log = logging.getLogger(__name__)


class CacheFactory(object):
    '''
    Cache which can use a number of backends
    '''
    @classmethod
    def factory(cls, backend, ttl, *args, **kwargs):
        log.debug('Factory backend: %s', backend)
        if backend == 'memory':
            return CacheDict(ttl, *args, **kwargs)
        elif backend == 'disk':
            # pop the path so it is not forwarded into the backing dict
            return CacheDisk(ttl, kwargs.pop('minion_cache_path'), *args, **kwargs)
        else:
            log.error('CacheFactory received unrecognized cache type: %s', backend)
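
# A minimal usage sketch (hedged; values are illustrative, and the
# 'minion_cache_path' keyword is only consumed by the 'disk' backend):
#
#     cache = CacheFactory.factory('memory', 60)
#     cache['key'] = 'value'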


class CacheAPI(dict):
    '''
    Stub to export any cache implementation API
    '''
    def store(self):
        '''
        Persist the cache contents. This base implementation is a
        no-op; disk-backed subclasses override it.
        '''


class CacheDict(CacheAPI):
    '''
    Subclass of dict that will lazily delete items past ttl
    '''
    def __init__(self, ttl, *args, **kwargs):  # pylint: disable=W0231
        dict.__init__(self, *args, **kwargs)   # pylint: disable=W0233
        self._ttl = ttl
        self._key_cache_time = {}

    def _enforce_ttl_key(self, key):
        '''
        Enforce the TTL on a specific key, deleting it if it is past its TTL
        '''
        if key not in self._key_cache_time or self._ttl == 0:
            return
        if time.time() - self._key_cache_time[key] > self._ttl:
            del self._key_cache_time[key]
            dict.__delitem__(self, key)

    def __getitem__(self, key):
        '''
        Check whether the key has exceeded its TTL, then do the get
        '''
        self._enforce_ttl_key(key)
        return dict.__getitem__(self, key)

    def __setitem__(self, key, val):
        '''
        Make sure to update the key cache time
        '''
        self._key_cache_time[key] = time.time()
        dict.__setitem__(self, key, val)

    def __contains__(self, key):
        self._enforce_ttl_key(key)
        return dict.__contains__(self, key)
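
# Example sketch of the lazy eviction above: a lookup or membership test
# past the TTL removes the entry as a side effect (ttl is in seconds;
# ttl=0 disables expiry entirely).
#
#     cd = CacheDict(5)
#     cd['token'] = 'abc'
#     'token' in cd        # True while fresh
#     time.sleep(6)
#     'token' in cd        # False -- the membership test evicted it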


class CacheDisk(CacheDict):
    '''
    Class that represents itself as a dictionary to a consumer
    but uses a disk-based backend. Serialization and de-serialization
    are done with msgpack
    '''
    def __init__(self, ttl, path, *args, **kwargs):
        super(CacheDisk, self).__init__(ttl, *args, **kwargs)
        self._path = path
        self._dict = {}
        self._read()

    def _enforce_ttl_key(self, key):
        '''
        Enforce the TTL on a specific key, deleting it if it is past its TTL
        '''
        if key not in self._key_cache_time or self._ttl == 0:
            return
        if time.time() - self._key_cache_time[key] > self._ttl:
            del self._key_cache_time[key]
            self._dict.__delitem__(key)

    def __contains__(self, key):
        self._enforce_ttl_key(key)
        return self._dict.__contains__(key)

    def __repr__(self):
        '''
        Return a debug representation of the CacheDisk instance.
        '''
        return '<{name} of {length} entries at {memaddr}>'.format(
            name=self.__class__.__name__, length=len(self), memaddr=hex(id(self)))

    def __str__(self):
        '''
        String version of this object; delegates to ``__repr__``.
        '''
        return self.__repr__()

    def __len__(self):
        '''
        Number of entries in the cache storage.
        '''
        return len(self._dict)

    def __getitem__(self, key):
        '''
        Check whether the key has exceeded its TTL, then do the get.
        Unlike ``dict``, a missing key returns None instead of raising
        a KeyError.
        '''
        self._enforce_ttl_key(key)
        item = None
        if key in self._dict:
            item = self._dict.__getitem__(key)

        return item

    def __setitem__(self, key, val):
        '''
        Make sure to update the key cache time
        '''
        self._key_cache_time[key] = time.time()
        self._dict.__setitem__(key, val)
        # Do the same as the parent but also persist
        self.store()

    def __delitem__(self, key):
        '''
        Make sure to remove the key cache time
        '''
        del self._key_cache_time[key]
        self._dict.__delitem__(key)
        # Do the same as the parent but also persist
        self.store()

    def _read(self):
        '''
        Read in from disk
        '''
        if msgpack is None:
            log.error('Cache cannot be read from the disk: msgpack is missing')
        elif not os.path.exists(self._path):
            log.debug('Cache path does not exist for reading: %s', self._path)
        else:
            try:
                with salt.utils.files.fopen(self._path, 'rb') as fp_:
                    cache = salt.utils.data.decode(msgpack.load(fp_, encoding=__salt_system_encoding__))
                if "CacheDisk_cachetime" in cache:  # new format
                    self._dict = cache["CacheDisk_data"]
                    self._key_cache_time = cache["CacheDisk_cachetime"]
                else:  # old format
                    self._dict = cache
                    timestamp = os.path.getmtime(self._path)
                    for key in self._dict:
                        self._key_cache_time[key] = timestamp
                if log.isEnabledFor(logging.DEBUG):
                    log.debug('Disk cache retrieved: %s', cache)
            except (IOError, OSError) as err:
                log.error('Error while reading disk cache from %s: %s', self._path, err)

    def store(self):
        '''
        Write content of the entire cache to disk
        '''
        if msgpack is None:
            log.error('Cache cannot be stored on disk: msgpack is missing')
        else:
            # TODO Dir hashing?
            try:
                with salt.utils.files.fopen(self._path, 'wb+') as fp_:
                    cache = {
                        "CacheDisk_data": self._dict,
                        "CacheDisk_cachetime": self._key_cache_time
                    }
                    msgpack.dump(cache, fp_, use_bin_type=True)
            except (IOError, OSError) as err:
                log.error('Error storing cache data to the disk: %s', err)
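
# Example sketch ('/tmp/minion_cache.p' is a hypothetical path): every
# __setitem__/__delitem__ re-serializes the whole cache to disk, so a
# fresh instance pointed at the same path sees earlier writes.
#
#     dc = CacheDisk(0, '/tmp/minion_cache.p')        # ttl=0: never expires
#     dc['minion1'] = {'os': 'Linux'}
#     CacheDisk(0, '/tmp/minion_cache.p')['minion1']  # -> {'os': 'Linux'}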


class CacheCli(object):
    '''
    Connection client for the ConCache. Should be used by all
    components that need the list of currently connected minions
    '''

    def __init__(self, opts):
        '''
        Sets up the zmq-connection to the ConCache
        '''
        self.opts = opts
        self.serial = salt.payload.Serial(self.opts.get('serial', ''))
        self.cache_sock = os.path.join(self.opts['sock_dir'], 'con_cache.ipc')
        self.cache_upd_sock = os.path.join(
            self.opts['sock_dir'], 'con_upd.ipc')

        context = zmq.Context()

        # the socket for talking to the cache
        self.creq_out = context.socket(zmq.REQ)
        self.creq_out.setsockopt(zmq.LINGER, 100)
        self.creq_out.connect('ipc://' + self.cache_sock)

        # the socket for sending updates to the cache
        self.cupd_out = context.socket(zmq.PUB)
        self.cupd_out.setsockopt(zmq.LINGER, 1)
        self.cupd_out.connect('ipc://' + self.cache_upd_sock)

    def put_cache(self, minions):
        '''
        Publish the given minions to the ConCache
        '''
        self.cupd_out.send(self.serial.dumps(minions))

    def get_cached(self):
        '''
        Query the ConCache for the list of currently connected minions
        '''
        msg = self.serial.dumps('minions')
        self.creq_out.send(msg)
        min_list = self.serial.loads(self.creq_out.recv())
        return min_list


class CacheRegex(object):
    '''
    Create a regular expression object cache for the most frequently
    used patterns to minimize compilation of the same patterns over
    and over again
    '''
    def __init__(self, prepend='', append='', size=1000,
                 keep_fraction=0.8, max_age=3600):
        self.prepend = prepend
        self.append = append
        self.size = size
        # Number of entries to evict per sweep, sized so that roughly
        # ``keep_fraction`` of the cache survives; clamp to sane bounds
        # if the requested fraction would clear more than the cache holds.
        self.clear_size = int(size - size * keep_fraction)
        if self.clear_size >= size:
            self.clear_size = int(size / 2) + 1
            if self.clear_size > size:
                self.clear_size = size
        self.max_age = max_age
        self.cache = {}
        self.timestamp = time.time()

    def clear(self):
        '''
        Clear the cache
        '''
        self.cache.clear()

    def sweep(self):
        '''
        Sweep the cache and remove the outdated or least frequently
        used entries
        '''
        if self.max_age < time.time() - self.timestamp:
            self.clear()
            self.timestamp = time.time()
        else:
            # Sort by use count so the least frequently used entries are
            # evicted first; sorting the raw entries would compare the
            # compiled regex objects and raise a TypeError on Python 3.
            patterns = sorted(self.cache.values(), key=lambda entry: entry[0])
            for idx in range(self.clear_size):
                del self.cache[patterns[idx][2]]

    def get(self, pattern):
        '''
        Get a compiled regular expression object for the given pattern,
        compiling and caching it if it is not cached already
        '''
        try:
            self.cache[pattern][0] += 1
            return self.cache[pattern][1]
        except KeyError:
            pass
        if len(self.cache) > self.size:
            self.sweep()
        regex = re.compile('{0}{1}{2}'.format(
            self.prepend, pattern, self.append))
        self.cache[pattern] = [1, regex, pattern, time.time()]
        return regex
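
# Example sketch: anchor every pattern once and reuse the compiled object.
#
#     cre = CacheRegex(prepend='^', append='$')
#     cre.get('web[0-9]+').match('web42')  # compiled and cached on first use
#     cre.get('web[0-9]+')                 # cache hit; use count incremented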


class ContextCache(object):
    def __init__(self, opts, name):
        '''
        Create a context cache
        '''
        self.opts = opts
        self.cache_path = os.path.join(opts['cachedir'], 'context', '{0}.p'.format(name))
        self.serial = salt.payload.Serial(self.opts)

    def cache_context(self, context):
        '''
        Cache the given context to disk
        '''
        if not os.path.isdir(os.path.dirname(self.cache_path)):
            os.mkdir(os.path.dirname(self.cache_path))
        with salt.utils.files.fopen(self.cache_path, 'w+b') as cache:
            self.serial.dump(context, cache)

    def get_cache_context(self):
        '''
        Retrieve a context cache from disk
        '''
        with salt.utils.files.fopen(self.cache_path, 'rb') as cache:
            return salt.utils.data.decode(self.serial.load(cache))
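
# Example sketch (assumes ``opts`` is a loaded Salt opts dict providing
# 'cachedir'):
#
#     cc = ContextCache(opts, 'my_module')  # -> <cachedir>/context/my_module.p
#     cc.cache_context({'connected': True})
#     cc.get_cache_context()                # -> {'connected': True}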


def context_cache(func):
    '''
    A decorator to be used on module functions which need to cache their
    context.

    If the function's __context__ is empty and a cache file exists, the
    context is re-hydrated from disk; otherwise the current context is
    persisted.
    '''
    def context_cache_wrap(*args, **kwargs):
        func_context = func.__globals__['__context__']
        func_opts = func.__globals__['__opts__']
        func_name = func.__globals__['__name__']

        context_cache = ContextCache(func_opts, func_name)
        if not func_context and os.path.isfile(context_cache.cache_path):
            salt.utils.dictupdate.update(func_context, context_cache.get_cache_context())
        else:
            context_cache.cache_context(func_context)
        return func(*args, **kwargs)
    return context_cache_wrap
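
# Example sketch (hedged): meant for Salt execution modules, where the
# loader injects __context__ and __opts__ as module globals; ``compute``
# is a hypothetical expensive call whose result is worth persisting.
#
#     @context_cache
#     def connection():
#         __context__.setdefault('conn', compute())
#         return __context__['conn']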


# test code for the CacheCli
if __name__ == '__main__':

    opts = salt.config.master_config('/etc/salt/master')

    ccli = CacheCli(opts)

    ccli.put_cache(['test1', 'test10', 'test34'])
    ccli.put_cache(['test12'])
    ccli.put_cache(['test18'])
    ccli.put_cache(['test21'])
    print('minions: {0}'.format(ccli.get_cached()))