forumhouseteam/laravel-amqp

View on GitHub
src/Queue/AMQPQueue.php

Summary

Maintainability
A
3 hrs
Test Coverage
<?php

namespace Forumhouse\LaravelAmqp\Queue;

use Forumhouse\LaravelAmqp\Exception\AMQPException;
use Forumhouse\LaravelAmqp\Jobs\AMQPJob;
use Forumhouse\LaravelAmqp\Utility\ArrayUtil;
use Illuminate\Contracts\Queue\Queue as QueueContract;
use Illuminate\Queue\InvalidPayloadException;
use Illuminate\Queue\Queue;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

/**
 * Class representing AMQP Queue
 *
 * @package Forumhouse\LaravelAmqp\Queue
 */
class AMQPQueue extends Queue implements QueueContract
{
    const EXCHANGE_TYPE_DIRECT = 'direct';

    const EXCHANGE_TYPE_TOPIC = 'topic';

    const EXCHANGE_TYPE_HEADERS = 'headers';

    const EXCHANGE_TYPE_FANOUT = 'fanout';

    /**
     * @var AMQPStreamConnection Connection to amqp compatible server
     */
    protected $connection;

    /**
     * @var AMQPChannel Channel, that is used for communication
     */
    protected $channel;

    /**
     * @var string Default queue name to be used when passed queue = null
     */
    protected $defaultQueueName;

    /**
     * @var string Exchange name, if used
     */
    protected $exchangeName;

    /**
     * @var string Default channel id if needed
     */
    private $defaultChannelId;

    /**
     * @var array
     */
    private $queueFlags;

    /**
     * @var array
     */
    private $messageProperties;

    /**
     * @var bool
     */
    private $declareQueues;

    /**
     * @var int
     */
    private $retryAfter;

    /**
     * @param AMQPStreamConnection $connection
     * @param string               $defaultQueueName  Default queue name
     * @param array                $queueFlags        Queue flags See a list of parameters to
     *                                                \PhpAmqpLib\Channel\AMQPChannel::queue_declare. Parameters should
     *                                                be passed like for call_user_func_array in this parameter
     * @param bool                 $declareQueues     If we should declare queues before actually trying to send a
     *                                                message
     * @param array                $messageProperties This is passed as a second parameter to
     *                                                \PhpAmqpLib\Message\AMQPMessage constructor
     * @param string               $defaultChannelId  Default channel id
     * @param string               $exchangeName      Exchange name
     * @param mixed                $exchangeType      Exchange type
     * @param mixed                $exchangeFlags     Exchange flags
     * @param mixed                $retryAfter       Optional timeout for failed jobs
     */
    public function __construct(
        AMQPStreamConnection $connection,
        $defaultQueueName = null,
        $queueFlags = [],
        $declareQueues = true,
        $messageProperties = [],
        $defaultChannelId = null,
        $exchangeName = '',
        $exchangeType = null,
        $exchangeFlags = [],
        $retryAfter = 0
    ) {
        $this->connection = $connection;
        $this->defaultQueueName = $defaultQueueName ?: 'default';
        $this->queueFlags = $queueFlags;
        $this->declareQueues = $declareQueues;
        $this->messageProperties = $messageProperties;
        $this->defaultChannelId = $defaultChannelId;
        $this->exchangeName = $exchangeName;
        $this->channel = $connection->channel($this->defaultChannelId);
        $this->retryAfter = $retryAfter;

        if ($exchangeName !== null) {
            $this->declareExchange($exchangeName, $exchangeType, $exchangeFlags);
        }
    }

    /**
     * @param string $exchangeName  The name of the exchange. For example, 'logs'
     * @param string $exchangeType  The type of the exchange. See EXCHANGE_TYPE_* constants for details
     * @param array  $exchangeFlags The flags of the exchange. See \PhpAmqpLib\Channel\AMQPChannel::exchange_declare
     *                              (from third parameter onwards). Must be an assoc array. Default flags can be omitted
     *
     * @see \PhpAmqpLib\Channel\AMQPChannel::exchange_declare
     * @return void
     */
    protected function declareExchange($exchangeName, $exchangeType, array $exchangeFlags = [])
    {
        $flags = array_replace([
            'exchange'    => $exchangeName,
            'type'        => $exchangeType,
            'passive'     => false,
            'durable'     => false,
            'auto_delete' => true,
            'internal'    => false,
            'nowait'      => false,
            'arguments'   => null,
            'ticket'      => null,
        ], $exchangeFlags);

        call_user_func_array([$this->channel, 'exchange_declare'], $flags);
    }

    /**
     * @return array
     */
    public function getCustomMessageOptions(){
        return ['retryAfter' => $this->retryAfter];
    }

