wikimedia/mediawiki-core

View on GitHub
includes/libs/rdbms/database/replication/MysqlReplicationReporter.php

Summary

Maintainability
D
2 days
Test Coverage
<?php
/**
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 * http://www.gnu.org/copyleft/gpl.html
 *
 * @file
 */
namespace Wikimedia\Rdbms\Replication;

use InvalidArgumentException;
use RuntimeException;
use stdClass;
use Wikimedia\Rdbms\DBPrimaryPos;
use Wikimedia\Rdbms\DBQueryError;
use Wikimedia\Rdbms\IDatabase;
use Wikimedia\Rdbms\MySQLPrimaryPos;
use Wikimedia\Rdbms\Platform\ISQLPlatform;
use Wikimedia\Rdbms\Query;

/**
 * @internal
 * @ingroup Database
 * @since 1.40
 */
class MysqlReplicationReporter extends ReplicationReporter {
    /** @var MySQLPrimaryPos */
    protected $lastKnownReplicaPos;
    /** @var string Method to detect replica DB lag */
    protected $lagDetectionMethod;
    /** @var array Options for the replica DB lag detection method */
    protected $lagDetectionOptions = [];
    /** @var bool Whether to use GTID methods */
    protected $useGTIDs = false;
    /** @var stdClass|null */
    private $replicationInfoRow;
    // Cache getServerId() for 24 hours
    private const SERVER_ID_CACHE_TTL = 86400;

    /** @var float Warn if lag estimates are made for transactions older than this many seconds */
    private const LAG_STALE_WARN_THRESHOLD = 0.100;

    /**
     * @param mixed $topologyRole Forwarded to the parent constructor
     * @param mixed $logger Forwarded to the parent constructor
     * @param mixed $srvCache Forwarded to the parent constructor
     * @param string $lagDetectionMethod Method to detect replica DB lag; 'pt-heartbeat'
     *  selects the heartbeat table, any other value selects SHOW SLAVE STATUS
     * @param array $lagDetectionOptions Options for lag detection; the 'conds' key, if
     *  set, filters the heartbeat rows
     * @param bool $useGTIDs Whether to use GTID methods
     */
    public function __construct(
        $topologyRole,
        $logger,
        $srvCache,
        $lagDetectionMethod,
        $lagDetectionOptions,
        $useGTIDs
    ) {
        parent::__construct( $topologyRole, $logger, $srvCache );

        $this->useGTIDs = $useGTIDs;
        $this->lagDetectionOptions = $lagDetectionOptions;
        $this->lagDetectionMethod = $lagDetectionMethod;
    }

    /**
     * Measure replication lag using the configured detection method.
     *
     * @param IDatabase $conn To make queries
     * @return int|float|false Seconds of lag; false if it could not be determined
     */
    protected function doGetLag( IDatabase $conn ) {
        return ( $this->lagDetectionMethod === 'pt-heartbeat' )
            ? $this->getLagFromPtHeartbeat( $conn )
            : $this->getLagFromSlaveStatus( $conn );
    }

    /**
     * Estimate lag from the output of SHOW SLAVE STATUS.
     *
     * @param IDatabase $conn To make queries
     * @return int|false Seconds of lag; false if the query fails, yields no row, or
     *  the row has an empty Seconds_Behind_Master value
     */
    protected function getLagFromSlaveStatus( IDatabase $conn ) {
        $sql = 'SHOW SLAVE STATUS';
        $flags = ISQLPlatform::QUERY_SILENCE_ERRORS
            | ISQLPlatform::QUERY_IGNORE_DBO_TRX
            | ISQLPlatform::QUERY_CHANGE_NONE;

        $res = $conn->query( new Query( $sql, $flags, 'SHOW', null, $sql ) );
        $status = $res ? $res->fetchObject() : false;
        // If the server is not replicating, there will be no row
        if ( !$status || strval( $status->Seconds_Behind_Master ) === '' ) {
            return false;
        }

        // Count any delayed-replication time that is still pending as lag too:
        // https://mariadb.com/kb/en/delayed-replication/
        // https://dev.mysql.com/doc/refman/5.6/en/replication-delayed.html
        $remainingDelay = $status->SQL_Remaining_Delay ?? 0;

        return intval( $status->Seconds_Behind_Master + $remainingDelay );
    }

