includes/libs/rdbms/loadmonitor/LoadMonitor.php
<?php
/**
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
* http://www.gnu.org/copyleft/gpl.html
*
* @file
*/
namespace Wikimedia\Rdbms;
use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
use NullStatsdDataFactory;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use RuntimeException;
use WANObjectCache;
use Wikimedia\ObjectCache\BagOStuff;
use Wikimedia\ObjectCache\IStoreKeyEncoder;
use Wikimedia\Rdbms\Platform\ISQLPlatform;
use Wikimedia\ScopedCallback;
/**
* Basic DB load monitor with no external dependencies
*
* This uses both local server and local datacenter caches for DB server state information.
*
*
* @ingroup Database
*/
class LoadMonitor implements ILoadMonitor {
/** @var ILoadBalancer */
protected $lb;
/** @var BagOStuff */
protected $srvCache;
/** @var WANObjectCache */
protected $wanCache;
/** @var LoggerInterface */
protected $logger;
/** @var StatsdDataFactoryInterface */
protected $statsd;
/** @var int|float */
private $maxConnCount;
private int $totalConnectionsAdjustment;
/** @var float|null */
private $wallClockOverride;
/** @var bool Whether the "server states" cache key is in the process of being updated */
private $serverStatesKeyLocked = false;
/** Cache key version */
private const VERSION = 3;
/** Target time-till-refresh for DB server states */
private const STATE_TARGET_TTL = 10;
/** Seconds to persist DB server states on cache (fresh or stale) */
private const STATE_PRESERVE_TTL = 60;
/**
* @inheritDoc
*/
public function __construct( ILoadBalancer $lb, BagOStuff $srvCache, WANObjectCache $wCache, $options ) {
$this->lb = $lb;
$this->srvCache = $srvCache;
$this->wanCache = $wCache;
$this->logger = new NullLogger();
$this->statsd = new NullStatsdDataFactory();
if ( isset( $options['maxConnCount'] ) ) {
$this->maxConnCount = (int)( $options['maxConnCount'] );
} else {
$this->maxConnCount = INF;
}
$this->totalConnectionsAdjustment = (int)( $options['totalConnectionsAdjustment'] ?? 10 );
}
public function setLogger( LoggerInterface $logger ) {
$this->logger = $logger;
}
public function setStatsdDataFactory( StatsdDataFactoryInterface $statsFactory ) {
$this->statsd = $statsFactory;
}
public function scaleLoads( array &$weightByServer ) {
if ( count( $weightByServer ) <= 1 ) {
// Single-server group; relative adjustments are pointless
return;
}
$serverIndexes = array_keys( $weightByServer );
$stateByServerIndex = $this->getServerStates( $serverIndexes );
$totalConnections = 0;
$totalWeights = 0;
$circuitBreakingEnabled = true;
foreach ( $weightByServer as $i => $weight ) {
$serverState = $stateByServerIndex[$i];
$totalConnections += (int)$serverState[self::STATE_CONN_COUNT] * $serverState[self::STATE_UP];
$totalWeights += $weight;
// Set up circuit breaking. If at least one replica can take more connections
// allow the flow.
if (
$serverState[self::STATE_UP] &&
$weight > 0 &&
$serverState[self::STATE_CONN_COUNT] < $this->maxConnCount
) {
$circuitBreakingEnabled = false;
}
}
if ( $circuitBreakingEnabled ) {
throw new DBUnexpectedError(
null, 'Database servers in ' . $this->lb->getClusterName() . ' are overloaded. ' .
'In order to protect application servers, the circuit breaking to databases of this section ' .
'have been activated. Please try again a few seconds.'
);
}
foreach ( $weightByServer as $i => $weight ) {
if (
// host is down or
!$stateByServerIndex[$i][self::STATE_UP] ||
// host is primary or explicitly set to zero
$weight <= 0
) {
$weightByServer[$i] = 0;
continue;
}
$connRatio = $stateByServerIndex[$i][self::STATE_CONN_COUNT] /
( $totalConnections + $this->totalConnectionsAdjustment );
$weightRatio = $weight / $totalWeights;
$diffRatio = $connRatio - $weightRatio;
$adjustedRatio = max( $weightRatio - ( $diffRatio / 2.0 ), 0 );
$weightByServer[$i] = (int)round( $totalWeights * $adjustedRatio );
}
}
protected function getServerStates( array $serverIndexes ): array {
$stateByServerIndex = array_fill_keys( $serverIndexes, null );
// Perform any cache regenerations in randomized order so that the
// DB servers will each have similarly up-to-date state cache entries.
$shuffledServerIndexes = $serverIndexes;
shuffle( $shuffledServerIndexes );
foreach ( $shuffledServerIndexes as $i ) {
$key = $this->makeStateKey( $this->srvCache, $i );
$state = $this->srvCache->get( $key ) ?: null;
if ( $this->isStateFresh( $state ) ) {
$this->logger->debug(
__METHOD__ . ": fresh local cache hit for '{db_server}'",
[ 'db_server' => $this->lb->getServerName( $i ) ]
);
} else {
$lock = $this->srvCache->getScopedLock( $key, 0, 10 );
if ( $lock || !$state ) {
$state = $this->getStateFromWanCache( $i, $state );
$this->srvCache->set( $key, $state, self::STATE_PRESERVE_TTL );
}
}
$stateByServerIndex[$i] = $state;
}
return $stateByServerIndex;
}
protected function getStateFromWanCache( $i, ?array $srvPrevState ) {
$hit = true;
$key = $this->makeStateKey( $this->wanCache, $i );
$state = $this->wanCache->getWithSetCallback(
$key,
self::STATE_PRESERVE_TTL,
function ( $wanPrevState ) use ( $srvPrevState, $i, &$hit ) {
$prevState = $wanPrevState ?: $srvPrevState ?: null;
$hit = false;
return $this->computeServerState( $i, $prevState );
},
[ 'lockTSE' => 30 ]
);
if ( $hit ) {
$this->logger->debug(
__METHOD__ . ": WAN cache hit for '{db_server}'",
[ 'db_server' => $this->lb->getServerName( $i ) ]
);
} else {
$this->logger->info(
__METHOD__ . ": mutex acquired; regenerated cache for '{db_server}'",
[ 'db_server' => $this->lb->getServerName( $i ) ]
);
}
return $state;
}
protected function makeStateKey( IStoreKeyEncoder $cache, int $i ) {
return $cache->makeGlobalKey(
'rdbms-gauge',
self::VERSION,
$this->lb->getClusterName(),
$this->lb->getServerName( ServerInfo::WRITER_INDEX ),
$this->lb->getServerName( $i )
);
}
/**
* @param int $i
* @param array|null $previousState
* @return array<string,mixed>
* @phan-return array{up:float,conn_count:float|int|false,time:float}
* @throws DBAccessError
*/
protected function computeServerState( int $i, ?array $previousState ) {
$startTime = $this->getCurrentTime();
// Double check for circular recursion in computeServerStates()/getWeightScale().
// Mainly, connection attempts should use LoadBalancer::getServerConnection()
// rather than something that will pick a server based on the server states.
$this->acquireServerStatesLoopGuard();
$cluster = $this->lb->getClusterName();
$serverName = $this->lb->getServerName( $i );
$statServerName = str_replace( '.', '_', $serverName );
$newState = $this->newInitialServerState();
if ( $this->lb->getServerInfo( $i )['load'] <= 0 ) {
// Callers only use this server when they have *no choice* anyway (e.g. primary)
$newState[self::STATE_AS_OF] = $this->getCurrentTime();
// Avoid connecting, especially since it might reside in a remote datacenter
return $newState;
}
// Get a new, untracked, connection in order to gauge server health
$flags = ILoadBalancer::CONN_UNTRACKED_GAUGE | ILoadBalancer::CONN_SILENCE_ERRORS;
// Get a connection to this server without triggering other server connections
$conn = $this->lb->getServerConnection( $i, ILoadBalancer::DOMAIN_ANY, $flags );
// Determine the number of open connections in this server
if ( $conn ) {
try {
$connCount = $this->getConnCountForDb( $conn );
} catch ( DBError $e ) {
$connCount = false;
}
} else {
$connCount = false;
}
// Only keep one connection open at a time
if ( $conn ) {
$conn->close( __METHOD__ );
}
$endTime = $this->getCurrentTime();
$newState[self::STATE_AS_OF] = $endTime;
$newState[self::STATE_CONN_COUNT] = $connCount;
$newState[self::STATE_UP] = $conn ? 1.0 : 0.0;
if ( $previousState ) {
$newState[self::STATE_CONN_COUNT] = ( $previousState[self::STATE_CONN_COUNT] + $connCount ) / 2;
}
if ( $connCount === false ) {
$this->logger->error(
__METHOD__ . ": host {db_server} is not up?",
[ 'db_server' => $serverName ]
);
} else {
$this->statsd->timing( "loadbalancer.connCount.$cluster.$statServerName", $connCount );
if ( $connCount > $this->maxConnCount ) {
$this->logger->warning(
"Server {db_server} has {conn_count} open connections (>= {max_conn})",
[
'db_server' => $serverName,
'conn_count' => $connCount,
'max_conn' => $this->maxConnCount
]
);
}
}
return $newState;
}
/**
* @return array<string,mixed>
* @phan-return array{up:float,conn_count:int|int|false,time:float}
*/
protected function newInitialServerState() {
return [
// Moving average of connectivity; treat as good
self::STATE_UP => 1.0,
// Number of connections to that replica; treat as none
self::STATE_CONN_COUNT => 0,
// UNIX timestamp of state generation completion; treat as "outdated"
self::STATE_AS_OF => 0.0,
];
}
/**
* @param array|null $state
* @return bool
*/
private function isStateFresh( $state ) {
if ( !$state ) {
return false;
}
return $this->getCurrentTime() - $state[self::STATE_AS_OF] > self::STATE_TARGET_TTL;
}
/**
* @return ScopedCallback
*/
private function acquireServerStatesLoopGuard() {
if ( $this->serverStatesKeyLocked ) {
throw new RuntimeException(
"Circular recursion detected while regenerating server states cache. " .
"This may indicate improper connection handling in " . get_class( $this )
);
}
$this->serverStatesKeyLocked = true; // lock
return new ScopedCallback( function () {
$this->serverStatesKeyLocked = false; // unlock
} );
}
/**
* @return float UNIX timestamp
* @codeCoverageIgnore
*/
protected function getCurrentTime() {
return $this->wallClockOverride ?: microtime( true );
}
/**
* @param float|null &$time Mock UNIX timestamp for testing
* @codeCoverageIgnore
*/
public function setMockTime( &$time ) {
$this->wallClockOverride =& $time;
}
private function getConnCountForDb( IDatabase $conn ): int {
if ( $conn->getType() !== 'mysql' ) {
return 0;
}
$query = new Query(
'SELECT COUNT(*) AS pcount FROM INFORMATION_SCHEMA.PROCESSLIST',
ISQLPlatform::QUERY_SILENCE_ERRORS | ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
'SELECT',
null,
'SELECT COUNT(*) AS pcount FROM INFORMATION_SCHEMA.PROCESSLIST'
);
$res = $conn->query( $query, __METHOD__ );
$row = $res ? $res->fetchObject() : false;
return $row ? (int)$row->pcount : 0;
}
}