wikimedia/mediawiki-extensions-CirrusSearch

View on GitHub
includes/Elastica/ReindexTask.php

Summary

Maintainability
B
5 hrs
Test Coverage
<?php

namespace CirrusSearch\Elastica;

use Elastica\Client;
use Elastica\Exception\ResponseException;
use Elastica\Exception\RuntimeException;
use Elastica\Request;
use LogicException;
use MediaWiki\Logger\LoggerFactory;

/**
 * Handle on a reindex task, identified by its task id and tracked through
 * the `_tasks` endpoints of the supplied client.
 *
 * The task is considered complete once a status request has returned a
 * `response` payload; that payload is cached and all later calls are
 * answered from the cache without contacting the remote.
 */
class ReindexTask {
    /** @var Client Client used for every request about this task */
    private $client;
    /** @var string Identifier of the task being tracked */
    private $taskId;
    /** @var ReindexResponse|null Final result, null until completion was observed */
    private $response;
    /** @var \Psr\Log\LoggerInterface */
    private $log;

    /**
     * @param Client $client
     * @param string $taskId
     */
    public function __construct( Client $client, $taskId ) {
        $this->client = $client;
        $this->taskId = $taskId;
        $this->log = LoggerFactory::getInstance( 'CirrusSearch' );
    }

    /**
     * @return string The id of the tracked task
     */
    public function getId() {
        return $this->taskId;
    }

    /**
     * @param bool $check When true queries the remote
     *  to see if the task is complete. Otherwise reports
     *  last requested status.
     * @return bool True if the reindex task is complete
     * @throws ResponseException When $check is true and the status
     *  request does not succeed (see self::getStatus).
     */
    public function isComplete( $check = false ) {
        if ( $this->response !== null ) {
            return true;
        }
        if ( !$check ) {
            return false;
        }

        // getStatus() caches the final response on $this->response as soon
        // as the remote reports the task finished; its return value is not
        // needed here.
        $this->getStatus();

        return $this->response !== null;
    }

    /**
     * Cancel the in-progress reindex task.
     * @return bool True if cancel was successful
     * @throws LogicException When the task is already known to be complete
     */
    public function cancel() {
        if ( $this->response ) {
            throw new LogicException( 'Cannot cancel completed task' );
        }

        $response = $this->client->request( "_tasks/{$this->taskId}/_cancel", Request::POST );

        return $response->isOK();
    }

    /**
     * Delete the task
     * @return bool True if delete was successful, false otherwise.
     *  Throws Elastica NotFoundException for unknown task (already
     *  deleted?) or HttpException for communication failures.
     * @throws LogicException When the task has not been observed to complete
     */
    public function delete() {
        if ( !$this->response ) {
            throw new LogicException( 'Cannot delete in-progress task' );
        }
        // The stored task result is removed from the `.tasks` index by id.
        $response =
            $this->client->getIndex( '.tasks' )->deleteById( $this->taskId );

        return $response->isOK();
    }

    /**
     * Get the final result of the reindexing task.
     *
     * @return ReindexResponse|null The result of the reindex, or null
     *  if the reindex is still running. self::getStatus must be used
     *  to update the task completion status.
     */
    public function getResponse() {
        return $this->response;
    }

    /**
     * Fetch the current status of the task.
     *
     * Once completion has been observed the cached final response is
     * returned and no request is made.
     *
     * @return ReindexStatus|null The status of the reindex; after
     *  completion this is the cached ReindexResponse. The code paths in
     *  this method never return null: an unsuccessful status request is
     *  reported by exception. Transport may also throw exceptions for
     *  network failures.
     * @throws ResponseException When the status request is not OK
     * @throws RuntimeException When the client cannot supply the failed request
     */
    public function getStatus() {
        if ( $this->response ) {
            // task complete
            return $this->response;
        }

        $response = $this->client->request( "_tasks/{$this->taskId}", Request::GET );
        if ( !$response->isOK() ) {
            $lastRequest = $this->client->getLastRequest();
            if ( $lastRequest !== null ) {
                throw new ResponseException( $lastRequest, $response );
            } else {
                throw new RuntimeException( "Client::getLastRequest() should not be null" );
            }
        }
        $data = $response->getData();

        if ( isset( $data['response'] ) ) {
            // task complete
            $this->response = new ReindexResponse( $data['response'] );

            return $this->response;
        }

        $status = $data['task']['status'];

        /**
         * the task.status.slices array contains null for each incomplete child
         * task. This fetches the children and merges their status in.
         */
        if ( isset( $status['slices'] ) ) {
            $childResponse = $this->client->request( "_tasks", Request::GET, [], [
                'parent_task_id' => $this->taskId,
                'detailed' => 'true',
            ] );
            // Best effort: when the child lookup fails the parent status is
            // reported as received.
            if ( $childResponse->isOK() ) {
                $status = $this->mergeStatusWithChildren( $status, $childResponse->getData() );
            }
        }

        return new ReindexStatus( $status );
    }

