wikimedia/mediawiki-extensions-CirrusSearch

View on GitHub
includes/Job/JobTraits.php

Summary

Maintainability
A
2 hrs
Test Coverage
<?php

namespace CirrusSearch\Job;

use CirrusSearch\ClusterSettings;
use CirrusSearch\Connection;
use CirrusSearch\ExternalIndex;
use CirrusSearch\HashSearchConfig;
use CirrusSearch\SearchConfig;
use MediaWiki\Logger\LoggerFactory;
use MediaWiki\MainConfigNames;

/**
 * Traits for CirrusSearch Jobs.
 */
trait JobTraits {
    /**
     * Parameters this job was created with.
     *
     * @return array
     */
    abstract public function getParams();

    /**
     * Search configuration used for cluster selection, timeouts and retry backoff.
     *
     * @return SearchConfig
     */
    abstract public function getSearchConfig(): SearchConfig;

    /**
     * Job type; used in the log messages and exceptions produced by this trait.
     *
     * @return string
     */
    abstract public function getType();

    /**
     * Actually perform the labor of the job.
     * The Job will be retried if true is returned from allowRetries() when
     * this method fails (thrown exception or returning false from this
     * method).
     * @return bool true for success, false for failures
     */
    abstract protected function doJob();

    /**
     * Compute how many seconds a failed job should wait before being retried.
     *
     * The delay is 2^(exponent + k) seconds, where exponent is the
     * CirrusSearchWriteBackoffExponent setting and k is drawn at random from
     * [0, min($retryCount, 4)]. Once the job has failed more than once, k is
     * drawn from [1, min($retryCount, 4)] instead. With the default minimum
     * exponent of 6 the possible return values are 64, 128, 256, 512 and 1024,
     * giving a maximum delay of about 17 minutes.
     *
     * @param int $retryCount The number of times the job has errored out.
     * @return int Number of seconds to delay.
     */
    public function backoffDelay( $retryCount ) {
        return self::computeBackoffDelay(
            $this->getSearchConfig()->get( 'CirrusSearchWriteBackoffExponent' ),
            $retryCount
        );
    }

    /**
     * Backoff computation behind backoffDelay(), kept free of any config lookup.
     *
     * @param int $exponent Minimum power of two for the delay.
     * @param int $retryCount The number of times the job has errored out.
     * @return int Number of seconds to delay.
     */
    private static function computeBackoffDelay( $exponent, $retryCount ): int {
        // Skip the smallest delay for jobs that failed more than once. With the
        // default exponent of 6 that means waiting at least 2^7 = 128s (~2 minutes).
        $minIncrease = $retryCount > 1 ? 1 : 0;
        $maxIncrease = min( $retryCount, 4 );
        // ceil() always returns a float; cast so callers get the integer number
        // of seconds promised by the @return documentation.
        return (int)ceil( 2 ** ( $exponent + rand( $minIncrease, $maxIncrease ) ) );
    }

