modxcms/revolution

View on GitHub
core/model/modx/registry/moddbregister.class.php

Summary

Maintainability
D
2 days
Test Coverage
<?php
/*
 * This file is part of MODX Revolution.
 *
 * Copyright (c) MODX, LLC. All Rights Reserved.
 *
 * For complete copyright and license information, see the COPYRIGHT and LICENSE
 * files found in the top-level directory of this distribution.
 */

require_once dirname(__FILE__) . '/modregister.class.php';

/**
 * A simple, database-based implementation of modRegister.
 *
 * This implementation does not address transactional conflicts and should be
 * used in non-critical processes that are easily recoverable.
 *
 * @package modx
 * @subpackage registry.db
 */
class modDbRegister extends modRegister
{
    /**
     * The queue record backing this modRegister instance.
     * @access protected
     * @var modDbRegisterQueue $_queue
     */
    protected $_queue = null;

    /**
     * Construct a new modDbRegister instance and load (or create) its queue.
     *
     * @param modX &$modx A reference to the modX instance
     * @param string $key The key of the registry to load
     * @param array $options An array of options to set
     */
    public function __construct(modX &$modx, $key, array $options = array()) {
        parent :: __construct($modx, $key, $options);
        $this->_queue = $this->_initQueue($key, $options);
    }

    /**
     * Load the queue named $key, creating an unsaved one when none exists.
     *
     * The given options are written to a new queue unconditionally and to an
     * existing queue only when they are non-empty. The queue is saved only if
     * its 'options' field is reported dirty afterwards.
     *
     * @param string $key The name of the queue
     * @param array $options An array of options
     * @return modDbRegisterQueue A reference to the Queue object
     */
    protected function _initQueue($key, $options) {
        $queue = $this->modx->getObject('registry.db.modDbRegisterQueue', array(
            'name' => $key
        ));
        if (!$queue) {
            $queue = $this->modx->newObject('registry.db.modDbRegisterQueue');
            $queue->set('name', $key);
            $queue->set('options', $options);
        } elseif (!empty($options)) {
            $queue->set('options', $options);
        }
        if ($queue && $queue->isDirty('options')) {
            $queue->save();
        }
        return $queue;
    }

    /**
     * Connect to the register service implementation. If we made it here, we connected fine.
     *
     * @param array $attributes A collection of attributes required for
     * connection to the register (unused by this implementation).
     * @return boolean Always true.
     */
    public function connect(array $attributes = array()) {
        return true;
    }

    /**
     * Clear the register messages stored under a topic of this queue.
     *
     * {@inheritdoc}
     *
     * @param string $topic The name of the topic whose messages are removed.
     * @return boolean False when the topic does not exist in this queue;
     * otherwise the result of removeCollection() cast to boolean.
     */
    public function clear($topic)
    {
        $topicObject = $this->modx->getObject('registry.db.modDbRegisterTopic', array(
            'queue' => $this->_queue->get('id'),
            'name' => $topic
        ));
        if (!$topicObject) {
            return false;
        }

        return (bool) $this->modx->removeCollection('registry.db.modDbRegisterMessage', array(
            'topic' => $topicObject->get('id')
        ));
    }