    /**
     * Estimate lag from the age of the newest pt-heartbeat row.
     *
     * If a lag value was already recorded for the current transaction, that value is
     * reused instead of querying the heartbeat table again.
     *
     * @param IDatabase $conn To make queries
     * @return float|false Seconds of lag; false if no heartbeat row could be found
     */
    protected function getLagFromPtHeartbeat( IDatabase $conn ) {
        $trxLagStatus = $this->getRecordedTransactionLagStatus( $conn );

        if ( !$trxLagStatus ) {
            // No recorded value to reuse; measure the heartbeat age now
            $secondsAgo = $this->fetchSecondsSinceHeartbeat( $conn );
            if ( $secondsAgo === null ) {
                $this->logger->error(
                    "Unable to find pt-heartbeat row for {db_server}",
                    $this->getLogContext( $conn, [
                        'method' => __METHOD__
                    ] )
                );

                return false;
            }

            // Never report negative lag
            return max( $secondsAgo, 0.0 );
        }

        // There is an active transaction and the initial lag was already queried
        $age = microtime( true ) - $trxLagStatus['since'];
        if ( $age > self::LAG_STALE_WARN_THRESHOLD ) {
            // Re-querying would report ever-increasing lag as the snapshot ages:
            // the isolation level will typically be REPEATABLE-READ, which pins the
            // heartbeat row, while UTC_TIMESTAMP() keeps advancing regardless
            $this->logger->warning(
                "Using cached lag value for {db_server} due to active transaction",
                $this->getLogContext( $conn, [
                    'method' => __METHOD__,
                    'age' => $age,
                    'exception' => new RuntimeException()
                ] )
            );
        }

        return $trxLagStatus['lag'];
    }

    /**
     * Query the heartbeat table for the age of the newest matching beat.
     *
     * @param IDatabase $conn To make queries
     * @return float|null Elapsed seconds since the newest beat or null if none was found
     * @see https://www.percona.com/doc/percona-toolkit/2.1/pt-heartbeat.html
     */
    protected function fetchSecondsSinceHeartbeat( IDatabase $conn ) {
        // Some setups might have pt-heartbeat running on each replica server.
        // By default, skip the row for events originating on this DB server, on the
        // assumption that there is only one active replication channel and so any
        // other row getting updates must belong to the primary DB server.
        $conds = $this->lagDetectionOptions['conds'] ?? [ 'server_id != @@server_id' ];
        $where = $conn->makeList( $conds, ISQLPlatform::LIST_AND );

        // Use the MySQL server time so that query time and trip time are not counted.
        // Use ORDER BY for channel based queries since that field might not be UNIQUE.
        $sql = "SELECT TIMESTAMPDIFF(MICROSECOND,ts,UTC_TIMESTAMP(6)) AS us_ago " .
            "FROM heartbeat.heartbeat WHERE $where ORDER BY ts DESC LIMIT 1";
        $cleanedSql = "SELECT TIMESTAMPDIFF(MICROSECOND,ts,UTC_TIMESTAMP(6)) AS us_ago " .
            "FROM heartbeat.heartbeat WHERE ? ORDER BY ts DESC LIMIT 1";
        $flags = ISQLPlatform::QUERY_SILENCE_ERRORS
            | ISQLPlatform::QUERY_IGNORE_DBO_TRX
            | ISQLPlatform::QUERY_CHANGE_NONE;

        $res = $conn->query( new Query( $sql, $flags, 'SELECT', null, $cleanedSql ) );
        $beat = $res ? $res->fetchObject() : false;
        if ( !$beat ) {
            return null;
        }

        // Convert microseconds to seconds
        return $beat->us_ago / 1e6;
    }

