wikimedia/mediawiki-core

View on GitHub
includes/objectcache/SqlBagOStuff.php

Summary

Maintainability
F
1 wk
Test Coverage
<?php
/**
 * Object caching using a SQL database.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 * http://www.gnu.org/copyleft/gpl.html
 *
 * @file
 * @ingroup Cache
 */

use MediaWiki\MediaWikiServices;
use Wikimedia\AtEase\AtEase;
use Wikimedia\Rdbms\Blob;
use Wikimedia\Rdbms\Database;
use Wikimedia\Rdbms\DBConnectionError;
use Wikimedia\Rdbms\DBError;
use Wikimedia\Rdbms\DBQueryError;
use Wikimedia\Rdbms\IDatabase;
use Wikimedia\Rdbms\ILoadBalancer;
use Wikimedia\Rdbms\IMaintainableDatabase;
use Wikimedia\Rdbms\SelectQueryBuilder;
use Wikimedia\ScopedCallback;
use Wikimedia\Timestamp\ConvertibleTimestamp;

/**
 * RDBMS-based caching module
 *
 * The following database sharding schemes are supported:
 *   - None; all keys map to the same shard
 *   - Hash; keys map to shards via consistent hashing
 *
 * The following database replication topologies are supported:
 *   - A primary database server for each shard, all within one datacenter
 *   - A co-primary database server for each shard within each datacenter
 *
 * @ingroup Cache
 */
class SqlBagOStuff extends MediumSpecificBagOStuff {
    /** @var callable|null Injected function which returns a LoadBalancer */
    protected $loadBalancerCallback;
    /** @var ILoadBalancer|null */
    protected $loadBalancer;
    /** @var string|false|null DB name used for keys using the LoadBalancer */
    protected $dbDomain;
    /** @var bool Whether to use the LoadBalancer */
    protected $useLB = false;

    /** @var array[] (server index => server config) */
    protected $serverInfos = [];
    /** @var string[] (server index => tag/host name) */
    protected $serverTags = [];
    /** @var float UNIX timestamp */
    protected $lastGarbageCollect = 0;
    /** @var int Average number of writes required to trigger garbage collection */
    protected $purgePeriod = 10;
    /** @var int Max expired rows to purge during randomized garbage collection */
    protected $purgeLimit = 100;
    /** @var int Number of table shards to use on each server */
    protected $numTableShards = 1;
    /** @var int */
    protected $writeBatchSize = 100;
    /** @var string */
    protected $tableName = 'objectcache';
    /** @var bool Whether to use replicas instead of primaries (if using LoadBalancer) */
    protected $replicaOnly;
    /** @var bool Whether multi-primary mode is enabled */
    protected $multiPrimaryMode;

    /** @var IMaintainableDatabase[] Map of (shard index => DB handle) */
    protected $conns;
    /** @var float[] Map of (shard index => UNIX timestamps) */
    protected $connFailureTimes = [];
    /** @var Exception[] Map of (shard index => Exception) */
    protected $connFailureErrors = [];

    /** @var bool Whether zlib methods are available to PHP */
    private $hasZlib;

    /** A number of seconds well above any expected clock skew */
    private const SAFE_CLOCK_BOUND_SEC = 15;
    /** A number of seconds well above any expected clock skew and replication lag */
    private const SAFE_PURGE_DELAY_SEC = 3600;
    /** Distinct string for tombstones stored in the "serialized" value column */
    private const TOMB_SERIAL = '';
    /** Relative seconds-to-live to use for tombstones */
    private const TOMB_EXPTIME = -self::SAFE_CLOCK_BOUND_SEC;
    /** How many seconds must pass before triggering a garbage collection */
    private const GC_DELAY_SEC = 1;

    private const BLOB_VALUE = 0;
    private const BLOB_EXPIRY = 1;
    private const BLOB_CASTOKEN = 2;

    /**
     * Placeholder timestamp to use for TTL_INDEFINITE that can be stored in all RDBMS types.
     * We use BINARY(14) for MySQL, BLOB for SQLite, and TIMESTAMPTZ for Postgres (which goes
     * up to 294276 AD). The last second of the year 9999 can be stored in all these cases.
     * https://www.postgresql.org/docs/9.0/datatype-datetime.html
     */
    private const INF_TIMESTAMP_PLACEHOLDER = '99991231235959';

    /**
     * Create a new backend instance from parameters injected by ObjectCache::newFromParams()
     *
     * The database servers must be provided by *either* the "server" parameter, the "servers"
     * parameter or the "loadBalancer" parameter.
     *
     * The parameters are as described at {@link \MediaWiki\MainConfigSchema::ObjectCaches}
     * except that:
     *
     *   - the configured "cluster" and main LB fallback modes are implemented by
     *     the wiring by passing "loadBalancerCallback".
     *   - "dbDomain" is required if "loadBalancerCallback" is set, whereas in
     *     config it may be absent.
     *
     * @internal
     *
     * @param array $params
     *   - server: string
     *   - servers: string[]
     *   - loadBalancerCallback: A closure which provides a LoadBalancer object
     *      - dbDomain: string|false
     *   - multiPrimaryMode: bool
     *   - purgePeriod: int|float
     *   - purgeLimit: int
     *   - tableName: string
     *   - shards: int
     *   - replicaOnly: bool
     *   - writeBatchSize: int
     */
    public function __construct( $params ) {
        parent::__construct( $params );

        $hasServerList = isset( $params['servers'] ) || isset( $params['server'] );
        if ( $hasServerList ) {
            // A direct list of servers was configured.
            // Object data is horizontally partitioned via key hash.
            $servers = $params['servers'] ?? [ $params['server'] ];
            $position = 0;
            foreach ( $servers as $tag => $info ) {
                $this->serverInfos[$position] = $info;
                // Integer-indexed arrays are still allowed for b/c; those get a "#<n>" tag
                $this->serverTags[$position] = is_string( $tag ) ? $tag : "#$position";
                $position++;
            }
        } elseif ( isset( $params['loadBalancerCallback'] ) ) {
            // LoadBalancer mode: connections are obtained through the injected callback
            $this->loadBalancerCallback = $params['loadBalancerCallback'];
            if ( !isset( $params['dbDomain'] ) ) {
                throw new InvalidArgumentException(
                    __METHOD__ . ": 'dbDomain' is required if 'loadBalancerCallback' is given"
                );
            }
            $this->dbDomain = $params['dbDomain'];
            $this->useLB = true;
        } else {
            throw new InvalidArgumentException(
                __METHOD__ . " requires 'server', 'servers', or 'loadBalancerCallback'"
            );
        }

        // Integer-valued tunables; each falls back to the property's declared default
        $this->purgePeriod = (int)( $params['purgePeriod'] ?? $this->purgePeriod );
        $this->purgeLimit = (int)( $params['purgeLimit'] ?? $this->purgeLimit );
        $this->numTableShards = (int)( $params['shards'] ?? $this->numTableShards );
        $this->writeBatchSize = (int)( $params['writeBatchSize'] ?? $this->writeBatchSize );

        $this->tableName = $params['tableName'] ?? $this->tableName;
        $this->replicaOnly = $params['replicaOnly'] ?? false;
        $this->multiPrimaryMode = $params['multiPrimaryMode'] ?? false;

        $this->attrMap[self::ATTR_DURABILITY] = self::QOS_DURABILITY_RDBMS;

        $this->hasZlib = extension_loaded( 'zlib' );
    }

    /**
     * Fetch and unserialize the value stored for a single key.
     *
     * @param string $key
     * @param int $flags Not used by this implementation
     * @param mixed &$casToken Pass self::PASS_BY_REF to request the CAS token; set to the
     *  token on a successful read and to null otherwise [returned]
     * @return mixed The unserialized value, or false if no live row was found
     */
    protected function doGet( $key, $flags = 0, &$casToken = null ) {
        $wantsToken = ( $casToken === self::PASS_BY_REF );
        $casToken = null;

        $blob = $this->fetchBlobs( [ $key ], $wantsToken )[$key];

        $value = false;
        $serialSize = false;
        if ( $blob ) {
            $serial = $blob[self::BLOB_VALUE];
            $value = $this->unserialize( $serial );
            // Only hand out a token for values that could actually be unserialized
            if ( $wantsToken && $value !== false ) {
                $casToken = $blob[self::BLOB_CASTOKEN];
            }
            $serialSize = strlen( $serial );
        }

        $this->updateOpStats( self::METRIC_OP_GET, [ $key => [ 0, $serialSize ] ] );

        return $value;
    }

    /**
     * Store a single key/value pair via the generic blob modification path.
     *
     * @param string $key
     * @param mixed $value
     * @param int $exptime
     * @param int $flags Not used by this implementation
     * @return bool Result of modifyBlobs() for this key
     */
    protected function doSet( $key, $value, $exptime = 0, $flags = 0 ) {
        $now = $this->getCurrentTime();
        $argsByKey = [ $key => [ $value, $exptime ] ];

        return $this->modifyBlobs( [ $this, 'modifyTableSpecificBlobsForSet' ], $now, $argsByKey );
    }

    /**
     * Delete (or tombstone) a single key via the generic blob modification path.
     *
     * @param string $key
     * @param int $flags Not used by this implementation
     * @return bool Result of modifyBlobs() for this key
     */
    protected function doDelete( $key, $flags = 0 ) {
        $now = $this->getCurrentTime();
        // The delete callback takes no per-key arguments
        $argsByKey = [ $key => [] ];

        return $this->modifyBlobs( [ $this, 'modifyTableSpecificBlobsForDelete' ], $now, $argsByKey );
    }

    /**
     * Insert a key/value pair only if the key has no live row.
     *
     * @param string $key
     * @param mixed $value
     * @param int $exptime
     * @param int $flags Not used by this implementation
     * @return bool False if the locking timestamp could not be obtained; otherwise the
     *  result of modifyBlobs()
     */
    protected function doAdd( $key, $value, $exptime = 0, $flags = 0 ) {
        // NOTE(review): $lockScope is filled in by reference and presumably releases the key
        // lock once it goes out of scope, so it must stay referenced until this method
        // returns — confirm against newLockingWriteSectionModificationTimestamp()
        $mtime = $this->newLockingWriteSectionModificationTimestamp( $key, $lockScope );
        if ( $mtime !== null ) {
            return $this->modifyBlobs(
                [ $this, 'modifyTableSpecificBlobsForAdd' ],
                $mtime,
                [ $key => [ $value, $exptime ] ]
            );
        }

        // Timeout or I/O error during lock acquisition
        return false;
    }