    /**
     * Fill the parent's `slices` list with the live status of each child
     * task and recompute the parent's aggregate counters from all slices.
     *
     * @param array $status Parent `task.status` array containing `slices`
     * @param array $childResponse Decoded child task listing; children are
     *  read from `nodes.*.tasks.*.status`
     * @return array The parent status with slices filled in and totals
     *  recomputed. Slices that are null or lack a required field are
     *  left out of the totals.
     */
    private function mergeStatusWithChildren( array $status, array $childResponse ) {
        // A listing without nodes/tasks simply contributes no children.
        foreach ( $childResponse['nodes'] ?? [] as $nodeData ) {
            foreach ( $nodeData['tasks'] ?? [] as $childData ) {
                $sliceId = $childData['status']['slice_id'];
                $status['slices'][$sliceId] = $childData['status'];
            }
        }

        // Below mimics org.elasticsearch.action.bulk.byscroll.BulkByScrollTask.Status::Status
        // except that class doesn't have data about in-progress task's.
        // Counters that are plain sums over the slices.
        $summedFields = [
            'total',
            'updated',
            'created',
            'deleted',
            'batches',
            'version_conflicts',
            'noops',
            'throttled_millis',
        ];
        foreach ( $summedFields as $field ) {
            $status[$field] = 0;
        }
        // The retry counters live under `retries`; they must be reset there,
        // otherwise the values already present in the parent status would be
        // added on top of the per-slice values.
        $status['retries']['bulk'] = 0;
        $status['retries']['search'] = 0;
        $status['requests_per_second'] = 0;
        $status['throttled_until_millis'] = PHP_INT_MAX;

        // Fields a slice must carry to take part in the aggregation,
        // flipped once so the per-slice check is a key comparison.
        $requiredFields = array_flip( [
            'total',
            'updated',
            'created',
            'deleted',
            'batches',
            'version_conflicts',
            'noops',
            'retries',
            'throttled_millis',
            'requests_per_second',
            'throttled_until_millis',
        ] );

        foreach ( $status['slices'] as $slice ) {
            if ( $slice === null ) {
                // slice has failed catastrophically
                continue;
            }
            $missing_status_fields = array_diff_key( $requiredFields, $slice );
            if ( $missing_status_fields !== [] ) {
                // slice has missing key fields
                $slice_to_json = json_encode( $slice );
                $this->log->warning( 'Missing key field(s) for reindex task status', [
                    'cirrus_reindex_task_slice' => $slice_to_json,
                    'exact_missing_fields' => $missing_status_fields,
                ] );
                continue;
            }

            foreach ( $summedFields as $field ) {
                $status[$field] += $slice[$field];
            }
            $status['retries']['bulk'] += $slice['retries']['bulk'];
            $status['retries']['search'] += $slice['retries']['search'];

            // -1 marks an unthrottled slice. The value may be decoded as int
            // or float, so compare numerically. One unthrottled slice makes
            // the whole sum infinite.
            $status['requests_per_second'] += (float)$slice['requests_per_second'] === -1.0
                ? INF
                : $slice['requests_per_second'];

            // Keep the smallest value seen across slices.
            $status['throttled_until_millis'] = min(
                $status['throttled_until_millis'],
                $slice['throttled_until_millis']
            );
        }

        if ( $status['requests_per_second'] === INF ) {
            // Translate "infinite" back to the -1 sentinel.
            $status['requests_per_second'] = -1;
        }

        return $status;
    }
}