    /**
     * Get the lag status from the parent implementation, using a short-lived
     * server cache entry unless pt-heartbeat is the detection method.
     *
     * @param IDatabase $conn To make queries
     * @return mixed Lag status as produced by the parent implementation (possibly cached)
     */
    public function getApproximateLagStatus( IDatabase $conn ) {
        if ( $this->lagDetectionMethod === 'pt-heartbeat' ) {
            // No caching here: the query is fast enough, and counting both the cache
            // TTL and the pt-heartbeat interval as lag in getSessionLagStatus() would
            // make the estimate *too* pessimistic
            return parent::getApproximateLagStatus( $conn );
        }

        $cacheKey = $this->srvCache->makeGlobalKey( 'mysql-lag', $conn->getServerName() );
        $cached = $this->srvCache->get( $cacheKey );
        if ( $cached ) {
            return $cached;
        }

        $fresh = parent::getApproximateLagStatus( $conn );
        // The last argument is presumably a TTL in seconds -- TODO confirm
        $this->srvCache->set( $cacheKey, $fresh, 1 );

        return $fresh;
    }

    /**
     * Get the server settings that matter for replication safety
     * (@@innodb_autoinc_lock_mode and @@binlog_format).
     *
     * The row is fetched once and then kept on this object for later calls.
     *
     * @param IDatabase $conn To make queries
     * @param string $fname
     * @return stdClass Process cached row
     */
    public function getReplicationSafetyInfo( IDatabase $conn, $fname = __METHOD__ ) {
        if ( $this->replicationInfoRow !== null ) {
            return $this->replicationInfoRow;
        }

        $fields = [
            'innodb_autoinc_lock_mode' => '@@innodb_autoinc_lock_mode',
            'binlog_format' => '@@binlog_format',
        ];
        $this->replicationInfoRow = $conn->selectRow( [], $fields, [], $fname );

        return $this->replicationInfoRow;
    }

    /**
     * Accessor for the GTID flag given to the constructor.
     *
     * Kept as a method, rather than reading the property directly, so that it can
     * be mocked in tests.
     *
     * @return bool Whether GTID support is used
     */
    protected function useGTIDs() {
        return $this->useGTIDs;
    }