    /**
     * Overwrite a key only if its current CAS token matches the given one.
     *
     * @param mixed $casToken Token previously obtained from a read
     * @param string $key
     * @param mixed $value
     * @param int $exptime
     * @param int $flags Not used by this implementation
     * @return bool False if the locking timestamp could not be obtained; otherwise the
     *  result of modifyBlobs()
     */
    protected function doCas( $casToken, $key, $value, $exptime = 0, $flags = 0 ) {
        // NOTE(review): $lockScope is filled in by reference and presumably releases the key
        // lock once it goes out of scope, so it must stay referenced until this method
        // returns — confirm against newLockingWriteSectionModificationTimestamp()
        $mtime = $this->newLockingWriteSectionModificationTimestamp( $key, $lockScope );
        if ( $mtime !== null ) {
            return $this->modifyBlobs(
                [ $this, 'modifyTableSpecificBlobsForCas' ],
                $mtime,
                [ $key => [ $value, $exptime, $casToken ] ]
            );
        }

        // Timeout or I/O error during lock acquisition
        return false;
    }

    /**
     * Change the expiry of a single key via the generic blob modification path.
     *
     * @param string $key
     * @param int $exptime
     * @param int $flags Not used by this implementation
     * @return bool Result of modifyBlobs() for this key
     */
    protected function doChangeTTL( $key, $exptime, $flags ) {
        $now = $this->getCurrentTime();
        $argsByKey = [ $key => [ $exptime ] ];

        return $this->modifyBlobs(
            [ $this, 'modifyTableSpecificBlobsForChangeTTL' ],
            $now,
            $argsByKey
        );
    }

    /**
     * Increment a key by $step, or initialize it to $init if it does not exist.
     *
     * @param string $key
     * @param int $exptime
     * @param int $step
     * @param int $init
     * @param int $flags Bitfield; self::WRITE_BACKGROUND selects the async write callback
     * @return mixed The per-key result set by the write callback, or false if modifyBlobs()
     *  reported a failure
     */
    protected function doIncrWithInit( $key, $exptime, $step, $init, $flags ) {
        $mtime = $this->getCurrentTime();

        $writeMethod = ( $flags & self::WRITE_BACKGROUND )
            ? 'modifyTableSpecificBlobsForIncrInitAsync'
            : 'modifyTableSpecificBlobsForIncrInit';

        // Populated by reference with the per-key results
        $resByKey = [];
        $allProcessed = $this->modifyBlobs(
            [ $this, $writeMethod ],
            $mtime,
            [ $key => [ $step, $init, $exptime ] ],
            $resByKey
        );

        if ( !$allProcessed ) {
            return false;
        }

        return $resByKey[$key];
    }

    /**
     * Fetch and unserialize the values for several keys at once.
     *
     * @param string[] $keys
     * @param int $flags Not used by this implementation
     * @return array Map of (key => value) holding only keys whose value could be unserialized
     */
    protected function doGetMulti( array $keys, $flags = 0 ) {
        $values = [];
        $sizeStatsByKey = [];

        $blobsByKey = $this->fetchBlobs( $keys );
        foreach ( $keys as $key ) {
            $blob = $blobsByKey[$key];
            if ( !$blob ) {
                // No live row; record the miss in the stats
                $sizeStatsByKey[$key] = [ 0, false ];
                continue;
            }

            $serial = $blob[self::BLOB_VALUE];
            $unserialized = $this->unserialize( $serial );
            if ( $unserialized !== false ) {
                $values[$key] = $unserialized;
            }
            $sizeStatsByKey[$key] = [ 0, strlen( $serial ) ];
        }

        $this->updateOpStats( self::METRIC_OP_GET, $sizeStatsByKey );

        return $values;
    }

    /**
     * Store several key/value pairs that all share the same expiry.
     *
     * @param array $data Map of (key => value)
     * @param int $exptime
     * @param int $flags Not used by this implementation
     * @return bool Result of modifyBlobs() for the whole batch
     */
    protected function doSetMulti( array $data, $exptime = 0, $flags = 0 ) {
        $mtime = $this->getCurrentTime();

        // Pair every value with the shared expiry, keeping the original keys and order
        $argsByKey = [];
        foreach ( $data as $key => $value ) {
            $argsByKey[$key] = [ $value, $exptime ];
        }

        return $this->modifyBlobs(
            [ $this, 'modifyTableSpecificBlobsForSet' ],
            $mtime,
            $argsByKey
        );
    }

    /**
     * Delete (or tombstone) several keys at once.
     *
     * @param string[] $keys
     * @param int $flags Not used by this implementation
     * @return bool Result of modifyBlobs() for the whole batch
     */
    protected function doDeleteMulti( array $keys, $flags = 0 ) {
        $now = $this->getCurrentTime();
        // The delete callback takes no per-key arguments
        $argsByKey = array_fill_keys( $keys, [] );

        return $this->modifyBlobs( [ $this, 'modifyTableSpecificBlobsForDelete' ], $now, $argsByKey );
    }

    /**
     * Change the expiry of several keys at once.
     *
     * @param string[] $keys
     * @param int $exptime
     * @param int $flags Not used by this implementation
     * @return bool Result of modifyBlobs() for the whole batch
     */
    public function doChangeTTLMulti( array $keys, $exptime, $flags = 0 ) {
        $now = $this->getCurrentTime();
        $argsByKey = array_fill_keys( $keys, [ $exptime ] );

        return $this->modifyBlobs(
            [ $this, 'modifyTableSpecificBlobsForChangeTTL' ],
            $now,
            $argsByKey
        );
    }

    /**
     * Get a connection to the specified database
     *
     * @param int $shardIndex Server index
     * @return IMaintainableDatabase
     * @throws DBConnectionError
     * @throws UnexpectedValueException
     */
    private function getConnection( $shardIndex ) {
        if ( $this->useLB ) {
            return $this->getConnectionViaLoadBalancer();
        }

        // Don't keep timing out trying to connect if the server is down: re-throw the
        // remembered error while the last failure is less than 60 seconds old
        if ( isset( $this->connFailureErrors[$shardIndex] ) ) {
            $secondsSinceFailure = $this->getCurrentTime() - $this->connFailureTimes[$shardIndex];
            if ( $secondsSinceFailure < 60 ) {
                throw $this->connFailureErrors[$shardIndex];
            }
        }

        if ( !isset( $this->serverInfos[$shardIndex] ) ) {
            throw new UnexpectedValueException( "Invalid server index #$shardIndex" );
        }

        return $this->getConnectionFromServerInfo( $shardIndex, $this->serverInfos[$shardIndex] );
    }

    /**
     * Get the server index and table name for a given key
     * @param string $key
     * @return array (server index, table name)
     */
    private function getKeyLocation( $key ) {
        if ( $this->useLB || count( $this->serverTags ) == 1 ) {
            // LoadBalancer based configuration, or only one server to choose from
            $shardIndex = 0;
        } else {
            // Striped array of database servers: take the first tag after consistent hashing
            $rankedTags = $this->serverTags;
            ArrayUtils::consistentHashSort( $rankedTags, $key );
            $shardIndex = array_key_first( $rankedTags );
        }

        $tableIndex = null;
        if ( $this->numTableShards > 1 ) {
            // First 32 bits of the MD5 digest, masked to a non-negative 31-bit integer
            $hash = hexdec( substr( md5( $key ), 0, 8 ) ) & 0x7fffffff;
            $tableIndex = $hash % $this->numTableShards;
        }

        return [ $shardIndex, $this->getTableNameByShard( $tableIndex ) ];
    }

    /**
     * Get the table name for a given shard index
     * @param int|null $index
     * @return string
     */
    private function getTableNameByShard( $index ) {
        if ( $index === null || $this->numTableShards <= 1 ) {
            // Table sharding is not in use
            return $this->tableName;
        }

        // Zero-pad the suffix to the width of the highest possible table index
        $width = strlen( (string)( $this->numTableShards - 1 ) );
        $suffix = sprintf( "%0{$width}d", $index );

        return $this->tableName . $suffix;
    }

    /**
     * @param string[] $keys
     * @param bool $getCasToken Whether to get a CAS token
     * @return array<string,array|null> Order-preserved map of (key => (value,expiry,token) or null)
     */
    private function fetchBlobs( array $keys, bool $getCasToken = false ) {
        /** @noinspection PhpUnusedLocalVariableInspection */
        $silenceScope = $this->silenceTransactionProfiler();

        // Initialize order-preserved per-key results; set values for live keys below
        $dataByKey = array_fill_keys( $keys, null );

        $readTime = (int)$this->getCurrentTime();
        // Group the keys by server shard and partition table so each table is queried once
        $keysByTableByShard = [];
        foreach ( $keys as $key ) {
            [ $shardIndex, $partitionTable ] = $this->getKeyLocation( $key );
            $keysByTableByShard[$shardIndex][$partitionTable][] = $key;
        }

        // Phase 1: fetch the raw rows, remembering which shard/table each one came from
        foreach ( $keysByTableByShard as $shardIndex => $serverKeys ) {
            try {
                $db = $this->getConnection( $shardIndex );
                foreach ( $serverKeys as $partitionTable => $tableKeys ) {
                    $res = $db->newSelectQueryBuilder()
                        ->select(
                            $getCasToken
                            ? $this->addCasTokenFields( $db, [ 'keyname', 'value', 'exptime' ] )
                            : [ 'keyname', 'value', 'exptime' ] )
                        ->from( $partitionTable )
                        ->where( $this->buildExistenceConditions( $db, $tableKeys, $readTime ) )
                        ->caller( __METHOD__ )
                        ->fetchResultSet();
                    foreach ( $res as $row ) {
                        $row->shardIndex = $shardIndex;
                        $row->tableName = $partitionTable;
                        $dataByKey[$row->keyname] = $row;
                    }
                }
            } catch ( DBError $e ) {
                $this->handleDBError( $e, $shardIndex );
            }
        }

        // Phase 2: replace each raw row with its decoded (value, expiry, token) tuple
        foreach ( $keys as $key ) {
            $row = $dataByKey[$key] ?? null;
            // Skip missing keys, and keys already decoded because $keys listed them twice
            if ( !is_object( $row ) ) {
                continue;
            }

            $this->debug( __METHOD__ . ": retrieved $key; expiry time is {$row->exptime}" );
            try {
                $db = $this->getConnection( $row->shardIndex );
                $dataByKey[$key] = [
                    self::BLOB_VALUE => $this->dbDecodeSerialValue( $db, $row->value ),
                    self::BLOB_EXPIRY => $this->decodeDbExpiry( $db, $row->exptime ),
                    self::BLOB_CASTOKEN => $getCasToken
                        ? $this->getCasTokenFromRow( $db, $row )
                        : null
                ];
            } catch ( DBError $e ) {
                // Callers expect an array or null per key, so never leave the raw row object
                // behind; treat the key as a miss instead
                $dataByKey[$key] = null;
                $this->handleDBError( $e, $row->shardIndex );
            }
        }

        return $dataByKey;
    }