    /**
     * Read messages from every subscribed topic.
     *
     * This implementation supports the following options and default behavior:
     * <ul>
     * <li>msg_limit: Only poll until the specified limit of messages has
     * been digested. Default is 5 messages.</li>
     * <li>time_limit: Poll for new messages for a specified number of
     * seconds. Default is the max_execution_time ini setting. A value of 0
     * or less results in a single polling pass.</li>
     * <li>poll_limit: Only poll for new subscriptions a specified number
     * of times. Default is unlimited.</li>
     * <li>poll_interval: Wait a specified number of seconds between each
     * additional polling iteration, after the initial one. Default is no
     * interval.</li>
     * <li>remove_read: Remove the message immediately upon digesting it.
     * Default is true.</li>
     * <li>include_keys: Include the message keys in the array of messages returned.
     * Default is false.</li>
     * </ul>
     *
     * Messages from all subscriptions and all polling passes are accumulated
     * in the returned array: appended in read order when include_keys is
     * false, or stored under the message id when it is true.
     *
     * @param array $options An array of general or protocol specific options.
     * @return mixed The resulting messages from the register.
     */
    public function read(array $options = array()) {
        $this->__kill = false;
        $messages = array();
        $msgLimit = isset($options['msg_limit']) ? intval($options['msg_limit']) : 5;
        $timeLimit = isset($options['time_limit']) ? intval($options['time_limit']) : intval(ini_get('max_execution_time'));
        $pollLimit = isset($options['poll_limit']) ? intval($options['poll_limit']) : 0;
        $pollInterval = isset($options['poll_interval']) ? intval($options['poll_interval']) : 0;
        $removeRead = isset($options['remove_read']) ? (boolean) $options['remove_read'] : true;
        $includeKeys = isset($options['include_keys']) ? (boolean) $options['include_keys'] : false;
        $startTime = microtime(true);
        // With no positive time limit, $expires equals $startTime; seeding
        // $time with -1 lets the loop run exactly one pass before expiring.
        $time = $timeLimit <= 0 ? -1 : $startTime;
        $expires = $startTime + $timeLimit;
        $msgCount = 0;
        $iteration = 0;
        while ($time < $expires && $msgCount < $msgLimit && !$this->__kill) {
            if ($iteration > 0) {
                if ($pollLimit > 0 && $iteration >= $pollLimit) {
                    break;
                }
                if ($pollInterval > 0) sleep($pollInterval);
            }
            $iteration++;
            foreach ($this->subscriptions as $topic) {
                // Stop querying further topics once the message limit is met.
                $balance = $msgLimit - $msgCount;
                if ($balance <= 0) {
                    break;
                }
                $args = array(
                    &$this,
                    $topic,
                    dirname($topic) . '/',
                    basename($topic),
                    $balance,
                    array('fetchMode' => PDO::FETCH_OBJ)
                );
                $validMessages = $this->modx->call('registry.db.modDbRegisterMessage', 'getValidMessages', $args);
                if (empty($validMessages)) {
                    continue;
                }
                foreach ($validMessages as $msg) {
                    $newMsg = $this->_readMessage($msg, $removeRead);
                    if ($newMsg !== null) {
                        // Accumulate straight into the result so that no
                        // topic or polling pass overwrites an earlier one.
                        if ($includeKeys) {
                            $messages[$msg->id] = $newMsg;
                        } else {
                            $messages[] = $newMsg;
                        }
                        $msgCount++;
                    } else {
                        $this->modx->log(modX::LOG_LEVEL_INFO, 'Message was null or expired: ' . print_r($msg, 1));
                    }
                    // A kill message ends consumption for every subscription,
                    // not just the topic currently being read.
                    if ($this->__kill) break 2;
                }
            }
            $time = microtime(true);
        }
        return $messages;
    }

    /**
     * Read a message record from the queue topic.
     *
     * The payload is evaluated as PHP code and its return value is the
     * message. The record is deleted when $remove is true or when its
     * 'expires' timestamp is set and already in the past. A record flagged
     * 'kill' sets the register's kill switch.
     *
     * @todo Implement support for reading various message types, other than
     * executable PHP format.
     *
     * @param object $obj The message data to read.
     * @param boolean $remove Indicates if the message should be deleted once it is read.
     * @return mixed The message returned, or null when $obj is not an object,
     * has an empty payload, or the payload itself evaluates to null.
     */
    protected function _readMessage($obj, $remove = true) {
        $message = null;
        if (is_object($obj) && !empty($obj->payload)) {
            // NOTE(review): eval() executes whatever PHP is stored in the
            // message table. This is only safe while every writer to that
            // table is trusted; consider a non-executable payload format.
            $message = eval($obj->payload);
            if ($remove || ($obj->expires > 1 && $obj->expires < time())) {
                $this->modx->removeObject('registry.db.modDbRegisterMessage', array('topic' => $obj->topic, 'id' => $obj->id));
            }
            if ($obj->kill) $this->__kill = true;
        }
        return $message;
    }