    /**
     * Wait for this replica server to catch up to a given primary DB position.
     *
     * The wait is bypassed when the server is a static clone, or when an earlier
     * successful wait on this object already covered the requested position.
     *
     * @param IDatabase $conn To make queries
     * @param DBPrimaryPos $pos Position to wait for; must be a MySQLPrimaryPos
     * @param int|float $timeout Timeout handed to the SQL wait function (seconds, per
     *  the MySQL/MariaDB manuals). It is interpolated into the SQL as-is, so callers
     *  must pass a number.
     * @return int|null Zero or greater if the position was reached (or the wait was
     *  bypassed), -1 on timeout or if the positions could not be compared, null if
     *  the wait function reported an error
     * @throws InvalidArgumentException If $pos is not a MySQLPrimaryPos
     */
    public function primaryPosWait( IDatabase $conn, DBPrimaryPos $pos, $timeout ) {
        if ( !( $pos instanceof MySQLPrimaryPos ) ) {
            throw new InvalidArgumentException( "Position not an instance of MySQLPrimaryPos" );
        }

        if ( $this->topologyRole === IDatabase::ROLE_STATIC_CLONE ) {
            $this->logger->debug(
                "Bypassed replication wait; database has a static dataset",
                $this->getLogContext( $conn, [ 'method' => __METHOD__, 'raw_pos' => $pos ] )
            );

            return 0; // this is a copy of a read-only dataset with no primary DB
        } elseif ( $this->lastKnownReplicaPos && $this->lastKnownReplicaPos->hasReached( $pos ) ) {
            $this->logger->debug(
                "Bypassed replication wait; replication known to have reached {raw_pos}",
                $this->getLogContext( $conn, [ 'method' => __METHOD__, 'raw_pos' => $pos ] )
            );

            return 0; // already reached this point for sure
        }

        // Raw value that the chosen SQL wait function returns when the timeout elapses.
        // MASTER_POS_WAIT() and MASTER_GTID_WAIT() use -1; see the MySQL GTID branch below.
        $timeoutResult = -1;

        // The wait queries use QUERY_IGNORE_DBO_TRX to avoid opening a transaction
        // if DBO_TRX is set
        if ( $pos->getGTIDs() ) {
            // Get the GTIDs from this replica server to see the domains (channels)
            $refPos = $this->getReplicaPos( $conn );
            if ( !$refPos ) {
                $this->logger->error(
                    "Could not get replication position on replica DB to compare to {raw_pos}",
                    $this->getLogContext( $conn, [ 'method' => __METHOD__, 'raw_pos' => $pos ] )
                );

                return -1; // this is the primary DB itself?
            }
            // GTIDs with domains (channels) that are active and are present on the replica
            $gtidsWait = $pos::getRelevantActiveGTIDs( $pos, $refPos );
            if ( !$gtidsWait ) {
                $this->logger->error(
                    "No active GTIDs in {raw_pos} share a domain with those in {current_pos}",
                    $this->getLogContext( $conn, [
                        'method' => __METHOD__,
                        'raw_pos' => $pos,
                        'current_pos' => $refPos
                    ] )
                );

                return -1; // $pos is from the wrong cluster?
            }
            // Wait on the GTID set
            $gtidArg = $conn->addQuotes( implode( ',', $gtidsWait ) );
            if ( strpos( $gtidArg, ':' ) !== false ) {
                // MySQL GTIDs, e.g "source_id:transaction_id"
                // WAIT_FOR_EXECUTED_GTID_SET() returns 0 on success and 1 (not -1) on
                // timeout, so remember which raw value means "timed out"
                $timeoutResult = 1;
                $query = new Query(
                    "SELECT WAIT_FOR_EXECUTED_GTID_SET($gtidArg, $timeout)",
                    ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
                    'SELECT',
                    null,
                    "SELECT WAIT_FOR_EXECUTED_GTID_SET(?, ?)"
                );
            } else {
                // MariaDB GTIDs, e.g."domain:server:sequence"
                $query = new Query(
                    "SELECT MASTER_GTID_WAIT($gtidArg, $timeout)",
                    ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
                    'SELECT',
                    null,
                    "SELECT MASTER_GTID_WAIT(?, ?)"
                );
            }
            $waitPos = implode( ',', $gtidsWait );
        } else {
            // Wait on the binlog coordinates
            $encFile = $conn->addQuotes( $pos->getLogFile() );
            // @phan-suppress-next-line PhanTypeArraySuspiciousNullable
            $encPos = intval( $pos->getLogPosition()[$pos::CORD_EVENT] );
            $query = new Query(
                "SELECT MASTER_POS_WAIT($encFile, $encPos, $timeout)",
                ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE,
                'SELECT',
                null,
                "SELECT MASTER_POS_WAIT(?, ?, ?)"
            );
            $waitPos = $pos->__toString();
        }

        $start = microtime( true );
        $res = $conn->query( $query, __METHOD__ );
        $row = $res->fetchRow();
        $seconds = max( microtime( true ) - $start, 0 );

        // Result can be NULL (error), the timeout marker, or 0+ per the manuals.
        // A missing row is treated like NULL rather than indexing into false.
        $rawStatus = $row[0] ?? null;
        if ( $rawStatus === null ) {
            $status = null;
        } else {
            $status = intval( $rawStatus );
            if ( $status === $timeoutResult ) {
                // Normalize so that callers always see -1 for a timeout
                $status = -1;
            }
        }

        if ( $status === null ) {
            $this->logger->error(
                "An error occurred while waiting for replication to reach {wait_pos}",
                $this->getLogContext( $conn, [
                    'raw_pos' => $pos,
                    'wait_pos' => $waitPos,
                    'sql' => $query->getSQL(),
                    'seconds_waited' => $seconds,
                    'exception' => new RuntimeException()
                ] )
            );
        } elseif ( $status < 0 ) {
            $this->logger->info(
                "Timed out waiting for replication to reach {wait_pos}",
                $this->getLogContext( $conn, [
                    'raw_pos' => $pos,
                    'wait_pos' => $waitPos,
                    'timeout' => $timeout,
                    'sql' => $query->getSQL(),
                    'seconds_waited' => $seconds,
                ] )
            );
        } else {
            $this->logger->debug(
                "Replication has reached {wait_pos}",
                $this->getLogContext( $conn, [
                    'raw_pos' => $pos,
                    'wait_pos' => $waitPos,
                    'seconds_waited' => $seconds,
                ] )
            );
            // Remember that this position was reached to save queries next time
            $this->lastKnownReplicaPos = $pos;
        }

        return $status;
    }