    /**
     * @param callable $tableWriteCallback Callback that takes the following arguments:
     *  - IDatabase instance
     *  - Partition table name string
     *  - UNIX modification timestamp
     *  - Map of (key => list of arguments) for keys belonging to the server/table partition
     *  - Map of (key => result) [returned]
     * @param float $mtime UNIX modification timestamp
     * @param array<string,array> $argsByKey Map of (key => list of arguments)
     * @param array<string,mixed> &$resByKey Order-preserved map of (key => result) [returned]
     * @return bool Whether all keys were processed
     * @param-taint $argsByKey none
     */
    private function modifyBlobs(
        callable $tableWriteCallback,
        float $mtime,
        array $argsByKey,
        &$resByKey = []
    ) {
        // Initialize order-preserved per-key results; callbacks mark successful results
        $resByKey = array_fill_keys( array_keys( $argsByKey ), false );

        /** @noinspection PhpUnusedLocalVariableInspection */
        $silenceScope = $this->silenceTransactionProfiler();

        // Group the per-key arguments by server shard and partition table
        $argsByKeyByTableByShard = [];
        foreach ( $argsByKey as $key => $args ) {
            [ $shardIndex, $partitionTable ] = $this->getKeyLocation( $key );
            $argsByKeyByTableByShard[$shardIndex][$partitionTable][$key] = $args;
        }

        // Map of (shard index => true); keyed so that a shard written to through several
        // partition tables is still only considered once for garbage collection below
        $shardIndexesAffected = [];
        foreach ( $argsByKeyByTableByShard as $shardIndex => $argsByKeyByTables ) {
            foreach ( $argsByKeyByTables as $table => $ptKeyArgs ) {
                try {
                    $db = $this->getConnection( $shardIndex );
                    $shardIndexesAffected[$shardIndex] = true;
                    $tableWriteCallback( $db, $table, $mtime, $ptKeyArgs, $resByKey );
                } catch ( DBError $e ) {
                    // Keys of this table keep their "false" result; move on to the next table
                    $this->handleDBError( $e, $shardIndex );
                }
            }
        }

        $success = !in_array( false, $resByKey, true );

        foreach ( array_keys( $shardIndexesAffected ) as $shardIndex ) {
            try {
                if (
                    // Random purging is enabled (a non-positive period disables it and
                    // also keeps an invalid range away from mt_rand())
                    $this->purgePeriod > 0 &&
                    // Only purge on one in every $this->purgePeriod writes
                    mt_rand( 0, $this->purgePeriod - 1 ) == 0 &&
                    // Avoid repeating the delete within a few seconds
                    ( $this->getCurrentTime() - $this->lastGarbageCollect ) > self::GC_DELAY_SEC
                ) {
                    $this->garbageCollect( $shardIndex );
                }
            } catch ( DBError $e ) {
                $this->handleDBError( $e, $shardIndex );
            }
        }

        return $success;
    }

    /**
     * Set key/value pairs belonging to a partition table on the given server
     *
     * In multi-primary mode, if the current row for a key exists and has a modification token
     * with a greater integral UNIX timestamp than that of the provided modification timestamp,
     * then the write to that key will be aborted with a "false" result. Successfully modified
     * key rows will be assigned a new modification token using the provided timestamp.
     *
     * @param IDatabase $db Handle to the database server where the argument keys belong
     * @param string $ptable Name of the partition table where the argument keys belong
     * @param float $mtime UNIX modification timestamp
     * @param array<string,array> $argsByKey Non-empty (key => (value,exptime)) map
     * @param array<string,mixed> &$resByKey Map of (key => result) for successful writes [returned]
     * @throws DBError
     */
    private function modifyTableSpecificBlobsForSet(
        IDatabase $db,
        string $ptable,
        float $mtime,
        array $argsByKey,
        array &$resByKey
    ) {
        $sizeStatsByKey = [];

        $modToken = $this->makeTimestampedModificationToken( $mtime, $db );

        // Build one upsert row per key and record the serialized size for the stats
        $upsertRows = [];
        foreach ( $argsByKey as $key => [ $value, $exptime ] ) {
            $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
            $serial = $this->getSerialized( $value, $key );
            $upsertRows[] = $this->buildUpsertRow( $db, $key, $serial, $expiry, $modToken );

            $sizeStatsByKey[$key] = [ strlen( $serial ), 0 ];
        }

        if ( !$this->multiPrimaryMode ) {
            // T288998: use REPLACE, if possible, to avoid cluttering the binlogs
            $replace = $db->newReplaceQueryBuilder()
                ->replaceInto( $ptable )
                ->rows( $upsertRows )
                ->uniqueIndexFields( [ 'keyname' ] )
                ->caller( __METHOD__ );
            $replace->execute();
        } else {
            // Multi-primary mode goes through the upsert path with the overwrite SET clause
            $upsert = $db->newInsertQueryBuilder()
                ->insertInto( $ptable )
                ->rows( $upsertRows )
                ->onDuplicateKeyUpdate()
                ->uniqueIndexFields( [ 'keyname' ] )
                ->set( $this->buildMultiUpsertSetForOverwrite( $db, $modToken ) )
                ->caller( __METHOD__ );
            $upsert->execute();
        }

        // The query did not throw, so every key of this table is marked as written
        foreach ( array_keys( $argsByKey ) as $key ) {
            $resByKey[$key] = true;
        }

        $this->updateOpStats( self::METRIC_OP_SET, $sizeStatsByKey );
    }

    /**
     * Purge/tombstone key/value pairs belonging to a partition table on the given server
     *
     * In multi-primary mode, if the current row for a key exists and has a modification token
     * with a greater integral UNIX timestamp than that of the provided modification timestamp,
     * then the write to that key will be aborted with a "false" result. Successfully modified
     * key rows will be assigned a new modification token/timestamp, an empty value, and an
     * expiration timestamp dated slightly before the new modification timestamp.
     *
     * @param IDatabase $db Handle to the database server where the argument keys belong
     * @param string $ptable Name of the partition table where the argument keys belong
     * @param float $mtime UNIX modification timestamp
     * @param array<string,array> $argsByKey Non-empty (key => []) map
     * @param array<string,mixed> &$resByKey Map of (key => result) prefilled with false [returned]
     * @throws DBError
     */
    private function modifyTableSpecificBlobsForDelete(
        IDatabase $db,
        string $ptable,
        float $mtime,
        array $argsByKey,
        array &$resByKey
    ) {
        $keys = array_keys( $argsByKey );

        if ( !$this->multiPrimaryMode ) {
            // Just purge the keys since there is only one primary (e.g. "source of truth")
            $delete = $db->newDeleteQueryBuilder()
                ->deleteFrom( $ptable )
                ->where( [ 'keyname' => $keys ] )
                ->caller( __METHOD__ );
            $delete->execute();
        } else {
            // Tombstone keys in order to respect eventual consistency
            $modToken = $this->makeTimestampedModificationToken( $mtime, $db );
            $tombExpiry = $this->makeNewKeyExpiry( self::TOMB_EXPTIME, (int)$mtime );
            $upsert = $db->newInsertQueryBuilder()
                ->insertInto( $ptable )
                ->onDuplicateKeyUpdate()
                ->uniqueIndexFields( [ 'keyname' ] )
                ->set( $this->buildMultiUpsertSetForOverwrite( $db, $modToken ) );
            foreach ( $keys as $key ) {
                $upsert->row(
                    $this->buildUpsertRow( $db, $key, self::TOMB_SERIAL, $tombExpiry, $modToken )
                );
            }
            $upsert->caller( __METHOD__ )->execute();
        }

        // The query did not throw, so every key of this table is marked as deleted
        foreach ( $keys as $key ) {
            $resByKey[$key] = true;
        }

        $this->updateOpStats( self::METRIC_OP_DELETE, array_keys( $argsByKey ) );
    }

