Headoo/ElasticSearchBundle

View on GitHub
Command/PopulateElasticCommand.php

Summary

Maintainability
C
1 day
Test Coverage
<?php

namespace Headoo\ElasticSearchBundle\Command;

use Doctrine\ORM\Query;
use Symfony\Component\Console\Helper\ProgressBar;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Process\Process;

class PopulateElasticCommand extends AbstractCommand
{

    protected function configure()
    {
        $this->setName('headoo:elastic:populate')
            ->setDescription('Repopulate Elastic Search')
            ->addOption('limit',   null, InputOption::VALUE_OPTIONAL, 'Limit For selected Type', 0)
            ->addOption('offset',  null, InputOption::VALUE_OPTIONAL, 'Offset For selected Type', 0)
            ->addOption('type',    null, InputOption::VALUE_OPTIONAL, 'Type of document you want to populate. You must to have configure it before use', null)
            ->addOption('threads', null, InputOption::VALUE_OPTIONAL, 'number of simultaneous threads', null)
            ->addOption('reset',   null, InputOption::VALUE_NONE,     'Reset the index')
            ->addOption('batch',   null, InputOption::VALUE_OPTIONAL, 'Number of Document per batch', null)
            ->addOption('id',      null, InputOption::VALUE_REQUIRED, 'Refresh a specific object with his Id', null)
            ->addOption('where',   null, InputOption::VALUE_REQUIRED, 'Refresh objects with specific field ', null)
            ->addOption('join',    null, InputOption::VALUE_REQUIRED, 'Join on another entity', null);
    }

    /**
     * @param InputInterface $input
     * @param OutputInterface $output
     * @return int
     * @throws \Doctrine\ORM\OptimisticLockException
     */
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $this->init($input, $output);

        if ($input->getOption('where') && ! $input->getOption('id')) {
            $output->writeln("<error>You must provide an 'id' with the 'where' option</error>");
            return self::EXIT_FAILED;
        }

        if ($input->getOption('id')) {
            if ($input->getOption('reset')) { // $input->getOption('batch') ||  || $input->getOption('threads')
                $output->writeln("<error>The option 'id' cannot be used with option 'reset'</error>");
                return self::EXIT_FAILED;
            }

            if (!$input->getOption('type')) {
                $output->writeln("<error>The option 'id' have to be used with option 'type'</error>");
                return self::EXIT_FAILED;
            }
        }

        // We add a limit per batch which equal of the batch option
        if($input->getOption('batch')){
            $this->limit = $this->batch ;
        }

        if($input->getOption('type')){
            return $this->_switchType($this->type, $this->batch);
        }

        $returnValue = self::EXIT_SUCCESS;
        foreach ($this->aTypes as $type){
            $returnedValue = $this->_switchType($type, $this->batch);
            if ($returnedValue != self::EXIT_SUCCESS) {
                $returnValue = self::EXIT_FAILED;
            }
        }

