gdbots/pbjx-bundle-php

View on GitHub
src/Command/PbjxBatchCommand.php

Summary

Maintainability
D
2 days
Test Coverage
<?php
declare(strict_types=1);

namespace Gdbots\Bundle\PbjxBundle\Command;

use Gdbots\Pbj\Exception\DeserializeMessageFailed;
use Gdbots\Pbj\Serializer\JsonSerializer;
use Gdbots\Pbj\Util\NumberUtil;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\DependencyInjection\ContainerInterface;
use Symfony\Component\HttpFoundation\Request;

/**
 * Console command (pbjx:batch) that reads pbj messages from a newline-delimited
 * JSON file and, unless --dry-run is given, sends commands and publishes events
 * through pbjx.
 */
#[AsCommand(name: 'pbjx:batch')]
final class PbjxBatchCommand extends Command
{
    use PbjxAwareCommandTrait;

    /**
     * @param ContainerInterface $container Service container; presumably used by
     *                                      PbjxAwareCommandTrait to locate services
     *                                      (the trait is not visible here — confirm).
     */
    public function __construct(protected ContainerInterface $container)
    {
        parent::__construct();
    }

    /**
     * Declares the description, help text, options and the required "file" argument.
     */
    protected function configure(): void
    {
        $this
            ->setDescription('Reads messages from a newline-delimited JSON file and processes them')
            ->setHelp(<<<EOF
The <info>%command.name%</info> command will read messages (pbj commands or events) from a
newline-delimited JSON file and run pbjx->send or pbjx->publish.

<info>php %command.full_name% --dry-run /path/to/file/message.jsonl</info>

EOF
            )
            ->addOption(
                'user-agent',
                null,
                InputOption::VALUE_REQUIRED,
                'The http user agent to run as for this command.'
            )
            ->addOption(
                'in-memory',
                null,
                InputOption::VALUE_NONE,
                'Forces all transports to be "in_memory". Useful for debugging or ensuring sequential processing.'
            )
            ->addOption(
                'device-view',
                null,
                InputOption::VALUE_REQUIRED,
                'When gdbots/app-bundle is in use you can provide device-view to ' .
                'populate request and server attributes.'
            )
            ->addOption(
                'dry-run',
                null,
                InputOption::VALUE_NONE,
                'Reads lines and creates messages but will NOT process them.'
            )
            ->addOption(
                'skip-invalid',
                null,
                InputOption::VALUE_NONE,
                'Skip any lines that fail to deserialize.'
            )
            ->addOption(
                'skip-errors',
                null,
                InputOption::VALUE_NONE,
                'Skip any messages that fail to send/publish.'
            )
            ->addOption(
                'batch-size',
                null,
                InputOption::VALUE_REQUIRED,
                'Number of lines to read at a time.',
                100
            )
            ->addOption(
                'batch-delay',
                null,
                InputOption::VALUE_REQUIRED,
                'Number of milliseconds (1000 = 1 second) to delay between batches.',
                1000
            )
            ->addOption(
                'start-line',
                null,
                InputOption::VALUE_REQUIRED,
                'Start processing AT this line number.',
                1
            )
            ->addOption(
                'end-line',
                null,
                InputOption::VALUE_REQUIRED,
                'Stop processing AFTER this line number.',
                PHP_INT_MAX
            )
            ->addArgument(
                'file',
                InputArgument::REQUIRED,
                'The full path to a json line delimited file with pbj messages.'
            );
    }

    /**
     * Reads the file line by line and dispatches each message through pbjx.
     *
     * Every non-blank line between --start-line and --end-line (inclusive) is
     * deserialized, wrapped in a synthetic Request that is placed on the request
     * stack, and then sent (command mixin) or published (event mixin). Messages
     * that are neither are reported and ignored.
     *
     * @return int self::SUCCESS when the run reached the end of the file or the end
     *             line; self::FAILURE when pbjx is not ready, the file cannot be
     *             read, or processing was aborted by an error that was not covered
     *             by --skip-invalid / --skip-errors.
     */
    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $dryRun = $input->getOption('dry-run');
        $skipInvalid = $input->getOption('skip-invalid');
        $skipErrors = $input->getOption('skip-errors');
        $batchSize = NumberUtil::bound((int)$input->getOption('batch-size'), 1, 5000);
        $batchDelay = NumberUtil::bound((int)$input->getOption('batch-delay'), 100, 600000);
        $startLine = (int)$input->getOption('start-line');
        $endLine = (int)$input->getOption('end-line');
        $file = $input->getArgument('file');

        $io = new SymfonyStyle($input, $output);
        $io->title(sprintf('Reading messages from "%s"', $file));
        $this->useInMemoryTransports($input, $io);
        if (!$this->readyForPbjxTraffic($io, 'Aborting json lines processing.')) {
            return self::FAILURE;
        }

        if (!file_exists($file) || !is_readable($file)) {
            $io->error(sprintf('File "%s" must exist and be readable.', $file));
            return self::FAILURE;
        }

        /*
         * Pbjx processes are somewhat origination agnostic so we'll just make their
         * environments seem similar
         */
        $_SERVER['REQUEST_METHOD'] = 'POST';
        $_SERVER['CONTENT_TYPE'] = 'application/json';
        $_SERVER['HTTP_ACCEPT'] = 'application/json';
        $_SERVER['HTTP_ACCEPT_CHARSET'] = 'utf-8';
        $_SERVER['HTTP_USER_AGENT'] = $input->getOption('user-agent') ?: 'pbjx-console/2.x';

