symfony-doge/veslo

View on GitHub
src/AppBundle/Workflow/Vacancy/Research/Conveyor.php

Summary

Maintainability
A
1 hr
Test Coverage
<?php

/*
 * This file is part of the Veslo project <https://github.com/symfony-doge/veslo>.
 *
 * (C) 2019 Pavel Petrov <itnelo@gmail.com>.
 *
 * For the full copyright and license information, please view the LICENSE
 * file that was distributed with this source code.
 *
 * @license https://opensource.org/licenses/GPL-3.0 GPL-3.0
 */

declare(strict_types=1);

namespace Veslo\AppBundle\Workflow\Vacancy\Research;

use Bunny\Channel;
use Bunny\Client;
use Bunny\Message;
use Exception;
use Psr\Log\LoggerInterface;
use Symfony\Component\Serializer\Serializer;
use Symfony\Component\Workflow\Workflow;
use Veslo\AppBundle\Exception\Workflow\Conveyor\ConnectionFailedException;
use Veslo\AppBundle\Exception\Workflow\Conveyor\DistributionFailedException;
use Veslo\AppBundle\Workflow\Vacancy\Research\Conveyor\Payload;

/**
 * Manages data exchange between workers using workflow
 *
 * @see https://symfony.com/doc/current/components/workflow.html
 */
class Conveyor
{
    /**
     * Logger as it is
     *
     * @var LoggerInterface
     */
    private $logger;

    /**
     * State machine, represent business process
     *
     * @var Workflow
     */
    private $workflow;

    /**
     * Converts data in the appropriate format
     *
     * @var Serializer
     */
    private $serializer;

    /**
     * Communicates with message broker
     *
     * @var Client
     */
    private $amqpClient;

    /**
     * Prefix for workflow-related queues
     *
     * @var string
     */
    private $queuePrefix;

    /**
     * Conveyor constructor.
     *
     * @param LoggerInterface $logger      Logger as it is
     * @param Workflow        $workflow    State machine, represent business process
     * @param Serializer      $serializer  Converts data in the appropriate format
     * @param Client          $amqpClient  Communicates with message broker
     * @param string          $queuePrefix Prefix for workflow-related queues
     */
    public function __construct(
        LoggerInterface $logger,
        Workflow $workflow,
        Serializer $serializer,
        Client $amqpClient,
        string $queuePrefix
    ) {
        $this->logger      = $logger;
        $this->workflow    = $workflow;
        $this->serializer  = $serializer;
        $this->amqpClient  = $amqpClient;
        $this->queuePrefix = $queuePrefix;
    }

    /**
     * Sends payload data to queues for processing according to configured workflow transitions
     *
     * @param object $dto Data to be passed through workflow
     *
     * @return void
     */
    public function send(object $dto): void
    {
        $payload     = new Payload($dto);
        $transitions = $this->workflow->getEnabledTransitions($payload);

        if (0 >= count($transitions)) {
            return;
        }

        $queueNames = [];

        foreach ($transitions as $transition) {
            $transitionName = $transition->getName();
            $queueNames[]   = $this->queuePrefix . $transitionName;
        }

        $this->distribute($payload, $queueNames);
    }

    /**
     * Returns data transfer object filled up with data from queues according to configured workflow transitions
     *
     * @param string $dtoClass Class of data transfer object
     *
     * @return object|null
     */
    public function receive(string $dtoClass): ?object
    {
        // TODO: if (!in_array($dtoName, $this->dtoNames, true))

        $dto = new $dtoClass;

        $transitions = $this->workflow->getEnabledTransitions(new Payload($dto));

        // Space for multiple queue logic here.
        $transition = array_shift($transitions);

        $transitionName = $transition->getName();
        $queueName      = $this->queuePrefix . $transitionName;

        return $this->get($dtoClass, $queueName);
    }

    /**
     * Distributes payload data among the queues for conveyor processing via workflow
     *
     * @param Payload $payload    Data to be passed through workflow
     * @param array   $queueNames Queue names for publishing
     *
     * @return void
     *
     * @throws DistributionFailedException
     */
    private function distribute(Payload $payload, array $queueNames): void
    {
        $this->ensureConnection();

        $channel = $this->amqpClient->channel();
        $channel->txSelect();

        foreach ($queueNames as $queueName) {
            try {
                $channel->queueDeclare($queueName);
                $this->publish($payload, $channel, $queueName);
            } catch (Exception $e) {
                $channel->txRollback();

                // closing an amqp channel only if some error has occurred, otherwise we are fine.
                $channel->close()->then(function () {
                    $this->amqpClient->disconnect();
                });

                $this->logger->critical(
                    'Payload publish failed.',
                    [
                        'message'   => $e->getMessage(),
                        'payload'   => $this->serializer->normalize($payload),
                        'queueName' => $queueName,
                    ]
                );

                throw DistributionFailedException::withQueueName($queueName);
            }
        }

        $channel->txCommit();

        $normalizedPayload = $this->serializer->normalize($payload);
        $this->logger->info('Payload distributed.', ['queueNames' => $queueNames, 'payload' => $normalizedPayload]);
    }

    /**
     * Publishes payload data to queue for conveyor processing via workflow
     *
     * @param Payload $payload   Data to be passed through workflow
     * @param Channel $channel   Channel for communication with message broker
     * @param string  $queueName Queue name for publishing
     *
     * @return void
     */
    private function publish(Payload $payload, Channel $channel, string $queueName): void
    {
        $data    = $payload->getData();
        $message = $this->serializer->serialize($data, 'json');

        $channel->publish($message, ['content_type' => 'application/json'], '', $queueName);
    }

    /**
     * Returns a dto by specified classname filled up with payload data from queue
     *
     * @param string $dtoClass  Class of data transfer object
     * @param string $queueName Queue name for payload data fetching
     *
     * @return object|null Dto or null if target queue is empty
     */
    // TODO: extract into gateway
    private function get(string $dtoClass, string $queueName): ?object
    {
        $this->ensureConnection();

        $channel = $this->amqpClient->channel();
        $channel->queueDeclare($queueName);

        // Immediate Ack here; we CAN afford a message loss as the trade-off
        // between "a single vacancy loss" and CPU & RAM cost
        // while trying to process unnecessary, duplicate data in whole conveyor.
        $message = $channel->get($queueName, true);
        // Также хорошая статья по теме:
        // https://habr.com/ru/company/yandex/blog/442762/ (Идемпотентность при внешних операциях)

        if (!$message instanceof Message) {
            return null;
        }

        $this->logger->debug(
            'Payload received.',
            [
                'dtoClass'       => $dtoClass,
                'queueName'      => $queueName,
                'messageContent' => $message->content,
            ]
        );

        return $this->serializer->deserialize($message->content, $dtoClass, 'json');
    }

    /**
     * Ensures connection with message broker is established
     *
     * @return void
     *
     * @throws ConnectionFailedException
     */
    private function ensureConnection(): void
    {
        if ($this->amqpClient->isConnected()) {
            return;
        }

        try {
            $this->amqpClient->connect();
        } catch (Exception $e) {
            throw ConnectionFailedException::withPrevious($e);
        }
    }
}