wikimedia/mediawiki-extensions-Wikibase

View on GitHub
repo/includes/Store/Sql/ChangesSubscriptionTableBuilder.php

Summary

Maintainability
A
35 mins
Test Coverage
<?php

namespace Wikibase\Repo\Store\Sql;

use InvalidArgumentException;
use Onoi\MessageReporter\MessageReporter;
use Onoi\MessageReporter\NullMessageReporter;
use Wikibase\DataModel\Entity\Item;
use Wikibase\DataModel\Entity\ItemId;
use Wikibase\DataModel\Services\EntityId\EntityIdComposer;
use Wikibase\Lib\Rdbms\RepoDomainDb;
use Wikibase\Lib\Reporting\ExceptionHandler;
use Wikibase\Lib\Reporting\LogWarningExceptionHandler;
use Wikimedia\Rdbms\IDatabase;
use Wikimedia\Rdbms\IResultWrapper;

/**
 * Implements initial population (priming) for the wb_changes_subscription table,
 * based on the wb_items_per_site. Any wiki linked via the wb_items_per_site table
 * will be considered a subscriber.
 *
 * @license GPL-2.0-or-later
 * @author Daniel Kinzler
 */
class ChangesSubscriptionTableBuilder {

    /**
     * @var RepoDomainDb
     */
    private $db;

    /**
     * @var EntityIdComposer
     */
    private $entityIdComposer;

    /**
     * @var string
     */
    private $tableName;

    /**
     * @var int
     */
    private $batchSize;

    /**
     * @var ExceptionHandler
     */
    private $exceptionHandler;

    /**
     * @var MessageReporter
     */
    private $progressReporter;

    /**
     * @var string 'verbose' or 'standard'
     */
    private $verbosity;

    /**
     * @param RepoDomainDb $db
     * @param EntityIdComposer $entityIdComposer
     * @param string $tableName
     * @param int $batchSize
     * @param string $verbosity Either 'standard' or 'verbose'
     *
     * @throws InvalidArgumentException
     */
    public function __construct(
        RepoDomainDb $db,
        EntityIdComposer $entityIdComposer,
        $tableName,
        $batchSize,
        $verbosity = 'standard'
    ) {
        if ( !is_string( $tableName ) ) {
            throw new InvalidArgumentException( '$tableName must be a string' );
        }

        if ( !is_int( $batchSize ) || $batchSize < 1 ) {
            throw new InvalidArgumentException( '$batchSize must be an integer >= 1' );
        }

        if ( $verbosity !== 'standard' && $verbosity !== 'verbose' ) {
            throw new InvalidArgumentException( '$verbosity must be either "verbose"'
                . ' or "standard".' );
        }

        $this->db = $db;
        $this->entityIdComposer = $entityIdComposer;
        $this->tableName = $tableName;
        $this->batchSize = $batchSize;
        $this->verbosity = $verbosity;

        $this->exceptionHandler = new LogWarningExceptionHandler();
        $this->progressReporter = new NullMessageReporter();
    }

    public function setProgressReporter( MessageReporter $progressReporter ) {
        $this->progressReporter = $progressReporter;
    }

    public function setExceptionHandler( ExceptionHandler $exceptionHandler ) {
        $this->exceptionHandler = $exceptionHandler;
    }

    /**
     * Fill the subscription table with rows based on entries in wb_items_per_site.
     *
     * @param ItemId|null $startItem The item to start with.
     */
    public function fillSubscriptionTable( ItemId $startItem = null ) {
        $continuation = $startItem === null ? null : [ $startItem->getNumericId(), 0 ];

        while ( true ) {
            $count = $this->processSubscriptionBatch( $continuation );

            if ( $count > 0 ) {
                $this->progressReporter->reportMessage( 'Populating subscription table: '
                    // @phan-suppress-next-line PhanTypePossiblyInvalidDimOffset
                    . "inserted $count subscriptions, continuing at item #{$continuation[0]}." );
            } else {
                break;
            }
        }
    }

    /**
     * @param array &$continuation
     *
     * @return int The number of subscriptions inserted.
     */
    private function processSubscriptionBatch( &$continuation = [] ) {
        $dbw = $this->db->connections()->getWriteConnection();

        $subscriptionsPerItemBatch = $this->getSubscriptionsPerItemBatch( $dbw, $continuation );

        if ( !$subscriptionsPerItemBatch ) {
            return 0;
        }

        $count = $this->insertSubscriptionBatch( $dbw, $subscriptionsPerItemBatch );

        return $count;
    }