    /**
     * Get the replication position that this replica server has applied.
     *
     * When GTIDs are enabled, the GTID variables are tried first; otherwise, or if
     * they are empty, the binlog coordinates from SHOW SLAVE STATUS are used.
     *
     * @param IDatabase $conn To make queries
     * @return MySQLPrimaryPos|false False if no position could be determined
     */
    public function getReplicaPos( IDatabase $conn ) {
        // As-of-time *before* fetching GTID variables
        $asOfTime = microtime( true );

        if ( $this->useGTIDs() ) {
            // Try to use GTIDs, falling back to binlog positions if not possible
            $gtidVars = $this->getServerGTIDs( $conn, __METHOD__ );
            // Use gtid_slave_pos for MariaDB and gtid_executed for MySQL
            foreach ( [ 'gtid_slave_pos', 'gtid_executed' ] as $varName ) {
                if ( strlen( $gtidVars[$varName] ?? '' ) ) {
                    return new MySQLPrimaryPos( $gtidVars[$varName], $asOfTime );
                }
            }
        }

        $status = $this->getServerRoleStatus( $conn, 'SLAVE', __METHOD__ );
        if ( !$status || !strlen( $status['Relay_Master_Log_File'] ) ) {
            return false;
        }

        $binlogPos = $status['Relay_Master_Log_File'] . '/' . $status['Exec_Master_Log_Pos'];

        return new MySQLPrimaryPos( $binlogPos, $asOfTime );
    }

    /**
     * Get the current replication position of this server acting as primary DB.
     *
     * When GTIDs are enabled, the GTID variables are tried first; otherwise, or if
     * they are empty, the binlog coordinates from SHOW MASTER STATUS are used.
     *
     * @param IDatabase $conn To make queries
     * @return MySQLPrimaryPos|false False if no position could be determined
     */
    public function getPrimaryPos( IDatabase $conn ) {
        // As-of-time *before* fetching GTID variables
        $asOfTime = microtime( true );

        if ( $this->useGTIDs() ) {
            // Try to use GTIDs, falling back to binlog positions if not possible
            $gtidVars = $this->getServerGTIDs( $conn, __METHOD__ );
            // Use gtid_binlog_pos for MariaDB and gtid_executed for MySQL
            foreach ( [ 'gtid_binlog_pos', 'gtid_executed' ] as $varName ) {
                if ( !strlen( $gtidVars[$varName] ?? '' ) ) {
                    continue;
                }

                $gtidPos = new MySQLPrimaryPos( $gtidVars[$varName], $asOfTime );
                // Filter domains that are inactive or not relevant to the session
                $gtidPos->setActiveOriginServerId( $this->getServerId( $conn ) );
                $gtidPos->setActiveOriginServerUUID( $this->getServerUUID( $conn ) );
                if ( isset( $gtidVars['gtid_domain_id'] ) ) {
                    $gtidPos->setActiveDomain( $gtidVars['gtid_domain_id'] );
                }

                return $gtidPos;
            }
        }

        $status = $this->getServerRoleStatus( $conn, 'MASTER', __METHOD__ );
        if ( $status && strlen( $status['File'] ) ) {
            return new MySQLPrimaryPos( $status['File'] . '/' . $status['Position'], $asOfTime );
        }

        return false;
    }