    /**
     * This implementation provides support for sending messages using either
     * time-based indexes so they are consumed in the order they are produced,
     * or named indexes typically used when consumers want to subscribe to a
     * specific, unique message. Individual messages or message collections
     * passed in numerically indexed arrays are treated as time-based messages
     * and message collections passed in associative arrays are treated as named
     * messages. e.g., to send a single message as named, wrap it in an array
     * with the intended message name as the key.
     *
     * This implementation also supports a message_type option to indicate the
     * format of the message being sent to the register. Currently only supports
     * executable PHP format.
     *
     * Other implementation specific options include:
     * <ul>
     * <li>delay: Number of seconds to delay the message. This option is only
     * supported for time-based messages.</li>
     * <li>ttl: Number of seconds the message is valid in the queue.
     * Default is forever or 0.</li>
     * <li>kill: Tells a message consumer to stop consuming any more
     * messages after reading any message sent with this option.</li>
     * </ul>
     *
     * A topic that does not start with '/' is resolved relative to the
     * current topic. Nothing is sent unless the resolved topic is one of the
     * register's subscriptions and the queue has an id. Entries that are not
     * scalar, array or object values are skipped.
     *
     * @param string $topic A topic container in which to broadcast the message.
     * @param mixed $message A message, or collection of messages to be sent to
     * the register.
     * @param array $options An optional array of general or protocol
     * specific message properties.
     * @return boolean True when at least one message was written and every
     * attempted write succeeded; false otherwise (an error is logged).
     *
     * @todo Implement support for sending various message types, other than
     * executable PHP format.
     */
    public function send($topic, $message, array $options = array()) {
        $sent = false;
        if (empty($topic) || $topic[0] != '/') $topic = $this->_currentTopic . $topic;
        $topicIdx = array_search($topic, $this->subscriptions);
        $queueId = $this->_queue->get('id');
        if ($queueId && $topicIdx !== false) {
            $error = false;
            $messageType = isset($options['message_type']) ? $options['message_type'] : 'php';
            if (!$topicObj = $this->modx->getObject('registry.db.modDbRegisterTopic', array('queue' => $queueId, 'name' => $topic))) {
                $topicObj = $this->modx->newObject('registry.db.modDbRegisterTopic');
                $topicObj->set('queue', $queueId);
                $topicObj->set('name', $topic);
                $topicObj->set('created', date('Y-m-d H:i:s'));
                if (!$topicObj->save()) {
                    $error = true;
                }
            }
            if (!$error) {
                if (!is_array($message)) {
                    $message = array($message);
                }
                $topicId = $topicObj->get('id');
                // These depend only on the options, so compute them once for
                // the whole batch rather than once per message.
                $timestamp = isset($options['delay']) ? time() + intval($options['delay']) : time();
                $expires = isset($options['ttl']) && intval($options['ttl']) ? time() + intval($options['ttl']) : 0;
                $kill = isset($options['kill']) ? (boolean) $options['kill'] : false;
                $attempted = 0;
                $failed = 0;
                foreach ($message as $msgIdx => $msg) {
                    if (!(is_scalar($msg) || is_array($msg) || is_object($msg))) {
                        continue;
                    }
                    switch ($messageType) {
                        //TODO: implement more message types
                        case 'php' :
                        default :
                            // Named messages keep their key; positional ones
                            // get a sortable timestamp-plus-index key.
                            if (!is_int($msgIdx)) {
                                $msgKey = $msgIdx;
                            } else {
                                $msgKey = date('Ymd\THis', $timestamp) . '-' . sprintf("%03d", $msgIdx);
                            }
                            $payload = '';
                            if ($expires > 0) $payload.= "if (time() > {$expires}) return null;\n";
                            $payload.= 'return ' . var_export($msg, true) . ";\n";
                            $messageObj = $this->modx->getObject('registry.db.modDbRegisterMessage', array('topic' => $topicId, 'id' => $msgKey));
                            if (!$messageObj) {
                                $messageObj = $this->modx->newObject('registry.db.modDbRegisterMessage');
                                if ($messageObj) {
                                    $messageObj->set('topic', $topicId);
                                    $messageObj->set('id', $msgKey);
                                }
                            }
                            $attempted++;
                            if (!$messageObj) {
                                $failed++;
                                break;
                            }
                            $messageObj->set('created', date('Y-m-d H:i:s'));
                            $messageObj->set('valid', date('Y-m-d H:i:s', $timestamp));
                            $messageObj->set('expires', $expires);
                            $messageObj->set('payload', $payload);
                            $messageObj->set('kill', $kill);
                            if (!$messageObj->save()) {
                                $failed++;
                            }
                    }
                }
                // Report success only if nothing that was attempted failed,
                // so an early failure is not masked by a later success.
                $sent = $attempted > 0 && $failed === 0;
            }
        }
        if (!$sent) $this->modx->log(modX::LOG_LEVEL_ERROR, "Could not send message to queue {$queueId}, topic {$topic}. Message payload is " . print_r($message, 1));
        return $sent;
    }

    /**
     * Close the connection to the register service implementation.
     * @return boolean Always true; there is no connection state to release.
     */
    public function close() {
        return true;
    }
}