    /**
     * Insert key/value pairs belonging to a partition table on the given server
     *
     * If the current row for a key exists and has an integral UNIX timestamp of expiration
     * greater than that of the provided modification timestamp, then the write to that key
     * will be aborted with a "false" result. Acquisition of advisory key locks must be handled
     * by calling functions.
     *
     * In multi-primary mode, if the current row for a key exists and has a modification token
     * with a greater integral UNIX timestamp than that of the provided modification timestamp,
     * then the write to that key will be aborted with a "false" result. Successfully modified
     * key rows will be assigned a new modification token/timestamp.
     *
     * @param IDatabase $db Handle to the database server where the argument keys belong
     * @param string $ptable Name of the partition table where the argument keys belong
     * @param float $mtime UNIX modification timestamp
     * @param array<string,array> $argsByKey Non-empty (key => (value,exptime)) map
     * @param array<string,mixed> &$resByKey Map of (key => result) prefilled with false [returned]
     * @throws DBError
     */
    private function modifyTableSpecificBlobsForAdd(
        IDatabase $db,
        string $ptable,
        float $mtime,
        array $argsByKey,
        array &$resByKey
    ) {
        $sizeStatsByKey = [];

        $modToken = $this->makeTimestampedModificationToken( $mtime, $db );

        // This check must happen outside the write query to respect eventual consistency
        $liveKeyQuery = $db->newSelectQueryBuilder()
            ->select( 'keyname' )
            ->from( $ptable )
            ->where( $this->buildExistenceConditions( $db, array_keys( $argsByKey ), (int)$mtime ) )
            ->caller( __METHOD__ );
        $existingByKey = array_fill_keys( $liveKeyQuery->fetchFieldValues(), true );

        // Only keys without a live row are written
        $upsertRows = [];
        foreach ( $argsByKey as $key => [ $value, $exptime ] ) {
            if ( isset( $existingByKey[$key] ) ) {
                $this->logger->debug( __METHOD__ . ": $key already exists" );
                continue;
            }

            $serial = $this->getSerialized( $value, $key );
            $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
            $sizeStatsByKey[$key] = [ strlen( $serial ), 0 ];
            $upsertRows[] = $this->buildUpsertRow( $db, $key, $serial, $expiry, $modToken );
        }

        if ( $upsertRows === [] ) {
            // Every key already exists; the prefilled "false" results stand
            return;
        }

        $upsert = $db->newInsertQueryBuilder()
            ->insertInto( $ptable )
            ->rows( $upsertRows )
            ->onDuplicateKeyUpdate()
            ->uniqueIndexFields( [ 'keyname' ] )
            ->set( $this->buildMultiUpsertSetForOverwrite( $db, $modToken ) )
            ->caller( __METHOD__ );
        $upsert->execute();

        // A key succeeded exactly when it had no live row beforehand
        foreach ( array_keys( $argsByKey ) as $key ) {
            $resByKey[$key] = !isset( $existingByKey[$key] );
        }

        $this->updateOpStats( self::METRIC_OP_ADD, $sizeStatsByKey );
    }

    /**
     * Insert key/value pairs belonging to a partition table on the given server
     *
     * If the current row for a key exists, has an integral UNIX timestamp of expiration greater
     * than that of the provided modification timestamp, and the CAS token does not match, then
     * the write to that key will be aborted with a "false" result. Acquisition of advisory key
     * locks must be handled by calling functions.
     *
     * In multi-primary mode, if the current row for a key exists and has a modification token
     * with a greater integral UNIX timestamp than that of the provided modification timestamp,
     * then the write to that key will be aborted with a "false" result. Successfully modified
     * key rows will be assigned a new modification token/timestamp.
     *
     * @param IDatabase $db Handle to the database server where the argument keys belong
     * @param string $ptable Name of the partition table where the argument keys belong
     * @param float $mtime UNIX modification timestamp
     * @param array<string,array> $argsByKey Non-empty (key => (value, exptime, CAS token)) map
     * @param array<string,mixed> &$resByKey Map of (key => result) prefilled with false [returned]
     * @throws DBError
     */
    private function modifyTableSpecificBlobsForCas(
        IDatabase $db,
        string $ptable,
        float $mtime,
        array $argsByKey,
        array &$resByKey
    ) {
        $sizeStatsByKey = [];

        $modToken = $this->makeTimestampedModificationToken( $mtime, $db );

        // This check must happen outside the write query to respect eventual consistency
        $tokenQuery = $db->newSelectQueryBuilder()
            ->select( $this->addCasTokenFields( $db, [ 'keyname' ] ) )
            ->from( $ptable )
            ->where( $this->buildExistenceConditions( $db, array_keys( $argsByKey ), (int)$mtime ) )
            ->caller( __METHOD__ );

        // Map of (key => current CAS token) for the keys that have a live row
        $liveTokensByKey = [];
        foreach ( $tokenQuery->fetchResultSet() as $tokenRow ) {
            $liveTokensByKey[$tokenRow->keyname] = $this->getCasTokenFromRow( $db, $tokenRow );
        }

        $rejectedByKey = [];
        $upsertRows = [];
        foreach ( $argsByKey as $key => [ $value, $exptime, $casToken ] ) {
            $liveToken = $liveTokensByKey[$key] ?? null;
            if ( $liveToken === null || $liveToken !== $casToken ) {
                // Either there is no live row, or it changed since the token was handed out
                $rejectedByKey[$key] = true;
                $reason = ( $liveToken === null )
                    ? ": $key does not exists"
                    : ": $key does not have a matching token";
                $this->logger->debug( __METHOD__ . $reason );
                continue;
            }

            $serial = $this->getSerialized( $value, $key );
            $expiry = $this->makeNewKeyExpiry( $exptime, (int)$mtime );
            $sizeStatsByKey[$key] = [ strlen( $serial ), 0 ];

            $upsertRows[] = $this->buildUpsertRow( $db, $key, $serial, $expiry, $modToken );
        }

        if ( $upsertRows === [] ) {
            // No key passed the token check; the prefilled "false" results stand
            return;
        }

        $upsert = $db->newInsertQueryBuilder()
            ->insertInto( $ptable )
            ->rows( $upsertRows )
            ->onDuplicateKeyUpdate()
            ->uniqueIndexFields( [ 'keyname' ] )
            ->set( $this->buildMultiUpsertSetForOverwrite( $db, $modToken ) )
            ->caller( __METHOD__ );
        $upsert->execute();

        // A key succeeded exactly when its token check passed
        foreach ( array_keys( $argsByKey ) as $key ) {
            $resByKey[$key] = !isset( $rejectedByKey[$key] );
        }

        $this->updateOpStats( self::METRIC_OP_CAS, $sizeStatsByKey );
    }

    /**
     * Update the TTL for keys belonging to a partition table on the given server
     *
     * If no current row for a key exists or the current row has an integral UNIX timestamp of
     * expiration less than that of the provided modification timestamp, then the write to that
     * key will be aborted with a "false" result.
     *
     * In multi-primary mode, if the current row for a key exists and has a modification token
     * with a greater integral UNIX timestamp than that of the provided modification timestamp,
     * then the write to that key will be aborted with a "false" result. Successfully modified
     * key rows will be assigned a new modification token/timestamp.
     *
     * @param IDatabase $db Handle to the database server where the argument keys belong
     * @param string $ptable Name of the partition table where the argument keys belong
     * @param float $mtime UNIX modification timestamp
     * @param array<string,array> $argsByKey Non-empty (key => (exptime)) map
     * @param array<string,mixed> &$resByKey Map of (key => result) prefilled with false [returned]
     * @throws DBError
     */
    private function modifyTableSpecificBlobsForChangeTTL(
        IDatabase $db,
        string $ptable,
        float $mtime,
        array $argsByKey,
        array &$resByKey
    ) {
        $nowUnix = (int)$mtime;
        $allKeys = array_keys( $argsByKey );

        if ( !$this->multiPrimaryMode ) {
            // Group the keys by their new expiry so that each group needs only one UPDATE
            $keysByExpiry = [];
            foreach ( $argsByKey as $key => [ $exptime ] ) {
                $expiry = $this->makeNewKeyExpiry( $exptime, $nowUnix );
                $keysByExpiry[$expiry][] = $key;
            }

            $touchedCount = 0;
            foreach ( $keysByExpiry as $expiry => $batchKeys ) {
                $update = $db->newUpdateQueryBuilder()
                    ->update( $ptable )
                    ->set( [ 'exptime' => $this->encodeDbExpiry( $db, $expiry ) ] )
                    ->where( $this->buildExistenceConditions( $db, $batchKeys, $nowUnix ) )
                    ->caller( __METHOD__ );
                $update->execute();
                $touchedCount += $db->affectedRows();
            }

            // Success is reported only when the affected row count covers every requested key
            if ( $touchedCount === count( $argsByKey ) ) {
                foreach ( $allKeys as $key ) {
                    $resByKey[$key] = true;
                }
            }
        } else {
            $modToken = $this->makeTimestampedModificationToken( $mtime, $db );

            // Fetch the live rows so that they can be rewritten with the new expiry and token
            $liveRows = $db->newSelectQueryBuilder()
                ->select( [ 'keyname', 'value' ] )
                ->from( $ptable )
                ->where( $this->buildExistenceConditions( $db, $allKeys, $nowUnix ) )
                ->caller( __METHOD__ )
                ->fetchResultSet();

            $upsertRows = [];
            $liveKeys = [];
            foreach ( $liveRows as $liveRow ) {
                $key = $liveRow->keyname;
                $liveKeys[$key] = true;
                $serialValue = $this->dbDecodeSerialValue( $db, $liveRow->value );
                [ $exptime ] = $argsByKey[$key];
                $expiry = $this->makeNewKeyExpiry( $exptime, $nowUnix );
                $upsertRows[] = $this->buildUpsertRow( $db, $key, $serialValue, $expiry, $modToken );
            }

            // Nothing is live; leave the prefilled "false" results as they are
            if ( !$upsertRows ) {
                return;
            }

            $upsert = $db->newInsertQueryBuilder()
                ->insertInto( $ptable )
                ->rows( $upsertRows )
                ->onDuplicateKeyUpdate()
                ->uniqueIndexFields( [ 'keyname' ] )
                ->set( $this->buildMultiUpsertSetForOverwrite( $db, $modToken ) )
                ->caller( __METHOD__ );
            $upsert->execute();

            foreach ( $allKeys as $key ) {
                $resByKey[$key] = isset( $liveKeys[$key] );
            }
        }

        $this->updateOpStats( self::METRIC_OP_CHANGE_TTL, $allKeys );
    }