    /**
     * @param IDatabase $db
     * @param array[] $subscriptionsPerItem
     *
     * @return int The number of rows inserted.
     */
    private function insertSubscriptionBatch( IDatabase $db, array $subscriptionsPerItem ) {
        $db->startAtomic( __METHOD__ );

        $c = 0;
        foreach ( $subscriptionsPerItem as $itemId => $subscribers ) {
            $rows = $this->makeSubscriptionRows( $itemId, $subscribers );

            $db->newInsertQueryBuilder()
                ->insertInto( $this->tableName )
                ->ignore()
                ->rows( $rows )
                ->caller( __METHOD__ )->execute();

            if ( $this->verbosity === 'verbose' ) {
                $this->progressReporter->reportMessage( 'Inserted ' . $db->affectedRows()
                    . ' into wb_changes_subscription' );
            }

            $c += count( $rows );
        }

        $db->endAtomic( __METHOD__ );
        return $c;
    }

    /**
     * @param IDatabase $db
     * @param array &$continuation
     *
     * @return array[] An associative array mapping item IDs to lists of site IDs.
     */
    private function getSubscriptionsPerItemBatch( IDatabase $db, &$continuation = [] ) {
        $queryBuilder = $db->newSelectQueryBuilder();
        $queryBuilder->select( [ 'ips_row_id', 'ips_item_id', 'ips_site_id' ] )
            ->from( 'wb_items_per_site' );

        $continuationMsg = '';
        if ( $continuation ) {
            [ $fromItemId, $fromRowId ] = $continuation;
            $continuationCondition = $db->buildComparison( '>', [
                'ips_item_id' => (int)$fromItemId,
                'ips_row_id' => $fromRowId,
            ] );
            $queryBuilder->where( $continuationCondition );
            $continuationMsg = ' with continuation: ' . $continuationCondition;
        }

        $queryBuilder->orderBy( [ 'ips_item_id', 'ips_row_id' ] )
            ->limit( $this->batchSize );
        $res = $queryBuilder->caller( __METHOD__ )->fetchResultSet();

        if ( $this->verbosity === 'verbose' ) {
            $this->progressReporter->reportMessage( 'Selected ' . $res->numRows() . ' wb_item_per_site records'
                . $continuationMsg );
        }

        // @phan-suppress-next-line PhanTypeMismatchArgumentNullable False positive Pass-by-ref
        return $this->getSubscriptionsPerItemFromRows( $res, $continuation );
    }

    /**
     * @param IResultWrapper $res A result set with the ips_item_id and ips_site_id fields
     *        set for each row.
     * @param array &$continuation Single item ID => site ID pair or empty.
     *
     * @return array[] An associative array mapping item IDs to lists of site IDs.
     */
    private function getSubscriptionsPerItemFromRows(
        IResultWrapper $res,
        &$continuation = []
    ) {
        $subscriptionsPerItem = [];

        $currentItemId = 0;
        $itemId = '';

        foreach ( $res as $row ) {
            if ( $row->ips_item_id != $currentItemId ) {
                $currentItemId = $row->ips_item_id;
                $itemId = $this->entityIdComposer
                    ->composeEntityId( Item::ENTITY_TYPE, $currentItemId )
                    ->getSerialization();
            }

            $subscriptionsPerItem[$itemId][] = $row->ips_site_id;
            $continuation = [ $currentItemId, $row->ips_row_id ];
        }

        return $subscriptionsPerItem;
    }

    /**
     * Returns a list of rows for insertion, using Database's multi-row insert mechanism.
     * Each row is represented as [ $itemId, $subscriber ].
     *
     * @param string $itemId
     * @param string[] $subscribers
     *
     * @return array[] rows
     */
    private function makeSubscriptionRows( $itemId, array $subscribers ) {
        $rows = [];

        foreach ( $subscribers as $subscriber ) {
            $rows[] = [
                'cs_entity_id' => $itemId,
                'cs_subscriber_id' => $subscriber,
            ];
        }

        return $rows;
    }

}