    /**
     * Get the server_id of the server, cached per server name for
     * SERVER_ID_CACHE_TTL.
     *
     * @param IDatabase $conn To make queries
     * @return string Value of server_id (32-bit integer, unique to the replication topology)
     * @throws DBQueryError
     */
    protected function getServerId( IDatabase $conn ) {
        $fname = __METHOD__;
        $cacheKey = $this->srvCache->makeGlobalKey( 'mysql-server-id', $conn->getServerName() );

        return $this->srvCache->getWithSetCallback(
            $cacheKey,
            self::SERVER_ID_CACHE_TTL,
            static function () use ( $conn, $fname ) {
                $sql = "SELECT @@server_id AS id";
                $flags = ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE;
                $row = $conn->query( new Query( $sql, $flags, 'SELECT', null, $sql ), $fname )
                    ->fetchObject();

                return $row->id;
            }
        );
    }

    /**
     * Get the server_uuid of the server, cached per server name for
     * SERVER_ID_CACHE_TTL.
     *
     * @param IDatabase $conn To make queries
     * @return string|null Value of server_uuid (hyphenated 128-bit hex string, globally
     *  unique); null if the server does not report that variable
     * @throws DBQueryError
     */
    protected function getServerUUID( IDatabase $conn ) {
        $fname = __METHOD__;
        $cacheKey = $this->srvCache->makeGlobalKey( 'mysql-server-uuid', $conn->getServerName() );

        return $this->srvCache->getWithSetCallback(
            $cacheKey,
            self::SERVER_ID_CACHE_TTL,
            static function () use ( $conn, $fname ) {
                $sql = "SHOW GLOBAL VARIABLES LIKE 'server_uuid'";
                $flags = ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE;
                $uuidRow = $conn->query( new Query( $sql, $flags, 'SHOW', null, $sql ), $fname )
                    ->fetchObject();
                if ( !$uuidRow ) {
                    return null;
                }

                return $uuidRow->Value;
            }
        );
    }

    /**
     * Collect the server's gtid_* variables into a name => value map.
     *
     * @param IDatabase $conn To make queries
     * @param string $fname
     * @return string[] Map of (variable name => value); session values override
     *  global values of the same name
     */
    protected function getServerGTIDs( IDatabase $conn, $fname = __METHOD__ ) {
        $flags = ISQLPlatform::QUERY_IGNORE_DBO_TRX | ISQLPlatform::QUERY_CHANGE_NONE;

        $gtidVars = [];
        // GLOBAL goes first to pick up global-only variables like gtid_executed.
        // SESSION goes second so that session-specific values win (e.g. gtid_domain_id,
        // since that is where writes will log).
        foreach ( [ 'GLOBAL', 'SESSION' ] as $scope ) {
            $sql = "SHOW $scope VARIABLES LIKE 'gtid_%'";
            $res = $conn->query( new Query( $sql, $flags, 'SHOW', null, $sql ), $fname );
            foreach ( $res as $row ) {
                $gtidVars[$row->Variable_name] = $row->Value;
            }
        }

        return $gtidVars;
    }

    /**
     * Run SHOW MASTER STATUS or SHOW SLAVE STATUS and return its first row.
     *
     * Query errors are silenced, so a failed query looks the same as an empty result.
     *
     * @param IDatabase $conn To make queries
     * @param string $role One of "MASTER"/"SLAVE"
     * @param string $fname
     * @return array<string,mixed>|null Latest available server status row; null if the
     *  query fails or yields no row
     */
    protected function getServerRoleStatus( IDatabase $conn, $role, $fname = __METHOD__ ) {
        $sql = "SHOW $role STATUS";
        $flags = ISQLPlatform::QUERY_SILENCE_ERRORS
            | ISQLPlatform::QUERY_IGNORE_DBO_TRX
            | ISQLPlatform::QUERY_CHANGE_NONE;

        $res = $conn->query( new Query( $sql, $flags, 'SHOW', null, $sql ), $fname );
        if ( !$res ) {
            return null;
        }

        $statusRow = $res->fetchRow();
        if ( !$statusRow ) {
            return null;
        }

        return $statusRow;
    }

}