    /**
     * Either increment a counter key, if it exists, or initialize it, otherwise
     *
     * If no current row for a key exists or the current row has an integral UNIX timestamp of
     * expiration less than that of the provided modification timestamp, then the key row will
     * be set to the initial value. Otherwise, the current row will be incremented.
     *
     * In multi-primary mode, if the current row for a key exists and has a modification token
     * with a greater integral UNIX timestamp than that of the provided modification timestamp,
     * then the write to that key will be aborted with a "false" result. Successfully initialized
     * key rows will be assigned a new modification token/timestamp.
     *
     * @param IDatabase $db Handle to the database server where the argument keys belong
     * @param string $ptable Name of the partition table where the argument keys belong
     * @param float $mtime UNIX modification timestamp
     * @param array<string,array> $argsByKey Non-empty (key => (step, init, exptime)) map
     * @param array<string,mixed> &$resByKey Map of (key => result) prefilled with false [returned]
     * @throws DBError
     */
    private function modifyTableSpecificBlobsForIncrInit(
        IDatabase $db,
        string $ptable,
        float $mtime,
        array $argsByKey,
        array &$resByKey
    ) {
        foreach ( $argsByKey as $key => [ $step, $init, $exptime ] ) {
            $nowUnix = (int)$mtime;
            $modToken = $this->makeTimestampedModificationToken( $mtime, $db );
            $expiry = $this->makeNewKeyExpiry( $exptime, $nowUnix );

            // Run the upsert and the read-back inside one atomic section so that "consistent
            // reads" hide changes from other threads and the exact post-increment value is
            // what gets returned. Checking for a live key inside the write query stays safe
            // for replication because such keys have either an indefinite or a very short TTL.
            $sectionId = $db->startAtomic( __METHOD__, IDatabase::ATOMIC_CANCELABLE );
            try {
                $upsert = $db->newInsertQueryBuilder()
                    ->insertInto( $ptable )
                    ->rows( $this->buildUpsertRow( $db, $key, $init, $expiry, $modToken ) )
                    ->onDuplicateKeyUpdate()
                    ->uniqueIndexFields( [ 'keyname' ] )
                    ->set( $this->buildIncrUpsertSet( $db, $step, $init, $expiry, $modToken, $nowUnix ) )
                    ->caller( __METHOD__ );
                $upsert->execute();
                $changedCount = $db->affectedRows();

                $curRow = $db->newSelectQueryBuilder()
                    ->select( 'value' )
                    ->from( $ptable )
                    ->where( [ 'keyname' => $key ] )
                    ->caller( __METHOD__ )
                    ->fetchRow();
            } catch ( Exception $e ) {
                // Roll back only this section before letting the error propagate
                $db->cancelAtomic( __METHOD__, $sectionId );
                throw $e;
            }
            $db->endAtomic( __METHOD__ );

            if ( !$changedCount || $curRow === false ) {
                $this->logger->warning( __METHOD__ . ": failed to set new $key value" );
                continue;
            }

            $newValue = $this->dbDecodeSerialValue( $db, $curRow->value );
            if ( $this->isInteger( $newValue ) ) {
                $resByKey[$key] = (int)$newValue;
            } else {
                $this->logger->warning( __METHOD__ . ": got non-integer $key value" );
            }
        }

        $this->updateOpStats( self::METRIC_OP_INCR, array_keys( $argsByKey ) );
    }

    /**
     * Same as modifyTableSpecificBlobsForIncrInit() but does not return the
     * new value.
     *
     * @param IDatabase $db
     * @param string $ptable
     * @param float $mtime
     * @param array<string,array> $argsByKey
     * @param array<string,mixed> &$resByKey
     * @throws DBError
     */
    private function modifyTableSpecificBlobsForIncrInitAsync(
        IDatabase $db,
        string $ptable,
        float $mtime,
        array $argsByKey,
        array &$resByKey
    ) {
        foreach ( $argsByKey as $key => [ $step, $init, $exptime ] ) {
            $nowUnix = (int)$mtime;
            $modToken = $this->makeTimestampedModificationToken( $mtime, $db );
            $expiry = $this->makeNewKeyExpiry( $exptime, $nowUnix );

            // Insert the initial value, or increment/re-initialize the existing row
            $upsert = $db->newInsertQueryBuilder()
                ->insertInto( $ptable )
                ->rows( $this->buildUpsertRow( $db, $key, $init, $expiry, $modToken ) )
                ->onDuplicateKeyUpdate()
                ->uniqueIndexFields( [ 'keyname' ] )
                ->set( $this->buildIncrUpsertSet( $db, $step, $init, $expiry, $modToken, $nowUnix ) )
                ->caller( __METHOD__ );
            $upsert->execute();

            // Only the success flag is reported; the new counter value is never read back
            if ( $db->affectedRows() ) {
                $resByKey[$key] = true;
            } else {
                $this->logger->warning( __METHOD__ . ": failed to set new $key value" );
            }
        }
    }

    /**
     * @param int $exptime Relative or absolute expiration
     * @param int $nowTsUnix Current UNIX timestamp
     * @return int UNIX timestamp or TTL_INDEFINITE
     */
    private function makeNewKeyExpiry( $exptime, int $nowTsUnix ) {
        $expiry = $this->getExpirationAsTimestamp( $exptime );
        if ( $expiry === self::TTL_INDEFINITE ) {
            // Keys that never expire need no adjustment
            return $expiry;
        }

        // Eventual consistency requires recently modified keys to be preserved, so never
        // produce an `exptime` so low that the row might get garbage collected before it
        // has been replicated.
        $lowestSafeExpiry = $nowTsUnix - self::SAFE_CLOCK_BOUND_SEC;

        return max( $expiry, $lowestSafeExpiry );
    }

    /**
     * Get a scoped lock and modification timestamp for a critical section of reads/writes
     *
     * This is used instead of BagOStuff::getCurrentTime() for certain writes (such as "add",
     * "incr", and "cas"), for which we want to support tight race conditions where the same
     * key is repeatedly written to by multiple web servers that each get to see the previous
     * value, act on it, and modify it in some way.
     *
     * It is assumed that this method is normally only invoked from the primary datacenter.
     * A lock is acquired on the primary server of the local datacenter in order to avoid race
     * conditions within the critical section. The clock on the SQL server is used to get the
     * modification timestamp in order to minimize issues with clock drift between web servers;
     * thus key writes will not be rejected due to some web servers having lagged clocks.
     *
     * @param string $key
     * @param ?ScopedCallback &$scope Unlocker callback; null on failure [returned]
     * @return float|null UNIX timestamp with 6 decimal places; null on failure
     */
    private function newLockingWriteSectionModificationTimestamp( $key, &$scope ) {
        // Non-blocking attempt (zero timeout)
        $acquired = $this->lock( $key, 0 );
        if ( $acquired ) {
            // The lock is released when the caller lets go of this scoped callback
            $scope = new ScopedCallback( function () use ( $key ) {
                $this->unlock( $key );
            } );

            // Round-trip through sprintf to adjust the precision to 6 decimal places
            $lockTime = $this->locks[$key][self::LOCK_TIME];

            return (float)sprintf( '%.6F', $lockTime );
        }

        return null;
    }

    /**
     * Make a `modtoken` column value with the original time and source database server of a write
     *
     * @param float $mtime UNIX modification timestamp
     * @param IDatabase $db Handle to the primary database server sourcing the write
     * @return string String of the form "<SECONDS_SOURCE><MICROSECONDS>", where SECONDS_SOURCE
     *  is "<35 bit seconds portion of UNIX time><32 bit database server ID>" as 13 base 36 chars,
     *  and MICROSECONDS is "<20 bit microseconds portion of UNIX time>" as 4 base 36 chars
     */
    private function makeTimestampedModificationToken( float $mtime, IDatabase $db ) {
        // We have reserved space for up to 6 digits in the microsecond portion of the token.
        // This is for future use only (maybe CAS tokens) and not currently used.
        // It is currently populated by the microsecond portion returned by microtime,
        // which generally has fewer than 6 digits of meaningful precision but can still be useful
        // in debugging (to see the token continuously change even during rapid testing).
        //
        // Both portions are taken from the same formatted string. Taking the seconds from
        // (int)$mtime (truncation) and the microseconds from sprintf() (rounding) would make
        // the two disagree whenever the fraction rounds up to the next whole second, yielding
        // a token that is almost one second older than the actual timestamp.
        [ $seconds, $microseconds ] = explode( '.', sprintf( '%.6F', $mtime ) );

        // Fall back to a checksum of the server name when no topology-based ID is available
        $id = $db->getTopologyBasedServerId() ?? sprintf( '%u', crc32( $db->getServerName() ) );

        $token = implode( '', [
            // 67 bit integral portion of UNIX timestamp, qualified
            \Wikimedia\base_convert(
                // 35 bit integral seconds portion of UNIX timestamp
                str_pad( base_convert( $seconds, 10, 2 ), 35, '0', STR_PAD_LEFT ) .
                // 32 bit ID of the primary database server handling the write
                str_pad( base_convert( $id, 10, 2 ), 32, '0', STR_PAD_LEFT ),
                2,
                36,
                13
            ),
            // 20 bit fractional portion of UNIX timestamp, as integral microseconds
            str_pad( base_convert( $microseconds, 10, 36 ), 4, '0', STR_PAD_LEFT )
        ] );

        // Any component exceeding its reserved width makes the token longer than 13 + 4 chars
        if ( strlen( $token ) !== 17 ) {
            throw new RuntimeException( "Modification timestamp overflow detected" );
        }

        return $token;
    }

    /**
     * WHERE conditions that check for existence and liveness of keys
     *
     * @param IDatabase $db
     * @param string[]|string $keys
     * @param int $time UNIX modification timestamp
     * @return array
     */
    private function buildExistenceConditions( IDatabase $db, $keys, int $time ) {
        // Note that tombstones always have past expiration dates, so requiring an expiry
        // at or after the given time excludes them along with expired rows
        $notExpired = $db->expr( 'exptime', '>=', $db->timestamp( $time ) );

        $conds = [ 'keyname' => $keys ];
        $conds[] = $notExpired;

        return $conds;
    }

    /**
     * INSERT array for handling key writes/overwrites when no live nor stale key exists
     *
     * @param IDatabase $db
     * @param string $key
     * @param string|int $serialValue New value
     * @param int $expiry Expiration timestamp or TTL_INDEFINITE
     * @param string $mt Modification token
     * @return array
     */
    private function buildUpsertRow(
        IDatabase $db,
        $key,
        $serialValue,
        int $expiry,
        string $mt
    ) {
        $encodedValue = $this->dbEncodeSerialValue( $db, $serialValue );
        $encodedExpiry = $this->encodeDbExpiry( $db, $expiry );

        // The modification token column is only written in multi-primary mode
        $tokenFields = $this->multiPrimaryMode ? [ 'modtoken' => $mt ] : [];

        return [
            'keyname' => $key,
            'value'   => $encodedValue,
            'exptime' => $encodedExpiry
        ] + $tokenFields;
    }

