includes/jobqueue/jobs/CategoryMembershipChangeJob.php
<?php
/**
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
* http://www.gnu.org/copyleft/gpl.html
*
* @file
*/
use MediaWiki\MainConfigNames;
use MediaWiki\MediaWikiServices;
use MediaWiki\Page\PageIdentity;
use MediaWiki\Revision\RevisionRecord;
use MediaWiki\Revision\RevisionStoreRecord;
use MediaWiki\Title\Title;
use Wikimedia\Rdbms\LBFactory;
use Wikimedia\Rdbms\SelectQueryBuilder;
/**
* Job to add recent change entries mentioning category membership changes
*
* This allows users to easily scan categories for recent page membership changes
*
* Parameters include:
* - pageId : page ID
* - revTimestamp : timestamp of the triggering revision
*
* Category changes will be mentioned for revisions at/after the timestamp for this page
*
* @since 1.27
* @ingroup JobQueue
*/
class CategoryMembershipChangeJob extends Job {
/** @var int|null */
private $ticket;
private const ENQUEUE_FUDGE_SEC = 60;
/**
* @param PageIdentity $page the page for which to update category membership.
* @param string $revisionTimestamp The timestamp of the new revision that triggered the job.
* @return JobSpecification
*/
public static function newSpec( PageIdentity $page, $revisionTimestamp ) {
return new JobSpecification(
'categoryMembershipChange',
[
'pageId' => $page->getId(),
'revTimestamp' => $revisionTimestamp,
],
[
'removeDuplicates' => true,
'removeDuplicatesIgnoreParams' => [ 'revTimestamp' ]
],
$page
);
}
/**
* Constructor for use by the Job Queue infrastructure.
* @note Don't call this when queueing a new instance, use newSpec() instead.
* @param PageIdentity $page the categorized page.
* @param array $params Such latest revision instance of the categorized page.
*/
public function __construct( PageIdentity $page, array $params ) {
parent::__construct( 'categoryMembershipChange', $page, $params );
// Only need one job per page. Note that ENQUEUE_FUDGE_SEC handles races where an
// older revision job gets inserted while the newer revision job is de-duplicated.
$this->removeDuplicates = true;
}
public function run() {
$services = MediaWikiServices::getInstance();
$lbFactory = $services->getDBLoadBalancerFactory();
$lb = $lbFactory->getMainLB();
$dbw = $lb->getConnection( DB_PRIMARY );
$this->ticket = $lbFactory->getEmptyTransactionTicket( __METHOD__ );
$page = $services->getWikiPageFactory()->newFromID( $this->params['pageId'], IDBAccessObject::READ_LATEST );
if ( !$page ) {
$this->setLastError( "Could not find page #{$this->params['pageId']}" );
return false; // deleted?
}
// Cut down on the time spent in waitForPrimaryPos() in the critical section
$dbr = $lb->getConnection( DB_REPLICA );
if ( !$lb->waitForPrimaryPos( $dbr ) ) {
$this->setLastError( "Timed out while pre-waiting for replica DB to catch up" );
return false;
}
// Use a named lock so that jobs for this page see each others' changes
$lockKey = "{$dbw->getDomainID()}:CategoryMembershipChange:{$page->getId()}"; // per-wiki
$scopedLock = $dbw->getScopedLockAndFlush( $lockKey, __METHOD__, 3 );
if ( !$scopedLock ) {
$this->setLastError( "Could not acquire lock '$lockKey'" );
return false;
}
// Wait till replica DB is caught up so that jobs for this page see each others' changes
if ( !$lb->waitForPrimaryPos( $dbr ) ) {
$this->setLastError( "Timed out while waiting for replica DB to catch up" );
return false;
}
// Clear any stale REPEATABLE-READ snapshot
$dbr->flushSnapshot( __METHOD__ );
$cutoffUnix = wfTimestamp( TS_UNIX, $this->params['revTimestamp'] );
// Using ENQUEUE_FUDGE_SEC handles jobs inserted out of revision order due to the delay
// between COMMIT and actual enqueueing of the CategoryMembershipChangeJob job.
$cutoffUnix -= self::ENQUEUE_FUDGE_SEC;
// Get the newest page revision that has a SRC_CATEGORIZE row.
// Assume that category changes before it were already handled.
$subQuery = $dbr->newSelectQueryBuilder()
->select( '1' )
->from( 'recentchanges' )
->where( 'rc_this_oldid = rev_id' )
->andWhere( [ 'rc_source' => RecentChange::SRC_CATEGORIZE ] );
$row = $dbr->newSelectQueryBuilder()
->select( [ 'rev_timestamp', 'rev_id' ] )
->from( 'revision' )
->where( [ 'rev_page' => $page->getId() ] )
->andWhere( $dbr->expr( 'rev_timestamp', '>=', $dbr->timestamp( $cutoffUnix ) ) )
->andWhere( 'EXISTS (' . $subQuery->caller( __METHOD__ )->getSQL() . ')' )
->orderBy( [ 'rev_timestamp', 'rev_id' ], SelectQueryBuilder::SORT_DESC )
->caller( __METHOD__ )->fetchRow();
// Only consider revisions newer than any such revision
if ( $row ) {
$cutoffUnix = wfTimestamp( TS_UNIX, $row->rev_timestamp );
$lastRevId = (int)$row->rev_id;
} else {
$lastRevId = 0;
}
// Find revisions to this page made around and after this revision which lack category
// notifications in recent changes. This lets jobs pick up were the last one left off.
$revisionStore = $services->getRevisionStore();
$res = $revisionStore->newSelectQueryBuilder( $dbr )
->joinComment()
->where( [
'rev_page' => $page->getId(),
$dbr->buildComparison( '>', [
'rev_timestamp' => $dbr->timestamp( $cutoffUnix ),
'rev_id' => $lastRevId,
] )
] )
->orderBy( [ 'rev_timestamp', 'rev_id' ], SelectQueryBuilder::SORT_ASC )
->caller( __METHOD__ )->fetchResultSet();
// Apply all category updates in revision timestamp order
foreach ( $res as $row ) {
$this->notifyUpdatesForRevision( $lbFactory, $page, $revisionStore->newRevisionFromRow( $row ) );
}
return true;
}
/**
* @param LBFactory $lbFactory
* @param WikiPage $page
* @param RevisionRecord $newRev
*/
protected function notifyUpdatesForRevision(
LBFactory $lbFactory, WikiPage $page, RevisionRecord $newRev
) {
$title = $page->getTitle();
// Get the new revision
if ( $newRev->isDeleted( RevisionRecord::DELETED_TEXT ) ) {
return;
}
$services = MediaWikiServices::getInstance();
// Get the prior revision (the same for null edits)
if ( $newRev->getParentId() ) {
$oldRev = $services->getRevisionLookup()
->getRevisionById( $newRev->getParentId(), IDBAccessObject::READ_LATEST );
if ( !$oldRev || $oldRev->isDeleted( RevisionRecord::DELETED_TEXT ) ) {
return;
}
} else {
$oldRev = null;
}
// Parse the new revision and get the categories
$categoryChanges = $this->getExplicitCategoriesChanges( $page, $newRev, $oldRev );
[ $categoryInserts, $categoryDeletes ] = $categoryChanges;
if ( !$categoryInserts && !$categoryDeletes ) {
return; // nothing to do
}
$blc = $services->getBacklinkCacheFactory()->getBacklinkCache( $title );
$catMembChange = new CategoryMembershipChange( $title, $blc, $newRev );
$catMembChange->checkTemplateLinks();
$batchSize = $services->getMainConfig()->get( MainConfigNames::UpdateRowsPerQuery );
$insertCount = 0;
foreach ( $categoryInserts as $categoryName ) {
$categoryTitle = Title::makeTitle( NS_CATEGORY, $categoryName );
$catMembChange->triggerCategoryAddedNotification( $categoryTitle );
if ( $insertCount++ && ( $insertCount % $batchSize ) == 0 ) {
$lbFactory->commitAndWaitForReplication( __METHOD__, $this->ticket );
}
}
foreach ( $categoryDeletes as $categoryName ) {
$categoryTitle = Title::makeTitle( NS_CATEGORY, $categoryName );
$catMembChange->triggerCategoryRemovedNotification( $categoryTitle );
if ( $insertCount++ && ( $insertCount++ % $batchSize ) == 0 ) {
$lbFactory->commitAndWaitForReplication( __METHOD__, $this->ticket );
}
}
}
private function getExplicitCategoriesChanges(
WikiPage $page, RevisionRecord $newRev, RevisionRecord $oldRev = null
) {
// Inject the same timestamp for both revision parses to avoid seeing category changes
// due to time-based parser functions. Inject the same page title for the parses too.
// Note that REPEATABLE-READ makes template/file pages appear unchanged between parses.
$parseTimestamp = $newRev->getTimestamp();
// Parse the old rev and get the categories. Do not use link tables as that
// assumes these updates are perfectly FIFO and that link tables are always
// up to date, neither of which are true.
$oldCategories = $oldRev
? $this->getCategoriesAtRev( $page, $oldRev, $parseTimestamp )
: [];
// Parse the new revision and get the categories
$newCategories = $this->getCategoriesAtRev( $page, $newRev, $parseTimestamp );
$categoryInserts = array_values( array_diff( $newCategories, $oldCategories ) );
$categoryDeletes = array_values( array_diff( $oldCategories, $newCategories ) );
return [ $categoryInserts, $categoryDeletes ];
}
/**
* @param WikiPage $page
* @param RevisionRecord $rev
* @param string $parseTimestamp TS_MW
*
* @return string[] category names
*/
private function getCategoriesAtRev( WikiPage $page, RevisionRecord $rev, $parseTimestamp ) {
$services = MediaWikiServices::getInstance();
$options = $page->makeParserOptions( 'canonical' );
$options->setTimestamp( $parseTimestamp );
$options->setRenderReason( 'CategoryMembershipChangeJob' );
$output = $rev instanceof RevisionStoreRecord && $rev->isCurrent()
? $services->getParserCache()->get( $page, $options )
: null;
if ( !$output || $output->getCacheRevisionId() !== $rev->getId() ) {
$output = $services->getRevisionRenderer()->getRenderedRevision( $rev, $options )
->getRevisionParserOutput();
}
// array keys will cast numeric category names to ints;
// ::getCategoryNames() is careful to cast them back to strings
// to avoid breaking things!
return $output->getCategoryNames();
}
public function getDeduplicationInfo() {
$info = parent::getDeduplicationInfo();
unset( $info['params']['revTimestamp'] ); // first job wins
return $info;
}
}