modxcms/revolution

View on GitHub
core/model/modx/registry/modfileregister.class.php

Summary

Maintainability
D
2 days
Test Coverage
<?php
/**
 * This file contains a simple file-based implementation of modRegister.
 *
 * @package modx
 * @subpackage registry
*/

/** Make sure the modRegister class is included. */
require_once(dirname(__FILE__) . '/modregister.class.php');

/**
 * A simple, file-based implementation of modRegister.
 *
 * This implementation does not address transactional conflicts and should be
 * used in non-critical processes that are easily recoverable.
 *
 * @package modx
 * @subpackage registry
 */
class modFileRegister extends modRegister {
    /**
     * A physical directory where the register stores topics and messages.
     * @var string
     */
    protected $directory = null;

    /**
     * Construct a new modFileRegister instance.
     *
     * @param modX &$modx A reference to a modX instance.
     * @param string $key A valid PHP variable which will be set on the modRegistry instance.
     * @param array $options Optional array of registry options.
     */
    function __construct(& $modx, $key, $options = array())
    {
        parent::__construct($modx, $key, $options);

        $modx->getCacheManager();
        $this->directory = $modx->getCachePath() . 'registry/';
        $this->directory .= isset($options['directory'])
            ? $options['directory']
            : $key;

        $this->directory = rtrim($this->directory, '/') . '/';
    }

    /**
     * Make sure the register can write to the specified $directory.
     *
     * {@inheritdoc}
     */
    public function connect(array $attributes = array()) {
        $connected = false;
        if (is_string($this->directory) && strlen($this->directory)) {
            $connected = $this->modx->cacheManager->writeTree($this->directory);
        }
        return $connected;
    }

    /**
     * Clear the register messages.
     *
     * {@inheritdoc}
     */
    public function clear($topic)
    {
        $topicDirectory = $this->directory . ltrim($this->sanitizePath($topic), '/');

        return $this->modx->cacheManager->deleteTree(
            realpath($topicDirectory),
            array(
                'extensions' => array('.msg.php')
            )
        );
    }

    /**
     * {@inheritdoc}
     *
     * This implementation supports the following options and default behavior:
     * <ul>
     * <li>msg_limit: Only poll until the specified limit of messages has
     * been digested. Default is 5 messages.</li>
     * <li>time_limit: Poll for new messages for a specified number of
     * seconds. Default is the result of the php time_limit system variable.</li>
     * <li>poll_limit: Only poll for new subscriptions a specified number
     * of times. Default is unlimited.</li>
     * <li>poll_interval: Wait a specified number of seconds between each
     * additional polling iteration, after the initial one. Default is no
     * interval.</li>
     * <li>remove_read: Remove the message immediately upon digesting it.
     * Default is true.</li>
     * <li>include_keys: Include the message keys in the array of messages returned.
     * Default is false.</li>
     * </ul>
     */
    public function read(array $options = array()) {
        $this->__kill = false;
        $messages = array();
        $topicMessages = array();
        $msgLimit = isset($options['msg_limit']) ? intval($options['msg_limit']) : 5;
        $timeLimit = isset($options['time_limit']) ? intval($options['time_limit']) : ini_get('time_limit');
        $pollLimit = isset($options['poll_limit']) ? intval($options['poll_limit']) : 0;
        $pollInterval = isset($options['poll_interval']) ? intval($options['poll_interval']) : 0;
        $removeRead = isset($options['remove_read']) ? (boolean) $options['remove_read'] : true;
        $includeKeys = isset($options['include_keys']) ? (boolean) $options['include_keys'] : false;
        $startTime = microtime(true);
        $time = $timeLimit <= 0 ? -1 : $startTime;
        $expires = $startTime + $timeLimit;
        $msgCount = 0;
        $iteration = 0;
        while ($time < $expires && $msgCount < $msgLimit && !$this->__kill) {
            if ($iteration > 0) {
                if ($pollLimit > 0 && $iteration >= $pollLimit) {
                    break;
                }
                if ($pollInterval > 0) sleep($pollInterval);
            }
            $iteration++;
            foreach ($this->subscriptions as $subIdx => $topic) {
                $topicMessages = array();
                $topicDirectory = $this->directory;
                $topicDirectory.= $topic[0] == '/' ? substr($topic, 1) : $topic ;
                if (is_dir($topicDirectory)) {
                    $dirListing = $this->getSortedDirectoryListing($topicDirectory);
                    if (!empty($dirListing)) {
                        foreach ($dirListing as $idx => $entry) {
                            if ($msgCount >= $msgLimit || $this->__kill) break;
                            if ($newMsg = $this->_readMessage($topicDirectory . $entry, $removeRead)) {
                                if (!$includeKeys) {
                                    $topicMessages[] = $newMsg;
                                } else {
                                    $msgKey = substr($entry, 0, strpos($entry, '.msg.php'));
                                    $topicMessages[$msgKey] = $newMsg;
                                }
                                $msgCount++;
                            }
                        }
                    }
                }
                elseif ($newMsg = $this->_readMessage($topicDirectory . '.msg.php', $removeRead)) {
                    if (!$includeKeys) {
                        $topicMessages[] = $newMsg;
                    } else {
                        $topicMessages[$topicDirectory] = $newMsg;
                    }
                    $msgCount++;
                }
            }
            if (!empty($topicMessages)) {
                if (!$includeKeys) {
                    $messages = $messages + $topicMessages;
                } else {
                    $messages = array_merge($messages, $topicMessages);
                }
            }
            $time = microtime(true);
        }
        return $messages;
    }