    /**
     * SET array for handling key overwrites when a live or stale key exists
     *
     * @param IDatabase $db
     * @param string $mt Modification token
     * @return array
     */
    private function buildMultiUpsertSetForOverwrite( IDatabase $db, string $mt ) {
        // Map of (column => SQL expression yielding the value from the attempted INSERT row)
        $newValueSqlByColumn = [
            'value'   => $db->buildExcludedValue( 'value' ),
            'exptime' => $db->buildExcludedValue( 'exptime' )
        ];

        $assignments = [];

        if ( !$this->multiPrimaryMode ) {
            // Unconditionally take over the values from the attempted INSERT row
            foreach ( $newValueSqlByColumn as $column => $newValueSql ) {
                $assignments[] = "{$column}={$newValueSql}";
            }

            return $assignments;
        }

        // The query might take a while to replicate, during which newer values might get
        // written. Qualify the query so that it does not override such values. Note that
        // duplicate tokens generated around the same time for a key should still result
        // in convergence given the use of server_id in modtoken (providing a total order
        // among primary DB servers) and MySQL binlog ordering (providing a total order
        // for writes replicating from a given primary DB server).
        $newValueSqlByColumn['modtoken'] = $db->addQuotes( $mt );

        // Compare only the first 13 characters of the new and the stored tokens
        $isNotOlderSql = $db->addQuotes( substr( $mt, 0, 13 ) ) . ' >= ' .
            $db->buildSubString( 'modtoken', 1, 13 );

        foreach ( $newValueSqlByColumn as $column => $newValueSql ) {
            // Keep the current column value whenever the stored token is newer
            $rhs = $db->conditional( $isNotOlderSql, $newValueSql, $column );
            $assignments[] = "{$column}=" . trim( $rhs );
        }

        return $assignments;
    }

    /**
     * SET array for incrementing a live counter key row or re-initializing a stale one
     *
     * @param IDatabase $db
     * @param int $step Positive counter incrementation value
     * @param int $init Positive initial counter value
     * @param int $expiry Expiration timestamp or TTL_INDEFINITE
     * @param string $mt Modification token
     * @param int $mtUnixTs UNIX timestamp of modification token
     * @return array
     */
    private function buildIncrUpsertSet(
        IDatabase $db,
        int $step,
        int $init,
        int $expiry,
        string $mt,
        int $mtUnixTs
    ) {
        // SQL used when the existing row has not expired yet (increment, keep expiry/token)
        $liveSqlByColumn = [
            'value'   => $db->buildIntegerCast( 'value' ) . " + {$db->addQuotes( $step )}",
            'exptime' => 'exptime'
        ];
        // SQL used when the existing row has expired (start over from the initial value)
        $expiredSqlByColumn = [
            'value'   => $db->addQuotes( $this->dbEncodeSerialValue( $db, $init ) ),
            'exptime' => $db->addQuotes( $this->encodeDbExpiry( $db, $expiry ) )
        ];
        if ( $this->multiPrimaryMode ) {
            $liveSqlByColumn['modtoken'] = 'modtoken';
            $expiredSqlByColumn['modtoken'] = $db->addQuotes( $mt );
        }

        $assignments = [];
        foreach ( $liveSqlByColumn as $column => $liveSql ) {
            // Pick between the two expressions based on whether the row is still live
            $isLive = $db->expr( 'exptime', '>=', $db->timestamp( $mtUnixTs ) );
            $rhs = $db->conditional( $isLive, $liveSql, $expiredSqlByColumn[$column] );
            $assignments[] = "{$column}=" . trim( $rhs );
        }

        return $assignments;
    }

    /**
     * @param IDatabase $db
     * @param int $expiry UNIX timestamp of expiration or TTL_INDEFINITE
     * @return string
     */
    private function encodeDbExpiry( IDatabase $db, int $expiry ) {
        if ( $expiry === self::TTL_INDEFINITE ) {
            // Use the maximum timestamp that the column can store
            return $db->timestamp( self::INF_TIMESTAMP_PLACEHOLDER );
        }

        // Convert the absolute timestamp into the DB timestamp format
        return $db->timestamp( $expiry );
    }

    /**
     * @param IDatabase $db
     * @param string $dbExpiry DB timestamp of expiration
     * @return int UNIX timestamp of expiration or TTL_INDEFINITE
     */
    private function decodeDbExpiry( IDatabase $db, string $dbExpiry ) {
        // The placeholder timestamp stands for "never expires"
        $infinityDbExpiry = $db->timestamp( self::INF_TIMESTAMP_PLACEHOLDER );
        if ( $dbExpiry === $infinityDbExpiry ) {
            return self::TTL_INDEFINITE;
        }

        return (int)ConvertibleTimestamp::convert( TS_UNIX, $dbExpiry );
    }

    /**
     * @param IDatabase $db
     * @param string|int $serialValue
     * @return string
     */
    private function dbEncodeSerialValue( IDatabase $db, $serialValue ) {
        // Integers are stored as plain decimal strings rather than as encoded blobs
        if ( is_int( $serialValue ) ) {
            return (string)$serialValue;
        }

        return $db->encodeBlob( $serialValue );
    }

    /**
     * @param IDatabase $db
     * @param Blob|string|int $blob
     * @return string|int
     */
    private function dbDecodeSerialValue( IDatabase $db, $blob ) {
        // Integer-like values are returned as native integers rather than decoded as blobs
        if ( $this->isInteger( $blob ) ) {
            return (int)$blob;
        }

        return $db->decodeBlob( $blob );
    }

    /**
     * Either append a 'castoken' field or append the fields needed to compute the CAS token
     *
     * @param IDatabase $db
     * @param string[] $fields SELECT field array
     * @return string[] SELECT field array
     */
    private function addCasTokenFields( IDatabase $db, array $fields ) {
        // SQL hash expression to use for each database type that computes the token itself
        $hashSqlByType = [
            'mysql' => 'SHA1(value)',
            'postgres' => 'md5(value)'
        ];
        $hashSql = $hashSqlByType[$db->getType()] ?? null;

        if ( $hashSql !== null ) {
            // Let the database build the "<hash>@<exptime>" token
            $fields['castoken'] = $db->buildConcat( [
                $hashSql,
                $db->addQuotes( '@' ),
                'exptime'
            ] );

            return $fields;
        }

        // Otherwise, select the raw columns so that the token can be computed afterwards
        foreach ( [ 'value', 'exptime' ] as $column ) {
            if ( !in_array( $column, $fields, true ) ) {
                $fields[] = $column;
            }
        }

        return $fields;
    }

    /**
     * Get a CAS token from a SELECT result row
     *
     * @param IDatabase $db
     * @param stdClass $row A row for a key
     * @return string CAS token
     */
    private function getCasTokenFromRow( IDatabase $db, stdClass $row ) {
        // Prefer the token that the database already computed, if the row carries one
        if ( isset( $row->castoken ) ) {
            return $row->castoken;
        }

        // Fall back to hashing the decoded value here
        $valueHash = sha1( $this->dbDecodeSerialValue( $db, $row->value ) );
        $token = $valueHash . '@' . $row->exptime;
        $this->logger->debug( __METHOD__ . ": application computed hash for CAS token" );

        return $token;
    }

    /**
     * @param int $shardIndex
     * @throws DBError
     */
    private function garbageCollect( $shardIndex ) {
        // Record the attempt right away to avoid queuing duplicate async callbacks
        $this->lastGarbageCollect = $this->getCurrentTime();

        $purgeExpiredRows = function () use ( $shardIndex ) {
            $db = $this->getConnection( $shardIndex );
            /** @noinspection PhpUnusedLocalVariableInspection */
            $silenceScope = $this->silenceTransactionProfiler();
            $cutoffUnix = (int)$this->getCurrentTime();
            $this->deleteServerObjectsExpiringBefore( $db, $cutoffUnix, $this->purgeLimit );
            // Refresh the marker once the purge has actually run
            $this->lastGarbageCollect = $this->getCurrentTime();
        };

        // Run the purge directly unless an async handler is available to take it
        if ( !$this->asyncHandler ) {
            $purgeExpiredRows();
        } else {
            ( $this->asyncHandler )( $purgeExpiredRows );
        }
    }

    /**
     * @deprecated since 1.41, use deleteObjectsExpiringBefore() instead
     */
    public function expireAll() {
        wfDeprecated( __METHOD__, '1.41' );
        // Purge everything that has expired as of the current time
        $nowUnix = (int)$this->getCurrentTime();
        $this->deleteObjectsExpiringBefore( $nowUnix );
    }

    /**
     * Delete expired rows from the selected shard servers
     *
     * Each server is purged while holding a per-shard named lock on it; servers whose lock
     * is already held are skipped. Database errors are passed to handleDBError() and make
     * the method return false, but do not stop the remaining servers from being processed.
     *
     * @param string|int $timestamp Rows expiring before this timestamp are deleted
     * @param callable|null $progress Optional callback invoked with a completion percentage
     * @param int|float $limit Maximum number of rows to delete or INF for no limit
     * @param string|null $tag Only purge the server having this tag, if set
     * @return bool Whether no database error was encountered
     * @throws InvalidArgumentException If $tag is given but unknown or no tags are configured
     */
    public function deleteObjectsExpiringBefore(
        $timestamp,
        callable $progress = null,
        $limit = INF,
        string $tag = null
    ) {
        /** @noinspection PhpUnusedLocalVariableInspection */
        $silenceScope = $this->silenceTransactionProfiler();

        if ( $tag !== null ) {
            // Purge one server only, to support concurrent purging in large wiki farms (T282761).
            $shardIndexes = [];
            if ( !$this->serverTags ) {
                throw new InvalidArgumentException( "Given a tag but no tags are configured" );
            }
            foreach ( $this->serverTags as $serverShardIndex => $serverTag ) {
                if ( $tag === $serverTag ) {
                    $shardIndexes[] = $serverShardIndex;
                    break;
                }
            }
            if ( !$shardIndexes ) {
                throw new InvalidArgumentException( "Unknown server tag: $tag" );
            }
        } else {
            $shardIndexes = $this->getShardServerIndexes();
            shuffle( $shardIndexes );
        }

        $ok = true;
        $numServers = count( $shardIndexes );

        $keysDeletedCount = 0;
        foreach ( $shardIndexes as $numServersDone => $shardIndex ) {
            try {
                $db = $this->getConnection( $shardIndex );

                // Avoid deadlock (T330377)
                $lockKey = "SqlBagOStuff-purge-shard:$shardIndex";
                if ( !$db->lock( $lockKey, __METHOD__, 0 ) ) {
                    $this->logger->info( "SqlBagOStuff purge for shard $shardIndex already locked, skip" );
                    continue;
                }

                try {
                    $this->deleteServerObjectsExpiringBefore(
                        $db,
                        $timestamp,
                        $limit,
                        $keysDeletedCount,
                        [ 'fn' => $progress, 'serversDone' => $numServersDone, 'serversTotal' => $numServers ]
                    );
                } finally {
                    // Always release the lock, even if the purge or the progress callback
                    // throws; otherwise the shard would stay locked for as long as this
                    // connection lives and later purge attempts would skip it.
                    $db->unlock( $lockKey, __METHOD__ );
                }
            } catch ( DBError $e ) {
                $this->handleDBError( $e, $shardIndex );
                $ok = false;
            }
        }

        return $ok;
    }