        return $returnValue;
    }

    /**
     * @param string $type
     * @param int $batch
     * @return int
     */
    private function _switchType($type, $batch)
    {
        if (in_array($type, $this->aTypes)) {
            $this->output->writeln(self::completeLine("BEGIN {$type}"));
            if ($this->reset) {
                $this->_resetType($type);
            }

            $returnValue = ($batch) ?
                $this->beginBatch($type) :
                $this->processBatch($type, $this->getContainer()->get($this->mappings[$type]['transformer']));

            $this->output->writeln(self::completeLine("FINISH {$type}"));

            return $returnValue;
        }

        $this->output->writeln(self::completeLine("Wrong Type"));

        return self::EXIT_FAILED;
    }

    /**
     * @param $type
     * @param $properties
     */
    private function _mappingFields($type, $properties)
    {
        // Define mapping
        $mapping = new \Elastica\Type\Mapping();
        $mapping->setType($type);

        // Set mapping
        $mapping->setProperties($properties);
        $mapping->send();
    }

    /**
     * @param \Elastica\Type $type
     * @param $aDocuments
     */
    private function _bulk($type, $aDocuments)
    {
        if(count($aDocuments)){
            $type->addDocuments($aDocuments);
            $type->getIndex()->refresh();
            unset($aDocuments);
        }
    }

    /**
     * @param $progressBar
     * @param array $processes
     * @param $maxParallel
     * @param int $poll
     * @param int $numberOfEntities
     * @return int
     */
    public function runParallel(ProgressBar $progressBar, array $processes, $maxParallel, $poll = 1000, $numberOfEntities)
    {
        // do not modify the object pointers in the argument, copy to local working variable
        $processesQueue = $processes;
        // fix maxParallel to be max the number of processes or positive
        $maxParallel = min(abs($maxParallel), count($processesQueue));
        // get the first stack of processes to start at the same time
        /** @var Process[] $currentProcesses */
        $currentProcesses = array_splice($processesQueue, 0, $maxParallel);
        // start the initial stack of processes
        foreach ($currentProcesses as $process) {
            $process->start();
        }

        $progression = $this->offset;
        $progressMax = $numberOfEntities + $this->offset;
        $returnValue = self::EXIT_SUCCESS;

        do {
            // wait for the given time
            usleep($poll);
            // remove all finished processes from the stack
            foreach ($currentProcesses as $index => $process) {
                if (!$process->isRunning()) {
                    if ($process->getExitCode() != self::EXIT_SUCCESS) {
                        $this->output->writeln($process->getErrorOutput());
                        $returnValue = self::EXIT_FAILED;
                    }

                    unset($currentProcesses[$index]);

                    $processDone = intval($process->getOutput());
                    $progression += $processDone;
                    $progressBar->setMessage("$progression/$progressMax");
                    $progressBar->advance($processDone);

                    // directly add and start new process after the previous finished
                    if (count($processesQueue) > 0) {
                        $nextProcess = array_shift($processesQueue);
                        $nextProcess->start();

                        $currentProcesses[] = $nextProcess;
                    }
                }
            }

            // continue loop while there are processes being executed or waiting for execution
        } while (count($processesQueue) > 0 || count($currentProcesses) > 0);

        $progressBar->finish();
        $progressBar->display();
        $this->output->writeln('');

        return $returnValue;
    }

    /**
     * @param $type
     * @return int
     */
    public function beginBatch($type)
    {
        $numberObjects = $this->entityManager->createQuery("SELECT COUNT(u) FROM {$this->mappings[$type]['class']} u")->getResult()[0][1];
        $aProcess = [];
        $numberOfEntities = $numberObjects - $this->offset;
        $numberOfProcess = floor($numberOfEntities / $this->limit);
        $sOptions = $this->getOptionsToString(['type', 'limit', 'offset', 'threads', 'batch', 'reset']);

        $progressBar = $this->getProgressBar($this->output, $numberOfEntities);

        for ($i = 0; $i <= $numberOfProcess; $i++) {
            $_offset = $this->offset + ($this->limit * $i);
            $process = new Process("php $this->consoleDir headoo:elastic:populate --type={$type} --limit={$this->limit} --offset={$_offset} --quiet " . $sOptions);
            $aProcess[] = $process;
        }

        return $this->runParallel($progressBar, $aProcess, $this->threads, 1000, $numberOfEntities);
    }

    /**
     * @param $type
     * @param $transformer
     */
    public function processBatch($type, $transformer)
    {
        $this->output->writeln(self::completeLine("Creating Type {$type} and Mapping"));

        $objectType = $this->getIndexFromType($type)->getType($type);
        $this->_mappingFields($objectType, $this->mappings[$type]['mapping']);

        $this->output->writeln(self::completeLine("Finish Type {$type} and Mapping"));
        $this->output->writeln(self::completeLine("Start populate {$type}"));

        // Select a specific object from his ID
        $query = $this->_getQuery($type, $iResults);

        if($this->offset){
            $query->setFirstResult($this->offset);
            $iResults = $iResults - $this->offset;
        }

        if($this->limit){
            $query->setMaxResults($this->limit);
            $iResults = $this->limit;
        }

        $iterableResult = $query->iterate();

        $progressBar = $this->getProgressBar($this->output, $iResults);
        $progression = 0;
        $progressMax = $iResults + $this->offset;

        foreach ($iterableResult as $row) {
            try {
                $document = $transformer->transform($row[0]);
            } catch (\Doctrine\ORM\EntityNotFoundException $e) {
                # An object has not been found
                $this->output->writeln(get_class($row[0]) . "({$row[0]->getId()}): {$e->getMessage()}");
                $document = null;
            }

            if (!$document) {
                continue;
            }

            $objectType->addDocument($document);
            $this->entityManager->clear();

            $progressBar->setMessage((++$progression + $this->offset) . "/{$progressMax}");
            $progressBar->advance();

            gc_collect_cycles();
        }

        $objectType->getIndex()->refresh();

        try {
            $progressBar->setProgress($iResults);
        } catch (\Symfony\Component\Console\Exception\LogicException $e) {
            # You can't regress the progress bar.
        }

        $progressBar->display();
        $progressBar->finish();

        $this->output->writeln('');
        $this->output->writeln("<info>" . self::completeLine("Finish populate {$type}") . "</info>");
        # In quite mode: just write in output the number of documents treated
        if ($this->quiet) {
            $this->output->writeln("$progression", OutputInterface::VERBOSITY_QUIET);
        }
    }

    /**
     * @param string $type
     * @return bool
     */
    private function _resetType($type)
    {
        $this->output->writeln(self::completeLine("RESET INDEX"));

        $index_name = $this->getContainer()->get('headoo.elasticsearch.handler')->getIndexName($type);
        $connection = $this->mappings[$type]['connection'];
        $index      = $this->mappings[$type]['index'];

        $response = $this->elasticSearchHelper->getClient($connection)->getIndex($index_name)->create($index, true);

        if ($response->hasError()) {
            $this->output->writeln("Cannot reset index '{$type}': " . $response->getErrorMessage());
            return false;
        }

        return true;
    }

    /**
     * @param string $type
     * @param int $iResults
     * @return Query
     */
    private function _getQuery($type, &$iResults)
    {
        $id = filter_var($this->id, FILTER_SANITIZE_STRING);
        $where = filter_var($this->where, FILTER_SANITIZE_STRING);
        $joins = filter_var($this->join, FILTER_SANITIZE_STRING);
        $entity = 'u';

        # Forge clause JOIN
        $clauseJoin = '';
        $aJoins = explode(',', $joins);
        foreach ($aJoins as $join) {
            if (empty($join)) {
                break;
            }
            $newId = ($newId ?? 0) + 1;
            $newEntity = "u_$newId";
            $clauseJoin .= " LEFT JOIN {$entity}.{$join} {$newEntity} ";
            $entity = $newEntity;
        }

        # Forge clause WHERE
        $clauseWhere = '';
        if ($id && $where) {
            $clauseWhere = " WHERE {$entity}.{$where} = '{$id}'";
        }
        if ($id && !$where) {
            $clauseWhere = " WHERE {$entity}.id = '{$id}'";
        }

        # COUNT results
        try {
            $iResults = $this->entityManager->createQuery("SELECT COUNT(u) FROM {$this->mappings[$type]['class']} u $clauseJoin $clauseWhere")->getResult()[0][1];
        } catch (\Doctrine\ORM\Query\QueryException $e) {
            $iResults = 0;
        }

        # Return Query
        return $this->entityManager->createQuery("SELECT u FROM {$this->mappings[$type]['class']} u $clauseJoin $clauseWhere");
    }
}