    /**
     * Construct the list of connections suited for this job.
     * NOTE: only suited for jobs that work on multiple clusters by
     * inspecting the 'cluster' job param
     *
     * Without a 'cluster' param, every cluster writable for $updateGroup is used.
     * An 'external-index' param may redirect writes to that index's cross-cluster
     * replica group. A truthy 'private_data' param restricts the result to
     * clusters flagged as private, which may leave it empty.
     *
     * @param string $updateGroup UpdateGroup::* constant
     * @return Connection[] indexed by cluster name
     * @throws \RuntimeException when the requested cluster is not writable for $updateGroup
     */
    protected function decideClusters( string $updateGroup ) {
        $params = $this->getParams();
        $searchConfig = $this->getSearchConfig();
        $jobType = $this->getType();

        $cluster = $params['cluster'] ?? null;
        $assignment = $searchConfig->getClusterAssignment();
        if ( $cluster === null ) {
            $clusterNames = $assignment->getWritableClusters( $updateGroup );
        } elseif ( $assignment->canWriteToCluster( $cluster, $updateGroup ) ) {
            $clusterNames = [ $cluster ];
        } else {
            // Just in case a job is present in the queue but its cluster
            // has been removed from the config file.
            LoggerFactory::getInstance( 'CirrusSearch' )->warning(
                "Received {command} job for unwritable cluster {cluster}",
                [
                    'command' => $jobType,
                    'cluster' => $cluster
                ]
            );
            // this job does not allow retries so we just need to throw an exception
            throw new \RuntimeException( "Received {$jobType} job with {$updateGroup} updates for an unwritable cluster $cluster." );
        }

        $config = $searchConfig;
        if ( isset( $params['external-index'] ) ) {
            $otherIndex = new ExternalIndex( $searchConfig, $params['external-index'] );
            if ( $otherIndex->getCrossClusterName() !== null ) {
                // We assume that the cluster configs are mostly shared across cluster groups,
                // e.g. this group's config is available in CirrusSearchClusters.
                // By changing CirrusSearchReplicaGroup to the CrossClusterName of the external
                // index, we build the correct config to write to the desired replica group.
                $config = new HashSearchConfig( [ 'CirrusSearchReplicaGroup' => $otherIndex->getCrossClusterName() ],
                    [ HashSearchConfig::FLAG_INHERIT ], $config );
            }
        }

        // Limit private data writes, such as archive index, to appropriately
        // flagged clusters
        if ( $params['private_data'] ?? false ) {
            // $clusterNames could be empty after this filter.  All consumers
            // must work appropriately with no connections returned, typically
            // by looping over the connections and doing nothing when no
            // connections are provided.
            $clusterNames = array_filter( $clusterNames, static function ( $name ) use ( $config ) {
                $settings = new ClusterSettings( $config, $name );
                return $settings->isPrivateCluster();
            } );
        }

        $conns = Connection::getClusterConnections( $clusterNames, $config );

        // Every connection gets the same client-side timeout for update requests.
        $timeout = $config->get( 'CirrusSearchClientSideUpdateTimeout' );
        foreach ( $conns as $connection ) {
            $connection->setTimeout( $timeout );
        }

        return $conns;
    }

    /**
     * Common entry point for all jobs. It skips the work when search updates
     * are disabled by config and otherwise delegates to doJob().
     *
     * @return bool true when skipped, otherwise whatever doJob() returns
     */
    public function run() {
        $config = $this->getSearchConfig();
        if ( $config->get( MainConfigNames::DisableSearchUpdate ) ||
            $config->get( 'CirrusSearchDisableUpdate' )
        ) {
            LoggerFactory::getInstance( 'CirrusSearch' )->debug( "Skipping job: search updates disabled by config" );
            return true;
        }

        return $this->doJob();
    }

    /**
     * Get options to enable delayed jobs. Note that this might not be possible if the
     * JobQueue implementation handling this job doesn't support it (JobQueueDB), but it
     * is possible for the high performance JobQueueRedis. Note also that delays are
     * minimums: at least JobQueueRedis makes no effort to release the job as soon as
     * possible after the delay has expired; by default it only checks every five
     * minutes or so. Finally, if another delay has been set that is longer than this
     * one, then the _longer_ delay stays.
     *
     * NOTE(review): JobQueueGroup::get() looks queues up by job type. If callers pass
     * a fully-qualified class name rather than a job type (see buildJobName()), the
     * queue's default configuration may be consulted. Confirm against callers.
     *
     * @param string $jobClass name of the job class
     * @param int $delay seconds to delay this job if possible
     * @param \JobQueueGroup $jobQueueGroup
     * @return array options to add to the job params; empty when $delay is 0 or
     *  the queue does not support delayed jobs
     */
    public static function buildJobDelayOptions( $jobClass, $delay, \JobQueueGroup $jobQueueGroup ): array {
        $jobQueue = $jobQueueGroup->get( $jobClass );
        if ( !$delay || !$jobQueue->delayedJobsEnabled() ) {
            return [];
        }
        return [ 'jobReleaseTimestamp' => time() + $delay ];
    }

    /**
     * Build a job name from a CirrusSearch job class name. Every occurrence of
     * the "CirrusSearch\Job\" namespace is removed and the result is prefixed
     * with "cirrusSearch", e.g. CirrusSearch\Job\ElasticaWrite becomes
     * cirrusSearchElasticaWrite.
     *
     * @param string $clazz fully-qualified class name
     * @return string
     */
    public static function buildJobName( $clazz ) {
        return 'cirrusSearch' . str_replace( 'CirrusSearch\\Job\\', '', $clazz );
    }
}