    /**
     * Push a new job onto the queue.
     *
     * @param  string $job   Job implementation class name
     * @param  mixed  $data  Job custom data. Usually array
     * @param  string $queue Queue name, if different from the default one
     *
     * @throws \Illuminate\Queue\InvalidPayloadException
     * @throws AMQPException
     * @return bool Always true
     */
    public function push($job, $data = '', $queue = null)
    {
        $queue = $this->prepareQueue($queue);
        $payload = new AMQPMessage($this->createPayload($job, $queue, $data), $this->messageProperties);
        $this->channel->basic_publish($payload, $this->exchangeName, $this->getRoutingKey($queue));

        return true;
    }

    /**
     * Adds the message to internal buffer to be sent later in a batch
     *
     * @param string $job   Job implementation class name
     * @param mixed  $data  Job custom data. Usually array
     * @param string $queue Queue name, if different from the default one
     *
     * @return bool
     * @throws \Illuminate\Queue\InvalidPayloadException
     * @throws AMQPException
     */
    public function addMessageToBatch($job, $data = '', $queue = null)
    {
        $queue = $this->prepareQueue($queue);
        $payload = new AMQPMessage($this->createPayload($job, $queue, $data), $this->messageProperties);
        $this->channel->batch_basic_publish($payload, $this->exchangeName, $this->getRoutingKey($queue));

        return true;
    }

    /**
     * Publishes and internal buffer of messages, that were added before with {@see addMessageToBatch}
     *
     * @return void
     */
    public function pushBatch()
    {
        $this->channel->publish_batch();
    }

    /**
     * Helper to return a default queue name in case passed param is empty
     *
     * @param string|null $name Queue name. If null, default will be used
     *
     * @throws AMQPException
     * @return string Queue name to be used in AMQP calls
     */
    protected function getQueueName($name)
    {
        $name = $name ?: $this->defaultQueueName;
        if ($name === null) {
            throw new AMQPException('Default nor specific queue names given');
        }
        return $name;
    }

    /**
     * Declares a queue to the AMQP library
     *
     * @param string $name The name of the queue to declare
     *
     * @return QueueInfo
     * @throws AMQPException
     */
    public function declareQueue($name)
    {
        $queue = $this->getQueueName($name);
        $flags = array_replace_recursive([
            'queue'       => $queue,
            'passive'     => false,
            'durable'     => false,
            'exclusive'   => false,
            'auto_delete' => true,
            'nowait'      => false,
            'arguments'   => null,
            'ticket'      => null,
        ], $this->getQueueFlags($name));

        return QueueInfo::createFromDeclareOk(call_user_func_array([$this->channel, 'queue_declare'], $flags));
    }

    /**
     * @param string      $queueName
     * @param null|string $deferredQueueName
     * @param null|int    $deferredQueueDelay
     *
     * @return array
     */
    protected function getQueueFlags($queueName, $deferredQueueName = null, $deferredQueueDelay = null)
    {
        $args = func_get_args();
        $result = ArrayUtil::arrayMapRecursive(function ($value) use ($args) {
            return is_callable($value) ? call_user_func_array($value, $args) : $value;
        }, $this->queueFlags);

        $result = ArrayUtil::removeNullsRecursive($result);

        return $result;
    }

    /**
     *  Get routing key from config or use default one (queue name)
     *
     * @param $queue string
     *
     * @return string Routing key name
     */
    protected function getRoutingKey($queue)
    {
        return empty($this->queueFlags['routing_key']) ? $queue : $this->queueFlags['routing_key'];
    }

    /**
     * Push a raw payload onto the queue.
     *
     * @param  string $payload Job payload
     * @param  string $queue   Queue name, if different from the default one
     * @param  array  $options Currently unused
     *
     * @return bool Always true
     * @throws AMQPException
     */
    public function pushRaw($payload, $queue = null, array $options = [])
    {
        $queue = $this->prepareQueue($queue);
        $amqpPayload = new AMQPMessage($payload, $this->messageProperties);
        $this->channel->basic_publish($amqpPayload, $this->exchangeName, $queue);
        return true;
    }

    /**
     * Push a new job onto the queue after a delay.
     *
     * @param  \DateTime|int $delay Delay
     * @param  string        $job   Job implementation class name
     * @param  mixed         $data  Job custom data. Usually array
     * @param  string        $queue Queue name, if different from the default one
     *
     * @return bool Always true
     * @throws \Illuminate\Queue\InvalidPayloadException
     * @throws AMQPException
     */
    public function later($delay, $job, $data = '', $queue = null)
    {
        if ($delay instanceof \DateTime) {
            $delay = $delay->getTimestamp() - time();
        }

        $queue = $this->prepareQueue($queue);
        $delayedQueueName = $this->declareDelayedQueue($queue, $delay);

        $payload = new AMQPMessage($this->createPayload($job, $queue, $data), $this->messageProperties);
        $this->channel->basic_publish($payload, $this->exchangeName, $delayedQueueName);
        return true;
    }

