wikimedia/mediawiki-core

includes/deferred/DeferredUpdates.php

<?php
/**
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 * http://www.gnu.org/copyleft/gpl.html
 *
 * @file
 */

namespace MediaWiki\Deferred;

use ErrorPageError;
use LogicException;
use MediaWiki\Logger\LoggerFactory;
use MWExceptionHandler;
use Throwable;
use Wikimedia\Rdbms\IDatabase;
use Wikimedia\ScopedCallback;

/**
 * Defer callable updates to run later in the PHP process
 *
 * This is a performance feature that enables MediaWiki to produce faster web responses.
 * It allows you to postpone non-blocking work (e.g. work that does not change the web
 * response) to after the HTTP response has been sent to the client (i.e. web browser).
 *
 * Once the response is finalized and sent to the browser, the webserver process stays
 * for a little while longer (detached from the web request) to run your POSTSEND tasks.
 *
 * There is also a PRESEND option, which runs your task right before the finalized response
 * is sent to the browser. This is for critical tasks that do need to block the response,
 * but where you'd like to benefit from other DeferredUpdates features, such as:
 *
 * - MergeableUpdate: batch updates from different components without coupling
 *   or awareness of each other.
 * - Automatic cancellation: pass an IDatabase object (for any wiki or database) to
 *   DeferredUpdates::addCallableUpdate or AtomicSectionUpdate.
 * - Reducing lock contention: if the response is likely to take several seconds
 *   (e.g. uploading a large file to FileBackend, or saving an edit to a large article)
 *   much of that work may overlap with a database transaction that is staying open for
 *   the entire duration. By moving contentious writes out to a PRESEND update, these
 *   get their own transaction (after the main one is committed), which gives up some
 *   atomicity for improved throughput.
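 *
 * A minimal usage sketch of the two stages. The callback bodies and the `$dbw` variable
 * are hypothetical placeholders; only the method and constants come from this class:
 *
 * ```php
 * use MediaWiki\Deferred\DeferredUpdates;
 *
 * // POSTSEND is the default stage: this runs after the response has been sent.
 * DeferredUpdates::addCallableUpdate( static function () {
 *     // e.g. refresh secondary data that does not affect the response
 * } );
 *
 * // PRESEND: this runs right before the response is sent. Passing $dbw (assumed to be
 * // an IDatabase handle obtained elsewhere) cancels the update on transaction rollback.
 * DeferredUpdates::addCallableUpdate(
 *     static function () {
 *         // e.g. a contentious write that should get its own transaction
 *     },
 *     DeferredUpdates::PRESEND,
 *     $dbw
 * );
 * ```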
 *
 * ## Expectation and comparison to job queue
 *
 * When scheduling a POSTSEND via the DeferredUpdates system you can generally expect
 * it to complete well before the client makes their next request. Updates run directly after
 * the web response is sent, from the same process on the same server. This is unlike the JobQueue,
 * where jobs may need to wait in line for some minutes or hours.
 *
 * If your update fails, the failure is not known to the client and the update is not retried.
 * For updates that need retries for system consistency or data integrity, it is recommended
 * to implement them as jobs instead and use JobQueueGroup::lazyPush. This has the caveat of
 * being delayed by default, the same as any other job.
 *
 * A hybrid solution is available via the EnqueueableDataUpdate interface. By implementing
 * this interface, you can queue your update via the DeferredUpdates first, and if it fails,
 * the system will automatically catch this and queue it as a job instead.
 *
 * ## How it works during web requests
 *
 * 1. Your request route is executed (e.g. Action or SpecialPage class, or API).
 * 2. Output is finalized and main database transaction is committed.
 * 3. PRESEND updates run via DeferredUpdates::doUpdates.
 * 4. The web response is sent to the browser.
 * 5. POSTSEND updates run via DeferredUpdates::doUpdates.
 *
 * @see MediaWiki::preOutputCommit
 * @see MediaWiki::restInPeace
 *
 * ## How it works for Maintenance scripts
 *
 * In CLI mode, no distinction is made between PRESEND and POSTSEND deferred updates,
 * and the queue is periodically executed throughout the process.
 *
 * @see DeferredUpdates::tryOpportunisticExecute
 *
 * ## How it works internally
 *
 * Each update is added via DeferredUpdates::addUpdate and stored in either the PRESEND or
 * POSTSEND queue. If an update gets queued while another update is already running, then
 * we store in a "sub"-queue associated with the current update. This allows nested updates
 * to be completed before other updates, which improves ordering for process caching.
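 *
 * A sketch of the resulting order in a web request, assuming all three updates use the
 * default POSTSEND stage (the callback bodies are hypothetical placeholders):
 *
 * ```php
 * DeferredUpdates::addCallableUpdate( static function () {
 *     // Update A is running here, so the update added below goes into A's sub-queue.
 *     DeferredUpdates::addCallableUpdate( static function () {
 *         // Update A1: runs right after A completes, before B.
 *     } );
 * } );
 * DeferredUpdates::addCallableUpdate( static function () {
 *     // Update B: runs after A and A1.
 * } );
 * ```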
 *
 * @since 1.19
 */
class DeferredUpdates {
    /** @var int Process all updates; in web requests, use only after flushing output buffer */
    public const ALL = 0;
    /** @var int Specify/process updates that should run before flushing output buffer */
    public const PRESEND = 1;
    /** @var int Specify/process updates that should run after flushing output buffer */
    public const POSTSEND = 2;

    /** @var int[] List of "defer until" queue stages that can be reached */
    public const STAGES = [ self::PRESEND, self::POSTSEND ];

    /** @var int Queue size threshold for converting updates into jobs */
    private const BIG_QUEUE_SIZE = 100;

    /** @var DeferredUpdatesScopeStack|null Queue states based on recursion level */
    private static $scopeStack;

    /**
     * @var int Nesting level for preventOpportunisticUpdates()
     */
    private static $preventOpportunisticUpdates = 0;

    /**
     * @return DeferredUpdatesScopeStack
     */
    private static function getScopeStack(): DeferredUpdatesScopeStack {
        self::$scopeStack ??= new DeferredUpdatesScopeMediaWikiStack();
        return self::$scopeStack;
    }

    /**
     * @param DeferredUpdatesScopeStack $scopeStack
     * @internal Only for use in tests.
     */
    public static function setScopeStack( DeferredUpdatesScopeStack $scopeStack ): void {
        if ( !defined( 'MW_PHPUNIT_TEST' ) ) {
            throw new LogicException( 'Cannot reconfigure DeferredUpdates outside tests' );
        }
        self::$scopeStack = $scopeStack;
    }

    /**
     * Add an update to the pending update queue for execution at the appropriate time
     *
     * In CLI mode, this also calls tryOpportunisticExecute() so that pending updates can
     * run right away when it is safe to do so
     *
     * If an update is already in progress, then what happens to this update is as follows:
     *  - If it has a "defer until" stage at/before the actual run stage of the innermost
     *    in-progress update, then it will go into the sub-queue of that in-progress update.
     *    As soon as that update completes, MergeableUpdate instances in its sub-queue will be
     *    merged into the top-queues and the non-MergeableUpdate instances will be executed.
     *    This is done to better isolate updates from the failures of other updates and reduce
     *    the chance of race conditions caused by updates not fully seeing the intended changes
     *    of previously enqueued and executed updates.
     *  - If it has a "defer until" stage later than the actual run stage of the innermost
     *    in-progress update, then it will go into the normal top-queue for that stage.
     *
     * @param DeferrableUpdate $update Some object that implements doUpdate()
     * @param int $stage One of (DeferredUpdates::PRESEND, DeferredUpdates::POSTSEND)
     * @since 1.28 Added the $stage parameter
     */
    public static function addUpdate( DeferrableUpdate $update, $stage = self::POSTSEND ) {
        self::getScopeStack()->current()->addUpdate( $update, $stage );
        self::tryOpportunisticExecute();
    }

    /**
     * Add an update to the pending update queue that invokes the specified callback when run
     *
     * @param callable $callable
     * @param int $stage One of (DeferredUpdates::PRESEND, DeferredUpdates::POSTSEND)
     * @param IDatabase|IDatabase[]|null $dbw Cancel the update if a DB transaction
     *  is rolled back [optional]
     * @since 1.27 Added $stage parameter
     * @since 1.28 Added the $dbw parameter
     */
    public static function addCallableUpdate( $callable, $stage = self::POSTSEND, $dbw = null ) {
        self::addUpdate( new MWCallableUpdate( $callable, wfGetCaller(), $dbw ), $stage );
    }

    /**
     * Run an update, and, if an error was thrown, catch/log it and enqueue the update as
     * a job in the job queue system if possible (e.g. implements EnqueueableDataUpdate)
     *
     * @param DeferrableUpdate $update
     * @return Throwable|null
     */
    private static function run( DeferrableUpdate $update ): ?Throwable {
        $logger = LoggerFactory::getInstance( 'DeferredUpdates' );

        $type = get_class( $update )
            . ( $update instanceof DeferrableCallback ? '_' . $update->getOrigin() : '' );
        $updateId = spl_object_id( $update );
        $logger->debug( "DeferredUpdates::run: started $type #{updateId}", [ 'updateId' => $updateId ] );

        $updateException = null;

        $startTime = microtime( true );
        try {
            self::attemptUpdate( $update );
        } catch ( Throwable $updateException ) {
            MWExceptionHandler::logException( $updateException );
            $logger->error(
                "Deferred update '{deferred_type}' failed to run.",
                [
                    'deferred_type' => $type,
                    'exception' => $updateException,
                ]
            );
            self::getScopeStack()->onRunUpdateFailed( $update );
        } finally {
            $walltime = microtime( true ) - $startTime;
            $logger->debug( "DeferredUpdates::run: ended $type #{updateId}, processing time: {walltime}", [
                'updateId' => $updateId,
                'walltime' => $walltime,
            ] );
        }

        // Try to push the update as a job so it can run later if possible
        if ( $updateException && $update instanceof EnqueueableDataUpdate ) {
            try {
                self::getScopeStack()->queueDataUpdate( $update );
            } catch ( Throwable $jobException ) {
                MWExceptionHandler::logException( $jobException );
                $logger->error(
                    "Deferred update '{deferred_type}' failed to enqueue as a job.",
                    [
                        'deferred_type' => $type,
                        'exception' => $jobException,
                    ]
                );
                self::getScopeStack()->onRunUpdateFailed( $update );
            }
        }

        return $updateException;
    }

    /**
     * Consume and execute all pending updates
     *
     * Note that it is rarely the case that this method should be called outside of a few
     * select entry points. For simplicity, that kind of recursion is discouraged. Recursion
     * cannot happen if an explicit transaction round is active, which limits usage to updates
     * with TRX_ROUND_ABSENT that do not leave open any transaction round of their own during
     * the call to this method.
     *
     * In the less-common case of this being called within an in-progress DeferrableUpdate,
     * this will not see any top-queue updates (since they were consumed and are being run
     * inside an outer execution loop). In that case, it will instead operate on the sub-queue
     * of the innermost in-progress update on the stack.
     *
     * @internal For use by MediaWiki, Maintenance, JobRunner, JobExecutor
     * @param int $stage Which updates to process. One of
     *  (DeferredUpdates::PRESEND, DeferredUpdates::POSTSEND, DeferredUpdates::ALL)
     */
    public static function doUpdates( $stage = self::ALL ) {
        /** @var ErrorPageError|null $guiError First presentable client-level error thrown */
        $guiError = null;
        /** @var Throwable|null $exception First of any error thrown */
        $exception = null;

        $scope = self::getScopeStack()->current();

        // T249069: recursion is not possible once explicit transaction rounds are involved
        $activeUpdate = $scope->getActiveUpdate();
        if ( $activeUpdate ) {
            $class = get_class( $activeUpdate );
            if ( !( $activeUpdate instanceof TransactionRoundAwareUpdate ) ) {
                throw new LogicException(
                    __METHOD__ . ": reached from $class, which is not TransactionRoundAwareUpdate"
                );
            }
            if ( $activeUpdate->getTransactionRoundRequirement() !== $activeUpdate::TRX_ROUND_ABSENT ) {
                throw new LogicException(
                    __METHOD__ . ": reached from $class, which does not specify TRX_ROUND_ABSENT"
                );
            }
        }

        $scope->processUpdates(
            $stage,
            static function ( DeferrableUpdate $update, $activeStage ) use ( &$guiError, &$exception ) {
                $scopeStack = self::getScopeStack();
                $childScope = $scopeStack->descend( $activeStage, $update );
                try {
                    $e = self::run( $update );
                    $guiError = $guiError ?: ( $e instanceof ErrorPageError ? $e : null );
                    $exception = $exception ?: $e;
                    // Any addUpdate() calls between descend() and ascend() used the sub-queue.
                    // In rare cases, DeferrableUpdate::doUpdate() will process them by calling
                    // doUpdates() itself. In any case, process the remaining updates in the
                    // sub-queue by running them, enqueueing them, or transferring them to the
                    // parent scope queues as appropriate...
                    $childScope->processUpdates(
                        $activeStage,
                        static function ( DeferrableUpdate $sub ) use ( &$guiError, &$exception ) {
                            $e = self::run( $sub );
                            $guiError = $guiError ?: ( $e instanceof ErrorPageError ? $e : null );
                            $exception = $exception ?: $e;
                        }
                    );
                } finally {
                    $scopeStack->ascend();
                }
            }
        );

        // VW-style hack to work around T190178, so we can make sure
        // PageMetaDataUpdater doesn't throw exceptions.
        if ( $exception && defined( 'MW_PHPUNIT_TEST' ) ) {
            throw $exception;
        }

        // Throw the first of any GUI errors as long as the context is HTTP pre-send. However,
        // callers should check permissions *before* enqueueing updates. If the main transaction
        // round actions succeed but some deferred updates fail due to permissions errors then
        // there is a risk that some secondary data was not properly updated.
        if ( $guiError && $stage === self::PRESEND && !headers_sent() ) {
            throw $guiError;
        }
    }

    /**
     * Consume and execute pending updates now if possible, instead of waiting.
     *
     * In web requests, updates are always deferred until the end of the request.
     *
     * In CLI mode, updates run earlier and more often. This is important for long-running
     * Maintenance scripts that would otherwise grow an excessively large queue, which increases
     * memory use, and risks losing all updates if the script ends early or crashes.
     *
     * The following conditions are required for updates to run early in CLI mode:
     *
     * - No update is already in progress (ensure linear flow, recursion guard).
     * - LBFactory indicates that we don't have any "busy" database connections, i.e.
     *   there are no pending writes or otherwise active and uncommitted transactions,
     *   except if the transaction is empty and merely used for primary DB read queries,
     *   in which case the transaction (and its repeatable-read snapshot) can be safely flushed.
     *
     * How this works:
     *
     * - When a maintenance script commits a change or waits for replication, such as
     *   via IConnectionProvider::commitAndWaitForReplication, then ILBFactory calls
     *   tryOpportunisticExecute(). This is injected via MWLBFactory::applyGlobalState.
     *
     * - For maintenance scripts that don't do much with the database, we also call
     *   tryOpportunisticExecute() after every addUpdate() call.
     *
     * - Upon the completion of Maintenance::execute() via Maintenance::shutdown(),
     *   any remaining updates are run.
     *
     * Note that this method runs both PRESEND and POSTSEND updates and thus should not be called
     * during web requests. It is only intended for long-running Maintenance scripts.
     *
     * @internal For use by Maintenance
     * @since 1.28
     * @return bool Whether updates were allowed to run
     */
    public static function tryOpportunisticExecute(): bool {
        // Leave execution up to the current loop if an update is already in progress
        // or if updates are explicitly disabled
        if ( self::getRecursiveExecutionStackDepth()
            || self::$preventOpportunisticUpdates
        ) {
            return false;
        }

        if ( self::getScopeStack()->allowOpportunisticUpdates() ) {
            self::doUpdates( self::ALL );
            return true;
        }

        if ( self::pendingUpdatesCount() >= self::BIG_QUEUE_SIZE ) {
            // There are a large number of pending updates and none of them can run yet.
            // The odds of losing updates due to an error increases when executing long queues
            // and when large amounts of time pass while tasks are queued. Mitigate this by
            // trying to eagerly move updates to the JobQueue when possible.
            //
            // TODO: Do we still need this now maintenance scripts automatically call
            // tryOpportunisticExecute from addUpdate, from every commit, and every
            // waitForReplication call?
            $enqueuedUpdates = [];
            self::getScopeStack()->current()->consumeMatchingUpdates(
                self::ALL,
                EnqueueableDataUpdate::class,
                static function ( EnqueueableDataUpdate $update ) use ( &$enqueuedUpdates ) {
                    self::getScopeStack()->queueDataUpdate( $update );
                    $type = get_class( $update );
                    $enqueuedUpdates[$type] ??= 0;
                    $enqueuedUpdates[$type]++;
                }
            );
            if ( $enqueuedUpdates ) {
                LoggerFactory::getInstance( 'DeferredUpdates' )->debug(
                    'Enqueued {enqueuedUpdatesCount} updates as jobs',
                    [
                        'enqueuedUpdatesCount' => array_sum( $enqueuedUpdates ),
                        'enqueuedUpdates' => implode( ', ',
                            array_map( fn ( $k, $v ) => "$k: $v", array_keys( $enqueuedUpdates ), $enqueuedUpdates ) ),
                    ]
                );
            }
        }

        return false;
    }

    /**
     * Prevent opportunistic updates until the returned ScopedCallback is
     * consumed.
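     *
     * A usage sketch (the surrounding code is hypothetical). The counter is decremented
     * when the ScopedCallback is consumed explicitly or goes out of scope:
     *
     * ```php
     * use MediaWiki\Deferred\DeferredUpdates;
     * use Wikimedia\ScopedCallback;
     *
     * $scope = DeferredUpdates::preventOpportunisticUpdates();
     * // While $scope is alive, tryOpportunisticExecute() returns false,
     * // so updates added here stay queued.
     * ScopedCallback::consume( $scope );
     * ```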
     *
     * @return ScopedCallback
     */
    public static function preventOpportunisticUpdates() {
        self::$preventOpportunisticUpdates++;
        return new ScopedCallback( static function () {
            self::$preventOpportunisticUpdates--;
        } );
    }

    /**
     * Get the number of pending updates for the current execution context
     *
     * If an update is in progress, then this operates on the sub-queues of the
     * innermost in-progress update. Otherwise, it acts on the top-queues.
     *
     * @return int
     * @since 1.28
     */
    public static function pendingUpdatesCount() {
        return self::getScopeStack()->current()->pendingUpdatesCount();
    }

    /**
     * Get a list of the pending updates for the current execution context
     *
     * If an update is in progress, then this operates on the sub-queues of the
     * innermost in-progress update. Otherwise, it acts on the top-queues.
     *
     * @param int $stage Look for updates with this "defer until" stage. One of
     *  (DeferredUpdates::PRESEND, DeferredUpdates::POSTSEND, DeferredUpdates::ALL)
     * @return DeferrableUpdate[]
     * @internal This method should only be used for unit tests
     * @since 1.29
     */
    public static function getPendingUpdates( $stage = self::ALL ) {
        return self::getScopeStack()->current()->getPendingUpdates( $stage );
    }

    /**
     * Cancel all pending updates for the current execution context
     *
     * If an update is in progress, then this operates on the sub-queues of the
     * innermost in-progress update. Otherwise, it acts on the top-queues.
     *
     * @internal This method should only be used for unit tests
     */
    public static function clearPendingUpdates() {
        self::getScopeStack()->current()->clearPendingUpdates();
    }

    /**
     * Get the number of in-progress calls to DeferredUpdates::doUpdates()
     *
     * @return int
     * @internal This method should only be used for unit tests
     */
    public static function getRecursiveExecutionStackDepth() {
        return self::getScopeStack()->getRecursiveDepth();
    }

    /**
     * Attempt to run an update with the appropriate transaction round state if needed
     *
     * It is allowed for a DeferrableUpdate to directly execute one or more other DeferrableUpdate
     * instances without queueing them by calling this method. In that case, the outer update
     * must use TransactionRoundAwareUpdate::TRX_ROUND_ABSENT, e.g. by extending
     * TransactionRoundDefiningUpdate, so that this method can give each update its own
     * transaction round.
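     *
     * A minimal sketch of that pattern. It assumes that the TransactionRoundDefiningUpdate
     * constructor accepts a callable followed by a caller name, and that $updates is a
     * hypothetical list of DeferrableUpdate instances prepared by the caller:
     *
     * ```php
     * DeferredUpdates::addUpdate( new TransactionRoundDefiningUpdate(
     *     static function () use ( $updates ) {
     *         foreach ( $updates as $update ) {
     *             // Each inner update is run directly, with its own transaction round.
     *             DeferredUpdates::attemptUpdate( $update );
     *         }
     *     },
     *     __METHOD__
     * ) );
     * ```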
     *
     * @param DeferrableUpdate $update
     * @since 1.34
     */
    public static function attemptUpdate( DeferrableUpdate $update ) {
        self::getScopeStack()->onRunUpdateStart( $update );

        $update->doUpdate();

        self::getScopeStack()->onRunUpdateEnd( $update );
    }
}

/** @deprecated class alias since 1.42 */
class_alias( DeferredUpdates::class, 'DeferredUpdates' );