Headoo/ElasticSearchBundle

View on GitHub
Command/ExodusElasticCommand.php

Summary

Maintainability
A
1 hr
Test Coverage
<?php

namespace Headoo\ElasticSearchBundle\Command;

use Elastica\Query;
use Elastica\ResultSet;
use Elastica\Scroll;
use Elastica\Search;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;

class ExodusElasticCommand extends AbstractCommand
{
    protected $nbrDocumentsFound = 0;
    protected $nbrDocumentTested = 0;
    protected $nbrEntitiesRemoved = 0;
    protected $nbrDone = 0;

    /** @var int */
    protected $batch = 100;

    protected function configure()
    {
        $this->setName('headoo:elastic:exodus')
            ->setDescription('Remove not linked entities from Elastic Search')
            ->addOption('type',    't', InputOption::VALUE_OPTIONAL, 'Type of document you want to exodus. You must to have configure it before use', null)
            ->addOption('batch',   'b', InputOption::VALUE_OPTIONAL, 'Number of Documents per batch', null)
            ->addOption('dry-run', 'd', InputOption::VALUE_OPTIONAL, 'Dry run: do not make any change on ES', false);
    }

    /**
     * @param InputInterface $input
     * @param OutputInterface $output
     * @return int
     */
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $this->init($input, $output);

        $this->output->writeln("<info>" . self::completeLine($this->getName() . " " . date('H:i:s')) . "</info>");

        if ($this->verbose) {
            $sMsg = ($this->type) ? "Type: {$this->type}\n" : "";
            $sMsg .= "Batch: {$this->batch}";
            $this->output->writeln($sMsg);
        }

        if ($this->type) {
            return $this->_switchType($this->type);
        }

        $returnValue = AbstractCommand::EXIT_SUCCESS;

        foreach ($this->aTypes as $type) {
            $result = $this->_switchType($type);
            $returnValue = ($result == AbstractCommand::EXIT_SUCCESS) ?
                $returnValue :
                $result;
        }

        return $returnValue;
    }

    /**
     * @param $type
     * @return int
     */
    private function _switchType($type)
    {
        if (!in_array($type, $this->aTypes)) {
            $this->output->writeln(self::completeLine("Wrong Type"));
            return AbstractCommand::EXIT_FAILED;
        }

        return $this->processBatch($type);
    }

    /**
     * @param \Elastica\Type $type
     * @param \Doctrine\ORM\EntityRepository $repository
     * @param ResultSet $resultSet
     */
    private function removeFromElasticSearch($type, $repository, $resultSet)
    {
        foreach ($resultSet as $result) {
            $this->nbrDocumentTested++;
            $documentId = $result->getDocument()->getId();

            # Look up into Doctrine the associated Entity with the given document
            $entity = $repository->find($documentId);
            $this->entityManager->clear();

            if (!is_null($entity)) {
                continue;
            }

            if ($this->verbose) {
                $this->output->writeln(self::CLEAR_LINE . "Entity not found: {$documentId}");
            }

            # Remove document from ElasticSearch
            $this->nbrEntitiesRemoved++;

            if ($this->dryRun) {
                continue;
            }

            $response = $type->deleteById($documentId);

            if ($response->hasError()&& $this->verbose) {
                $this->output->writeln(self::CLEAR_LINE . "\tError: {$response->getError()}");
            }
        }
    }

    /**
     * @param string $sType
     * @return int
     */
    private function countAllResult($sType)
    {
        $index = $this->getIndexFromType($sType);
        $query = new Query();

        return $index->count($query);
    }

    /**
     * @param string $sType
     * @return int
     */
    private function processBatch($sType)
    {
        $this->output->writeln('<info>' . self::completeLine("Start Exodus '{$sType}'") . '</info>');

        $repository = $this->getRepositoryFromType($sType);
        $index      = $this->getIndexFromType($sType);

        $countTotalDocuments = $this->countAllResult($sType);

        $progressBar = $this->getProgressBar($this->output, $countTotalDocuments);
        $this->initCounter();

        $search = new Search($this->getClient($sType));
        $search->addIndex($index)->addType($sType);
        $search->scroll('2h');
        $search->getQuery()->setSize($this->batch);

        $scroll = new Scroll($search);

        foreach ($scroll as $scrollId => $resultSet) {
            $this->removeFromElasticSearch($index->getType($sType), $repository, $resultSet);

            $this->nbrDone += $resultSet->count();
            $progressBar->setMessage("{$this->nbrDone}/{$countTotalDocuments}");
            $progressBar->setProgress($this->nbrDone);

            unset($resultSet);
            gc_collect_cycles();
        }

        $progressBar->finish();

        unset($progressBar);

        $this->output->writeln(self::CLEAR_LINE . "{$sType}: Documents tested: {$this->nbrDocumentTested} Entities removed: {$this->nbrEntitiesRemoved}");
        $this->output->writeln('<info>' . self::completeLine("Finish Exodus '{$sType}'") . '</info>');

        gc_collect_cycles();

        return AbstractCommand::EXIT_SUCCESS;
    }

    private function initCounter()
    {
        $this->nbrEntitiesRemoved = 0;
        $this->nbrDocumentTested = 0;
        $this->nbrDocumentsFound = 0;
        $this->nbrDone = 0;
    }

}