    /**
     * Declares delayed queue to the AMQP library
     *
     * @param string $destinationQueueName Queue destination
     * @param int    $delay                Queue delay in seconds
     *
     * @return string Deferred queue name for the specified delay
     * @throws AMQPException
     */
    public function declareDelayedQueue($destinationQueueName, $delay)
    {
        $destinationQueueName = $this->getQueueName($destinationQueueName);
        $deferredQueueName = $destinationQueueName . '_deferred_' . $delay;

        $flags = array_replace_recursive([
            'queue'       => '',
            'passive'     => false,
            'durable'     => false,
            'exclusive'   => false,
            'auto_delete' => true,
            'nowait'      => false,
            'arguments'   => null,
            'ticket'      => null,
        ], $this->getQueueFlags($destinationQueueName, $deferredQueueName, $delay), [
            'queue'     => $deferredQueueName,
            'durable'   => true,
            'arguments' => new AMQPTable([
                'x-dead-letter-exchange'    => '',
                'x-dead-letter-routing-key' => $destinationQueueName,
                'x-message-ttl'             => intval($delay * 1000),
            ]),
        ]);

        call_user_func_array([$this->channel, 'queue_declare'], $flags);
        return $deferredQueueName;
    }

    /**
     * Pop the next job off of the queue.
     *
     * @param string|null $queue Queue name if different from the default one
     *
     * @return \Illuminate\Queue\Jobs\Job|null Job instance or null if no unhandled jobs available
     * @throws AMQPException
     */
    public function pop($queue = null)
    {
        $queue = $this->prepareQueue($queue);
        $envelope = $this->channel->basic_get($queue);

        if ($envelope instanceof AMQPMessage) {
            return new AMQPJob($this->container, $queue, $this->channel, $envelope);
        }

        return null;
    }

    /**
     * Get the size of the queue.
     *
     * @param  string $queue
     *
     * @return int
     * @throws AMQPException
     */
    public function size($queue = null)
    {
        $data = $this->declareQueue($this->getQueueName($queue));
        return $data->getJobs();
    }

    /**
     * @return array
     */
    public function getMessageProperties()
    {
        return $this->messageProperties;
    }

    /**
     * @param array $messageProperties
     *
     * @return AMQPQueue
     */
    public function setMessageProperties(array $messageProperties)
    {
        $this->messageProperties = $messageProperties;

        return $this;
    }

    /**
     * Prepares a queue for later use. Declares it if needed
     *
     * @param string $queue
     *
     * @return string
     * @throws AMQPException
     */
    private function prepareQueue($queue)
    {
        $queue = $this->getQueueName($queue);
        if ($this->declareQueues) {
            $this->declareQueue($queue);
        }

        return $queue;
    }

    /**
     * Create a payload string from the given job and data.
     *
     * @param  string  $job
     * @param  string  $queue
     * @param  mixed   $data
     * @return string
     *
     * @throws \Illuminate\Queue\InvalidPayloadException
     */
    protected function createPayload($job, $queue, $data = '')
    {
        $data = is_array($data) ? array_merge($data, $this->getCustomMessageOptions()) : $this->getCustomMessageOptions();
        $payload = json_encode($this->createPayloadArray($job, $queue, $data));
        if (JSON_ERROR_NONE !== json_last_error()) {
            throw new InvalidPayloadException(
                'Unable to JSON encode payload. Error code: '.json_last_error()
            );
        }

        return $payload;
    }

    /**
     * Create a payload for an object-based queue handler.
     *
     * @param  mixed  $job
     * @param  string  $queue
     * @return array
     */
    protected function createObjectPayload($job, $queue)
    {
        $payload = $this->withCreatePayloadHooks($queue, [
            'displayName' => $this->getDisplayName($job),
            'job' => 'Illuminate\Queue\CallQueuedHandler@call',
            'maxTries' => $job->tries ?? null,
            'timeout' => $job->timeout ?? null,
            'timeoutAt' => $this->getJobExpiration($job),
            'data' => [
                'commandName' => $job,
                'command' => $job,
            ],
        ]);

        return array_merge($payload, [
            'data' => array_merge([
                'commandName' => get_class($job),
                'command' => serialize(clone $job),
            ],$this->getCustomMessageOptions()),
        ]);
    }
}