        $deviceView = $input->getOption('device-view');
        if (!empty($deviceView)) {
            $_SERVER['DEVICE_VIEW'] = $deviceView;
        }

        // warning suppressed on purpose; the failure is reported through $io below.
        $handle = @fopen($file, 'r');
        if (false === $handle) {
            $io->error(sprintf('Unable to open file "%s".', $file));
            return self::FAILURE;
        }

        $requestStack = $this->getRequestStack();
        $pbjx = $this->getPbjx();
        $serializer = new JsonSerializer();
        $batch = 1;
        $i = 0;          // 1-based number of the line currently being handled
        $processed = 0;  // messages that completed (or were dry-run) without error
        $aborted = false;
        $io->comment(sprintf('Processing batch %d from file "%s".', $batch, $file));
        $io->newLine();

        while (($line = fgets($handle)) !== false) {
            ++$i;

            // lines before the start line are skipped without any batch pause
            // so seeking to a late start line stays fast.
            if ($i < $startLine) {
                continue;
            }

            if ($i > $endLine) {
                $io->note(sprintf('%d. End line reached, stopping process.', $endLine));
                break;
            }

            $line = trim($line);

            // blank lines are not processed but must still reach the batch
            // boundary check below, otherwise a blank line on the boundary
            // would silently merge two batches.
            if (!empty($line)) {
                try {
                    $message = $serializer->deserialize($line);
                    $schema = $message::schema();
                    $curie = $schema->getCurie();
                    $ref = $message->generateMessageRef()->toString();

                    $request = Request::create(
                        sprintf(
                            '/pbjx/%s/%s/%s/%s',
                            $curie->getVendor(),
                            $curie->getPackage(),
                            $curie->getCategory() ?: '_',
                            $curie->getMessage()
                        ),
                        $_SERVER['REQUEST_METHOD'],
                        [], // GET and POST (aka $_REQUEST)
                        $_COOKIE,
                        $_FILES,
                        $_SERVER,
                        $line
                    );

                    /*
                     * prepare the request object so http and console processing are virtually the same
                     */
                    $request->setRequestFormat('json');
                    $request->attributes->set('pbjx_vendor', $curie->getVendor());
                    $request->attributes->set('pbjx_package', $curie->getPackage());
                    $request->attributes->set('pbjx_category', $curie->getCategory());
                    $request->attributes->set('pbjx_message', $curie->getMessage());
                    $request->attributes->set('pbjx_bind_unrestricted', true);
                    $request->attributes->set('pbjx_console', true);
                    if (!empty($deviceView)) {
                        $request->attributes->set('device_view', $deviceView);
                    }

                    // replace the previous line's request so only one is on the stack.
                    $requestStack->pop();
                    $requestStack->push($request);

                    // $processed is only incremented once the step has finished so a
                    // message whose send/publish throws is never counted.
                    if ($dryRun) {
                        $io->text(sprintf(
                            '<info>%d.</info> DRY RUN <comment>ref:</comment>%s, <comment>occurred_at:</comment>%s',
                            $i,
                            $ref,
                            $message->get('occurred_at')
                        ));
                        ++$processed;
                    } elseif ($schema->hasMixin('gdbots:pbjx:mixin:command')) {
                        $io->text(sprintf(
                            '<info>%d.</info> Sending <comment>ref:</comment>%s, <comment>occurred_at:</comment>%s',
                            $i,
                            $ref,
                            $message->get('occurred_at')
                        ));
                        $pbjx->send($message);
                        ++$processed;
                    } elseif ($schema->hasMixin('gdbots:pbjx:mixin:event')) {
                        $io->text(sprintf(
                            '<info>%d.</info> Publishing <comment>ref:</comment>%s, <comment>occurred_at:</comment>%s',
                            $i,
                            $ref,
                            $message->get('occurred_at')
                        ));
                        $pbjx->publish($message);
                        ++$processed;
                    } else {
                        $io->warning(sprintf('%d. Ignoring "%s" since it\'s not a command or event.', $i, $ref));
                    }
                } catch (DeserializeMessageFailed $de) {
                    $io->error(sprintf('%d. %s', $i, $de->getMessage()));
                    $io->note(sprintf('%d. Failed to deserialize json line below:', $i));
                    $io->text($line);
                    $io->newLine(2);
                    if (!$skipInvalid) {
                        $aborted = true;
                        break;
                    }
                } catch (\Throwable $e) {
                    $io->error(sprintf('%d. %s', $i, $e->getMessage()));
                    $io->newLine(2);
                    if (!$skipErrors) {
                        $aborted = true;
                        break;
                    }
                }
            }

            // batches are measured in file lines, not in processed messages.
            if (0 === $i % $batchSize) {
                ++$batch;

                if ($batchDelay > 0) {
                    $io->newLine();
                    $io->note(sprintf('Pausing for %d milliseconds.', $batchDelay));
                    usleep($batchDelay * 1000);
                }

                $io->comment(sprintf('Processing batch %d from file "%s".', $batch, $file));
                $io->newLine();
            }
        }

        fclose($handle);

        $io->newLine();

        if ($aborted) {
            // the triggering error was already printed; report the partial result and
            // exit non-zero so callers can tell the file was not fully processed.
            $io->warning(sprintf(
                'Aborted at line %d. Processed %s messages from "%s" before stopping.',
                $i,
                number_format($processed),
                $file
            ));

            return self::FAILURE;
        }

        $io->success(sprintf('Processed %s messages from "%s".', number_format($processed), $file));

        return self::SUCCESS;
    }
}