    /**
     * @param IDatabase $db
     * @param string|int $timestamp
     * @param int|float $limit Maximum number of rows to delete in total or INF for no limit
     * @param int &$keysDeletedCount
     * @param array|null $progress
     * @phan-param array{fn:?callback,serversDone:int,serversTotal:int}|null $progress
     * @throws DBError
     */
    private function deleteServerObjectsExpiringBefore(
        IDatabase $db,
        $timestamp,
        $limit,
        &$keysDeletedCount = 0,
        array $progress = null
    ) {
        $cutoffUnix = ConvertibleTimestamp::convert( TS_UNIX, $timestamp );
        if ( $this->multiPrimaryMode ) {
            // Eventual consistency requires the preservation of any key that was recently
            // modified. The key must exist on this database server long enough for the server
            // to receive, via replication, all writes to the key with lower timestamps. Such
            // writes should be no-ops since the existing key value should "win". If the network
            // partitions between datacenters A and B for 30 minutes, the database servers in
            // each datacenter will see an initial burst of writes with "old" timestamps via
            // replication. This might include writes with lower timestamps than the existing
            // key value. Therefore, clock skew and replication delay are both factors.
            $cutoffUnixSafe = (int)$this->getCurrentTime() - self::SAFE_PURGE_DELAY_SEC;
            $cutoffUnix = min( $cutoffUnix, $cutoffUnixSafe );
        }
        // Visit the partition tables in random order
        $tableIndexes = range( 0, $this->numTableShards - 1 );
        shuffle( $tableIndexes );

        // Never select more rows per batch than the overall limit allows
        $batchSize = min( $this->writeBatchSize, $limit );

        // NOTE(review): the do-while below runs at least once per table, so every table gets
        // one batch even after $keysDeletedCount has reached $limit; the total can thus
        // exceed $limit when there are several tables. Confirm whether that is intended.
        foreach ( $tableIndexes as $numShardsDone => $tableIndex ) {
            // The oldest expiry of a row we have deleted on this shard
            // (the first row that we deleted)
            $minExpUnix = null;
            // The most recent expiry time so far, from a row we have deleted on this shard
            $maxExp = null;
            // Size of the time range we'll delete, in seconds (for progress estimate)
            $totalSeconds = null;

            do {
                // Fetch the next batch of expired rows, oldest first, resuming from the
                // highest expiry seen so far on this table
                $res = $db->newSelectQueryBuilder()
                    ->select( [ 'keyname', 'exptime' ] )
                    ->from( $this->getTableNameByShard( $tableIndex ) )
                    ->where( $db->expr( 'exptime', '<', $db->timestamp( $cutoffUnix ) ) )
                    ->andWhere( $maxExp ? $db->expr( 'exptime', '>=', $maxExp ) : [] )
                    ->orderBy( 'exptime', SelectQueryBuilder::SORT_ASC )
                    ->limit( $batchSize )
                    ->caller( __METHOD__ )
                    ->fetchResultSet();

                if ( $res->numRows() ) {
                    $row = $res->current();
                    if ( $minExpUnix === null ) {
                        // First batch for this table: remember where the time range starts
                        $minExpUnix = (int)ConvertibleTimestamp::convert( TS_UNIX, $row->exptime );
                        $totalSeconds = max( $cutoffUnix - $minExpUnix, 1 );
                    }

                    $keys = [];
                    foreach ( $res as $row ) {
                        $keys[] = $row->keyname;
                        $maxExp = $row->exptime;
                    }

                    // Re-check the expiry in the DELETE so that rows which were given a later
                    // expiry since the SELECT are left alone
                    $db->newDeleteQueryBuilder()
                        ->deleteFrom( $this->getTableNameByShard( $tableIndex ) )
                        ->where( [
                            'keyname' => $keys,
                            $db->expr( 'exptime', '<', $db->timestamp( $cutoffUnix ) ),
                        ] )
                        ->caller( __METHOD__ )->execute();
                    $keysDeletedCount += $db->affectedRows();
                }

                if ( $progress && is_callable( $progress['fn'] ) ) {
                    if ( $totalSeconds ) {
                        $maxExpUnix = (int)ConvertibleTimestamp::convert( TS_UNIX, $maxExp );
                        $remainingSeconds = $cutoffUnix - $maxExpUnix;
                        $processedSeconds = max( $totalSeconds - $remainingSeconds, 0 );
                        // For example, if we've done 1.5 table shard, and are thus half-way on the
                        // 2nd of perhaps 5 tables on this server, then this might be:
                        // `( 1 + ( 43200 / 86400 ) ) / 5 = 0.3`, or 30% done, of tables on this server.
                        $tablesDoneRatio =
                            ( $numShardsDone + ( $processedSeconds / $totalSeconds ) ) / $this->numTableShards;
                    } else {
                        // No expired rows have been seen on this table so far
                        $tablesDoneRatio = 1;
                    }

                    // For example, if we're 30% done on the last of 10 servers, then this might be:
                    // `( 9 / 10 ) + ( 0.3 / 10 ) = 0.93`, or 93% done, overall.
                    $overallRatio = ( $progress['serversDone'] / $progress['serversTotal'] ) +
                        ( $tablesDoneRatio / $progress['serversTotal'] );
                    ( $progress['fn'] )( $overallRatio * 100 );
                }
            } while ( $res->numRows() && $keysDeletedCount < $limit );
        }
    }

    /**
     * Delete content of shard tables in every server.
     * Return true if the operation is successful, false otherwise.
     *
     * @deprecated since 1.41, unused.
     *
     * @return bool
     */
    public function deleteAll() {
        wfDeprecated( __METHOD__, '1.41' );
        /** @noinspection PhpUnusedLocalVariableInspection */
        $silenceScope = $this->silenceTransactionProfiler();

        $serverIndexes = $this->getShardServerIndexes();
        foreach ( $serverIndexes as $shardIndex ) {
            try {
                $db = $this->getConnection( $shardIndex );
                // Empty every partition table on this server
                for ( $tableIndex = 0; $tableIndex < $this->numTableShards; ++$tableIndex ) {
                    $tableName = $this->getTableNameByShard( $tableIndex );
                    $db->newDeleteQueryBuilder()
                        ->deleteFrom( $tableName )
                        ->where( $db::ALL_ROWS )
                        ->caller( __METHOD__ )
                        ->execute();
                }
            } catch ( DBError $e ) {
                // Give up at the first failing server
                $this->handleDBError( $e, $shardIndex );

                return false;
            }
        }

        return true;
    }

    public function doLock( $key, $timeout = 6, $exptime = 6 ) {
        /** @noinspection PhpUnusedLocalVariableInspection */
        $silenceScope = $this->silenceTransactionProfiler();

        // The lock is taken on the server that the key maps to
        [ $shardIndex ] = $this->getKeyLocation( $key );

        try {
            $db = $this->getConnection( $shardIndex );

            return $db->lock( $key, __METHOD__, $timeout, $db::LOCK_TIMESTAMP );
        } catch ( DBError $e ) {
            $this->handleDBError( $e, $shardIndex );
            $this->logger->warning(
                __METHOD__ . ' failed due to I/O error for {key}.',
                [ 'key' => $key ]
            );
        }

        // Reached only when a database error occurred
        return null;
    }

    public function doUnlock( $key ) {
        /** @noinspection PhpUnusedLocalVariableInspection */
        $silenceScope = $this->silenceTransactionProfiler();

        // The lock lives on the server that the key maps to
        [ $shardIndex ] = $this->getKeyLocation( $key );

        try {
            return $this->getConnection( $shardIndex )->unlock( $key, __METHOD__ );
        } catch ( DBError $e ) {
            $this->handleDBError( $e, $shardIndex );
        }

        // Reached only when a database error occurred
        return false;
    }

    /**
     * Build a storage key from a keyspace and a list of key components.
     *
     * Spaces become underscores and colons inside components are encoded as
     * "%3A", so that ":" only ever appears as the separator. Components that
     * would overflow the remaining length budget are replaced by "#" plus their
     * MD5; if the budget is still exceeded, all components collapse into a
     * single "BagOStuff-long-key" digest.
     *
     * @param string $keyspace
     * @param array $components
     * @return string
     */
    protected function makeKeyInternal( $keyspace, $components ) {
        // The 'objectcache' schema stores keys as varchar(255). Reserve room for the
        // keyspace and one separator per component, and cap the total at 205 so that
        // custom prefixes added by things like WANObjectCache still fit.
        $keyspace = strtr( $keyspace, ' ', '_' );
        $budget = 205 - strlen( $keyspace ) - count( $components );

        $encodedParts = [];
        foreach ( $components as $rawComponent ) {
            $part = strtr( $rawComponent, [
                ' ' => '_', // Avoid unnecessary misses from pre-1.35 code
                ':' => '%3A',
            ] );

            // A hashed component takes 33 characters: '#' plus the 32 character MD5.
            // Hashing is only worthwhile while more than that is still available.
            if ( $budget > 33 && strlen( $part ) > $budget ) {
                $part = '#' . md5( $part );
            }

            $budget -= strlen( $part );
            $encodedParts[] = $part;
        }

        $joined = implode( ':', $encodedParts );

        return $budget < 0
            ? $keyspace . ':BagOStuff-long-key:##' . md5( $joined )
            : $keyspace . ':' . $joined;
    }