    /**
     * Get list of topic messages from a directory sorted by modified date.
     *
     * @param string $dir A valid directory path.
     * @return array An array of topic messages sorted by modified date.
     */
    private function getSortedDirectoryListing($dir) {
        $listing = array();
        $d = new DirectoryIterator($dir);
        $idx = 0;
        foreach ($d as $f) {
            $filename = $f->getFilename();
            if ($f->isFile() && strpos($filename, '.msg.php')) {
                $listing[] = $filename;
                $idx++;
            }
        }
        if (!empty($listing)) sort($listing);
        return $listing;
    }

    /**
     * Read a message file from the queue.
     *
     * @todo Implement support for reading various message types, other than
     * executable PHP format.
     * @access private
     * @param string $filename An absolute path to a message file to read.
     * @param boolean $remove Indicates if the message file should be deleted
     * once the message is read from it.
     */
    private function _readMessage($filename, $remove = true) {
        $message = null;
        if (file_exists($filename)) {
            $message = @ include($filename);
            if ($remove) {
                @ unlink($filename);
            }
        }
        return $message;
    }

    /**
     * {@inheritdoc}
     *
     * This implementation provides support for sending messages using either
     * time-based indexes so they are consumed in the order they are produced,
     * or named indexes typically used when consumers want to subscribe to a
     * specific, unique message. Individual messages or message collections
     * passed in numerically indexed arrays are treated as time-based messages
     * and message collections passed in associative arrays are treated as named
     * messages. e.g., to send a single message as named, wrap it in an array
     * with the intended message name as the key.
     *
     * This implementation also supports a message_type option to indicate the
     * format of the message being sent to the register. Currently only supports
     * executable PHP format.
     *
     * Other implementation specific options include:
     * <ul>
     * <li>delay: Number of seconds to delay the message. This option is only
     * supported for time-based messages.</li>
     * <li>ttl: Number of seconds the message is valid in the queue.
     * Default is forever or 0.</li>
     * <li>kill: Tells a message consumer to stop consuming any more
     * messages after reading any message sent with this option.</li>
     * </ul>
     *
     * @todo Implement support for sending various message types, other than
     * executable PHP format.
     */
    public function send($topic, $message, array $options = array()) {
        $sent = false;
        if (empty($topic) || $topic[0] != '/') $topic = $this->_currentTopic . $topic;
        $topicIdx = array_search($topic, $this->subscriptions);
        $topic = substr($topic, 1);
        if ($topicIdx !== false) {
            $messageType = isset($options['message_type']) ? $options['message_type'] : 'php';
            $topicDirectory = $this->directory . $topic;
            if ($topicDirectory[strlen($topicDirectory) - 1] != '/') $topicDirectory .= '/';
            if (!is_array($message)) {
                $message = array($message);
            }
            foreach ($message as $msgIdx => $msg) {
                if (is_scalar($msg) || is_array($msg) || is_object($msg)) {
                    switch ($messageType) {
                        //TODO: implement more message types
                        case 'php' :
                        default :
                            $timestamp = isset($options['delay']) ? time() + intval($options['delay']) : time();
                            $expires = isset($options['ttl']) && !empty($options['ttl']) ? time() + intval($options['ttl']) : 0;
                            $kill = isset($options['kill']) ? (boolean) $options['kill'] : false;
                            if (!is_int($msgIdx)) {
                                if (strpos($msgIdx, '../') !== false) {
                                    $this->modx->log(modX::LOG_LEVEL_ERROR, "Directory traversal attempt in register message key; message skipped with key {$msgIdx}");
                                    break;
                                }
                                $msgKey = $msgIdx;
                            } else {
                                $msgKey = strftime('%Y%m%dT%H%M%S', $timestamp) . '-' . sprintf("%03d", $msgIdx);
                            }
                            $filename = $topicDirectory . $msgKey . '.msg.php';
                            $content = "<?php\n";
                            if ($expires > 0) $content.= "if (time() > {$expires}) return null;\n";
                            if ($kill) $content.= "\$this->__kill = true;\n";
                            $content.= 'return ' . var_export($msg, true) . ";\n";
                            $sent = $this->modx->cacheManager->writeFile($filename, $content);
                    }
                }
            }
        }
        return $sent;
    }

    public function close() {
        return true;
    }

    /**
     * Sanitize the specified path
     *
     * @param string $path The path to clean
     * @return string The sanitized path
     */
    protected function sanitizePath($path) {
        return preg_replace(array("/\.*[\/|\\\]/i", "/[\/|\\\]+/i"), array('/', '/'), $path);
    }
}