    /**
     * Always returns true for this backend.
     *
     * NOTE(review): presumably this tells the parent class that generic keys must
     * be converted through makeKeyInternal() before use — confirm against the
     * parent class.
     *
     * @return bool
     */
    protected function requireConvertGenericKey(): bool {
        return true;
    }

    /**
     * Convert a value into the form stored in the database.
     *
     * @param mixed $value
     * @return int|string Integers are returned unchanged; any other value is
     *  PHP-serialized and, when $this->hasZlib is set, raw-deflated
     */
    protected function serialize( $value ) {
        if ( !is_int( $value ) ) {
            $blob = serialize( $value );

            // On typical message and page data, deflating can provide a 3X storage savings
            return $this->hasZlib ? gzdeflate( $blob ) : $blob;
        }

        return $value;
    }

    /**
     * Convert a stored value back into the original PHP value.
     *
     * @param mixed $value Stored form, as produced by serialize()
     * @return mixed False for the tombstone marker, an int for integer values,
     *  otherwise the result of PHP unserialize()
     */
    protected function unserialize( $value ) {
        if ( $value === self::TOMB_SERIAL ) {
            // Tombstone marker
            return false;
        }

        if ( $this->isInteger( $value ) ) {
            return (int)$value;
        }

        $payload = $value;
        if ( $this->hasZlib ) {
            // The stored value may not be deflated at all, so inflate quietly and
            // fall back to the raw value when inflation fails.
            AtEase::suppressWarnings();
            $inflated = gzinflate( $value );
            AtEase::restoreWarnings();

            $payload = ( $inflated !== false ) ? $inflated : $value;
        }

        return unserialize( $payload );
    }

    /**
     * Get the load balancer, creating it via the injected callback on first use.
     *
     * @return ILoadBalancer
     */
    private function getLoadBalancer(): ILoadBalancer {
        if ( $this->loadBalancer ) {
            return $this->loadBalancer;
        }

        // First call: invoke the callback once and keep the result for later calls
        $this->loadBalancer = ( $this->loadBalancerCallback )();

        return $this->loadBalancer;
    }

    /**
     * @return IMaintainableDatabase
     * @throws DBError
     */
    private function getConnectionViaLoadBalancer() {
        $lb = $this->getLoadBalancer();
        $writerAttributes = $lb->getServerAttributes( $lb->getWriterIndex() );

        if ( !$writerAttributes[Database::ATTR_DB_LEVEL_LOCKING] ) {
            // The RDBMS has row/table/page level locking, so use a separate auto-commit
            // connection to avoid needless contention and deadlocks.
            $serverRole = $this->replicaOnly ? DB_REPLICA : DB_PRIMARY;
            $conn = $lb->getMaintenanceConnectionRef(
                $serverRole,
                [],
                $this->dbDomain,
                $lb::CONN_TRX_AUTOCOMMIT
            );
        } else {
            // Database-level locking: use the main connection to avoid transaction deadlocks
            $conn = $lb->getMaintenanceConnectionRef( DB_PRIMARY, [], $this->dbDomain );
        }

        // Connect eagerly so that any errors are thrown now, while they are easy to handle
        $conn->ensureConnection();

        return $conn;
    }

    /**
     * @param int $shardIndex
     * @param array $server Server config map
     * @return IMaintainableDatabase
     * @throws DBError
     */
    private function getConnectionFromServerInfo( $shardIndex, array $server ) {
        // Reuse the handle already opened for this shard server, if any
        if ( isset( $this->conns[$shardIndex] ) ) {
            return $this->conns[$shardIndex];
        }

        $server['logger'] = $this->logger;
        // Make sure this handle always uses autocommit mode, even if DBO_TRX is
        // configured. Default to no flags when the config map has no 'flags' entry,
        // instead of reading an undefined array key.
        $server['flags'] = ( $server['flags'] ?? 0 ) & ~DBO_TRX;

        /** @var IMaintainableDatabase $conn Auto-commit connection to the server */
        $conn = MediaWikiServices::getInstance()->getDatabaseFactory()
            ->create( $server['type'], $server );

        // Automatically create the objectcache table for sqlite as needed
        if ( $conn->getType() === 'sqlite' ) {
            $this->initSqliteDatabase( $conn );
        }

        // Only cache the handle once it is fully initialised
        $this->conns[$shardIndex] = $conn;

        return $conn;
    }

    /**
     * Handle a DBError which occurred during a read, write, or lock operation.
     *
     * @param DBError $exception
     * @param int $shardIndex Server index
     */
    private function handleDBError( DBError $exception, $shardIndex ) {
        $isConnectionError = $exception instanceof DBConnectionError;

        if ( !$this->useLB && $isConnectionError ) {
            // Drop the broken handle so that it is never reused (bug T103435)
            unset( $this->conns[$shardIndex] );

            $now = $this->getCurrentTime();
            $downSince = $this->connFailureTimes[$shardIndex] ?? null;
            if ( $downSince !== null && $now - $downSince < 60 ) {
                // Still inside the 60 second down window: keep the recorded failure.
                // There is deliberately no early return here; the operation still
                // failed, so the error must be logged and exposed via setLastError().
                $this->logger->debug( __METHOD__ . ": Server #$shardIndex already down" );
            } else {
                // First failure, or the previous down window expired: start a new window
                $this->logger->info( __METHOD__ . ": Server #$shardIndex down until " . ( $now + 60 ) );
                $this->connFailureTimes[$shardIndex] = $now;
                $this->connFailureErrors[$shardIndex] = $exception;
            }
        }

        $this->logger->error( "DBError: {$exception->getMessage()}", [ 'exception' => $exception ] );
        if ( $isConnectionError ) {
            $this->setLastError( self::ERR_UNREACHABLE );
            $this->logger->warning( __METHOD__ . ": ignoring connection error" );
        } else {
            $this->setLastError( self::ERR_UNEXPECTED );
            $this->logger->warning( __METHOD__ . ": ignoring query error" );
        }
    }

    /**
     * @param IMaintainableDatabase $db
     * @throws DBError
     */
    private function initSqliteDatabase( IMaintainableDatabase $db ) {
        if ( $db->tableExists( 'objectcache', __METHOD__ ) ) {
            // Nothing to do; the table is already there
            return;
        }

        // Use one table for SQLite; sharding does not seem to have much benefit
        $db->query( "PRAGMA journal_mode=WAL", __METHOD__ ); // this is permanent

        // Create the table and its index as one atomic DDL section
        $db->startAtomic( __METHOD__ );
        try {
            $encTable = $db->tableName( 'objectcache' );
            $encExptimeIndex = $db->addIdentifierQuotes( $db->tablePrefix() . 'exptime' );

            $createTableSql = implode( "\n", [
                "CREATE TABLE $encTable (",
                "    keyname BLOB NOT NULL default '' PRIMARY KEY,",
                "    value BLOB,",
                "    exptime BLOB NOT NULL",
                ")",
            ] );
            $db->query( $createTableSql, __METHOD__ );

            $createIndexSql = "CREATE INDEX $encExptimeIndex ON $encTable (exptime)";
            $db->query( $createIndexSql, __METHOD__ );

            $db->endAtomic( __METHOD__ );
        } catch ( DBError $dbError ) {
            // Undo any partial schema changes, then let the caller deal with the error
            $db->rollback( __METHOD__ );
            throw $dbError;
        }
    }

    /**
     * Create the shard tables on all databases
     *
     * This is typically called manually by a sysadmin via eval.php, e.g. for ParserCache:
     *
     * @code
     *     ObjectCache::getInstance( 'myparsercache' )->createTables();
     * @endcode
     *
     * This is different from `$services->getParserCache()->getCacheStorage()->createTables()`,
     * which would use the backend set via $wgParserCacheType, which shouldn't be
     * set yet for the backend you are creating shard tables on. The expectation
     * is to first add the new backend to $wgObjectCaches, run the above, and then enable
     * it for live ParserCache traffic by setting $wgParserCacheType.
     */
    public function createTables() {
        foreach ( $this->getShardServerIndexes() as $shardIndex ) {
            $db = $this->getConnection( $shardIndex );
            $dbType = $db->getType();
            if ( !in_array( $dbType, [ 'mysql', 'postgres' ], true ) ) {
                // Shard tables are only created for these two database types
                continue;
            }

            // Every shard table is cloned from the same base table, so resolve it once
            $encBaseTable = $db->tableName( 'objectcache' );
            for ( $i = 0; $i < $this->numTableShards; $i++ ) {
                $encShardTable = $db->tableName( $this->getTableNameByShard( $i ) );
                if ( $dbType === 'postgres' ) {
                    // PostgreSQL only accepts LIKE as a parenthesised table element;
                    // INCLUDING ALL also copies defaults, constraints and indexes.
                    $sql = "CREATE TABLE $encShardTable (LIKE $encBaseTable INCLUDING ALL)";
                } else {
                    $sql = "CREATE TABLE $encShardTable LIKE $encBaseTable";
                }
                $db->query( $sql, __METHOD__ );
            }
        }
    }

    /**
     * @return int[] List of server indexes
     */
    private function getShardServerIndexes() {
        if ( !$this->useLB ) {
            // Striped array of database servers: one index per configured server
            return array_keys( $this->serverTags );
        }

        // LoadBalancer based configuration: a single logical server
        return [ 0 ];
    }

    /**
     * Silence the transaction profiler until the return value falls out of scope
     *
     * @return ScopedCallback|null
     */
    private function silenceTransactionProfiler() {
        // With explicit server configs there is no TransactionProfiler injected
        // anyway, so there is nothing to silence.
        return $this->serverInfos
            ? null
            : Profiler::instance()->getTransactionProfiler()->silenceForScope();
    }
}