wikimedia/mediawiki-core

View on GitHub
includes/config/EtcdConfig.php

Summary

Maintainability
C
1 day
Test Coverage
<?php
/**
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 * http://www.gnu.org/copyleft/gpl.html
 *
 * @file
 */

namespace MediaWiki\Config;

use BagOStuff;
use DnsSrvDiscoverer;
use HashBagOStuff;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerInterface;
use Wikimedia\Http\MultiHttpClient;
use Wikimedia\IPUtils;
use Wikimedia\ObjectFactory\ObjectFactory;
use Wikimedia\WaitConditionLoop;

/**
 * Config implementation backed by an etcd "directory" (v2 keys API).
 *
 * All values under the configured directory are fetched in one recursive
 * request, kept in a server-side cache (BagOStuff) and mirrored in a
 * per-instance process cache. When etcd cannot be reached, a stale copy
 * from the server-side cache is used if one exists.
 *
 * @since 1.29
 */
class EtcdConfig implements Config, LoggerAwareInterface {
    /** @var MultiHttpClient HTTP client used to query etcd */
    private $http;
    /** @var BagOStuff Server-side cache holding the last fetched config */
    private $srvCache;
    /** @var array|null Loaded data (keys: config, expires, modifiedIndex); null until load() ran */
    private $procCache;
    /** @var DnsSrvDiscoverer Used to look up the etcd servers */
    private $dsd;

    /** @var string Service name used for SRV discovery */
    private $service;
    /** @var string Host (or SRV domain) of the etcd service */
    private $host;
    /** @var ?int Explicit port, if any */
    private $port;
    /** @var string URL scheme used to reach etcd */
    private $protocol;
    /** @var string etcd directory, without leading/trailing slashes */
    private $directory;
    /** @var int Cache TTL in seconds, already reduced by the skew */
    private $baseCacheTTL;
    /** @var int Maximum random amount (seconds) added on top of the base TTL */
    private $skewCacheTTL;
    /** @var int Timeout in seconds for HTTP requests and for the load loop */
    private $timeout;

    /**
     * @param array $params Parameter map:
     *   - host: the host address
     *   - directory: the etcd "directory" where MediaWiki specific variables are located
     *   - service: service name used in SRV discovery. Defaults to 'etcd'. [optional]
     *   - port: custom host port [optional]
     *   - protocol: one of ("http", "https"). Defaults to http. [optional]
     *   - cache: BagOStuff instance or ObjectFactory spec thereof for a server cache.
     *            The cache will also be used as a fallback if etcd is down. [optional]
     *   - cacheTTL: logical cache TTL in seconds [optional]
     *   - skewTTL: maximum seconds to randomly lower the assigned TTL on cache save [optional]
     *   - timeout: seconds to wait for etcd before throwing an error [optional]
     */
    public function __construct( array $params ) {
        // Array union: caller-supplied keys win, defaults only fill the gaps
        $params += [
            'service' => 'etcd',
            'port' => null,
            'protocol' => 'http',
            'cacheTTL' => 10,
            'skewTTL' => 1,
            'timeout' => 2
        ];

        $this->service = $params['service'];
        $this->host = $params['host'];
        $this->port = $params['port'];
        $this->protocol = $params['protocol'];
        $this->directory = trim( $params['directory'], '/' );
        $this->skewCacheTTL = $params['skewTTL'];
        // The skew is added back (randomised) when the cache entry is written in load()
        $this->baseCacheTTL = max( $params['cacheTTL'] - $this->skewCacheTTL, 0 );
        $this->timeout = $params['timeout'];

        // For backwards compatibility, check the host for an embedded port
        $hostAndPort = IPUtils::splitHostAndPort( $this->host );

        if ( $hostAndPort ) {
            $this->host = $hostAndPort[0];

            if ( $hostAndPort[1] ) {
                $this->port = $hostAndPort[1];
            }
        }

        // Also for backwards compatibility, check for a host in the format of
        // an SRV record and use the service specified therein
        if ( preg_match( '/^_([^\.]+)\._tcp\.(.+)$/', $this->host, $m ) ) {
            $this->service = $m[1];
            $this->host = $m[2];
        }

        if ( !isset( $params['cache'] ) ) {
            // No cache configured: fall back to an in-process cache
            $this->srvCache = new HashBagOStuff();
        } elseif ( $params['cache'] instanceof BagOStuff ) {
            $this->srvCache = $params['cache'];
        } else {
            $this->srvCache = ObjectFactory::getObjectFromSpec( $params['cache'] );
        }

        $this->http = new MultiHttpClient( [
            'connTimeout' => $this->timeout,
            'reqTimeout' => $this->timeout,
        ] );
        $this->dsd = new DnsSrvDiscoverer( $this->service, 'tcp', $this->host );
    }

    /**
     * The logger is not stored; calling this only emits a deprecation notice.
     *
     * @deprecated since 1.41 No longer used and did not work in practice
     * @param LoggerInterface $logger Ignored
     */
    public function setLogger( LoggerInterface $logger ) {
        trigger_error( __METHOD__ . ' is deprecated since 1.41', E_USER_DEPRECATED );
    }

    /**
     * Check whether a configuration entry exists (even if its value is null).
     *
     * @param string $name Name of the configuration entry
     * @return bool
     * @throws ConfigException If the configuration could not be loaded
     */
    public function has( $name ) {
        $this->load();

        return array_key_exists( $name, $this->procCache['config'] );
    }

    /**
     * Get the value of a configuration entry.
     *
     * @param string $name Name of the configuration entry
     * @return mixed
     * @throws ConfigException If the entry does not exist or the configuration
     *   could not be loaded
     */
    public function get( $name ) {
        $this->load();

        if ( !array_key_exists( $name, $this->procCache['config'] ) ) {
            throw new ConfigException( "No entry found for '$name'." );
        }

        return $this->procCache['config'][$name];
    }

    /**
     * Get the 'modifiedIndex' value stored alongside the loaded configuration.
     *
     * @return mixed The value computed by parseDirectory() when the data was fetched
     * @throws ConfigException If the configuration could not be loaded
     */
    public function getModifiedIndex() {
        $this->load();
        return $this->procCache['modifiedIndex'];
    }

    /**
     * Populate the process cache from the server-side cache, refreshing the
     * latter from etcd when it is empty or expired.
     *
     * @throws ConfigException
     */
    private function load() {
        if ( $this->procCache !== null ) {
            return; // already loaded
        }

        $now = microtime( true );
        $key = $this->srvCache->makeGlobalKey(
            __CLASS__,
            $this->host,
            $this->directory
        );

        // Get the cached value or block until it is regenerated (by this or another thread)...
        $data = null; // latest config info
        $error = null; // last error message
        $loop = new WaitConditionLoop(
            function () use ( $key, $now, &$data, &$error ) {
                // Check if the values are in cache yet...
                $data = $this->srvCache->get( $key );
                if ( is_array( $data ) && $data['expires'] > $now ) {
                    return WaitConditionLoop::CONDITION_REACHED;
                }

                // Cache is either empty or stale;
                // refresh the cache from etcd, using a mutex to reduce stampedes...
                if ( $this->srvCache->lock( $key, 0, $this->baseCacheTTL ) ) {
                    try {
                        $etcdResponse = $this->fetchAllFromEtcd();
                        $error = $etcdResponse['error'];
                        if ( is_array( $etcdResponse['config'] ) ) {
                            // Avoid having all servers expire cache keys at the same time
                            $expiry = microtime( true ) + $this->baseCacheTTL;
                            // Random fraction in [0, 1] of the skew; int bounds, as mt_rand() expects
                            $expiry += mt_rand( 0, 1000000 ) / 1e6 * $this->skewCacheTTL;
                            $data = [
                                'config' => $etcdResponse['config'],
                                'expires' => $expiry,
                                'modifiedIndex' => $etcdResponse['modifiedIndex']
                            ];
                            // Stored without a cache-level TTL; freshness is judged via 'expires',
                            // so an expired entry remains available as a stale fallback
                            $this->srvCache->set( $key, $data, BagOStuff::TTL_INDEFINITE );

                            return WaitConditionLoop::CONDITION_REACHED;
                        } else {
                            trigger_error( "EtcdConfig failed to fetch data: $error", E_USER_WARNING );
                            if ( !$etcdResponse['retry'] && !is_array( $data ) ) {
                                // Fail fast since the error is likely to keep happening
                                return WaitConditionLoop::CONDITION_FAILED;
                            }
                        }
                    } finally {
                        $this->srvCache->unlock( $key ); // release mutex
                    }
                } else {
                    $error = 'lost lock';
                }

                if ( is_array( $data ) ) {
                    // Expired cache entry is still better than no configuration at all
                    trigger_error( "EtcdConfig using stale data: $error", E_USER_NOTICE );

                    return WaitConditionLoop::CONDITION_REACHED;
                }

                return WaitConditionLoop::CONDITION_CONTINUE;
            },
            $this->timeout
        );

        if ( $loop->invoke() !== WaitConditionLoop::CONDITION_REACHED ) {
            // No cached value exists and etcd query failed; throw an error
            // @phan-suppress-next-line PhanTypeSuspiciousStringExpression WaitConditionLoop throws or error set
            throw new ConfigException( "Failed to load configuration from etcd: $error" );
        }

        // @phan-suppress-next-line PhanTypeMismatchProperty WaitConditionLoop throws or data set
        $this->procCache = $data;
    }

    /**
     * Query the discovered etcd servers in turn, falling back to the
     * configured host/port when discovery yields no servers.
     *
     * Iteration stops at the first server whose response either contains a
     * config array or is flagged as retryable; the last response obtained
     * is returned.
     *
     * @return array (containing the keys config, error, retry, modifiedIndex)
     */
    public function fetchAllFromEtcd() {
        $servers = $this->dsd->getServers() ?: [ [ $this->host, $this->port ] ];

        foreach ( $servers as [ $host, $port ] ) {
            // Try to load the config from this particular server
            $response = $this->fetchAllFromEtcdServer( $host, $port );
            if ( is_array( $response['config'] ) || $response['retry'] ) {
                break;
            }
        }

        return $response;
    }

    /**
     * Fetch and parse the whole config directory from a single etcd server.
     *
     * @param string $address Host
     * @param ?int $port Port
     * @return array (containing the keys config, error, retry, modifiedIndex)
     */
    protected function fetchAllFromEtcdServer( string $address, ?int $port = null ) {
        $host = $address;

        if ( $port !== null ) {
            $host = IPUtils::combineHostAndPort( $address, $port );
        }

        // Retrieve all the values under the MediaWiki config directory
        [ $rcode, $rdesc, /* $rhdrs */, $rbody, $rerr ] = $this->http->run( [
            'method' => 'GET',
            'url' => "{$this->protocol}://{$host}/v2/keys/{$this->directory}/?recursive=true",
            'headers' => [
                'content-type' => 'application/json',
            ]
        ] );

        // Baseline result; parse output (or an error) is merged over it below
        $response = [ 'config' => null, 'error' => null, 'retry' => false, 'modifiedIndex' => 0 ];

        // HTTP status codes for which a retry is not suggested
        static $terminalCodes = [ 404 => true ];
        if ( $rcode < 200 || $rcode > 399 ) {
            // Prefer the client's error string; otherwise describe the HTTP status
            $response['error'] = strlen( $rerr ?? '' ) ? $rerr : "HTTP $rcode ($rdesc)";
            $response['retry'] = empty( $terminalCodes[$rcode] );
            return $response;
        }

        try {
            $parsedResponse = $this->parseResponse( $rbody );
        } catch ( EtcdConfigParseError $e ) {
            // Malformed body: report the error; 'config' stays null and 'retry' stays false
            $parsedResponse = [ 'error' => $e->getMessage() ];
        }
        return array_merge( $response, $parsedResponse );
    }

    /**
     * Parse a response body, throwing EtcdConfigParseError if there is a validation error
     *
     * @param string $rbody
     * @return array (containing the keys modifiedIndex and config)
     * @throws EtcdConfigParseError
     */
    protected function parseResponse( $rbody ) {
        $info = json_decode( $rbody, true );
        if ( $info === null ) {
            throw new EtcdConfigParseError( "Error unserializing JSON response." );
        }
        if ( !isset( $info['node'] ) || !is_array( $info['node'] ) ) {
            throw new EtcdConfigParseError(
                "Unexpected JSON response: Missing or invalid node at top level." );
        }
        $config = [];
        $lastModifiedIndex = $this->parseDirectory( '', $info['node'], $config );
        return [ 'modifiedIndex' => $lastModifiedIndex, 'config' => $config ];
    }

    /**
     * Recursively parse a directory node and populate the array passed by
     * reference, throwing EtcdConfigParseError if there is a validation error
     *
     * Config keys are the node paths relative to the top-level directory
     * (e.g. "sub/name"); each leaf value must be a JSON object with a 'val' key.
     *
     * @param string $dirName The relative directory name
     * @param array $dirNode The decoded directory node
     * @param array &$config The output array
     * @return int lastModifiedIndex The maximum last modified index across all keys in the directory
     * @throws EtcdConfigParseError
     */
    protected function parseDirectory( $dirName, $dirNode, &$config ) {
        $lastModifiedIndex = 0;
        if ( !isset( $dirNode['nodes'] ) ) {
            throw new EtcdConfigParseError(
                "Unexpected JSON response in dir '$dirName'; missing 'nodes' list." );
        }
        if ( !is_array( $dirNode['nodes'] ) ) {
            throw new EtcdConfigParseError(
                "Unexpected JSON response in dir '$dirName'; 'nodes' is not an array." );
        }

        foreach ( $dirNode['nodes'] as $node ) {
            // Reject malformed entries explicitly instead of letting PHP emit
            // warnings and silently continue with an empty key
            if ( !is_array( $node ) || !isset( $node['key'] ) || !is_string( $node['key'] ) ) {
                throw new EtcdConfigParseError(
                    "Unexpected JSON response in dir '$dirName'; node is missing a valid 'key'." );
            }
            '@phan-var array $node';
            $baseName = basename( $node['key'] );
            $fullName = $dirName === '' ? $baseName : "$dirName/$baseName";
            if ( !empty( $node['dir'] ) ) {
                $lastModifiedIndex = max(
                    $this->parseDirectory( $fullName, $node, $config ),
                    $lastModifiedIndex );
            } else {
                // A missing or non-string value cannot be decoded; treat it
                // like any other unparsable value below
                $value = ( isset( $node['value'] ) && is_string( $node['value'] ) )
                    ? $this->unserialize( $node['value'] )
                    : null;
                if ( !is_array( $value ) || !array_key_exists( 'val', $value ) ) {
                    throw new EtcdConfigParseError( "Failed to parse value for '$fullName'." );
                }
                // A node without modifiedIndex must not turn the result into null
                $lastModifiedIndex = max( $node['modifiedIndex'] ?? 0, $lastModifiedIndex );
                $config[$fullName] = $value['val'];
            }
        }
        return $lastModifiedIndex;
    }

    /**
     * Decode a JSON-encoded node value into PHP data (objects become arrays).
     *
     * @param string $string
     * @return mixed Decoded value, or null if the string is not valid JSON
     */
    private function unserialize( $string ) {
        return json_decode( $string, true );
    }
}

/**
 * Register the un-namespaced name 'EtcdConfig' as an alias of
 * MediaWiki\Config\EtcdConfig, so code referring to the global class name
 * keeps working.
 *
 * @deprecated class alias since 1.41
 */
class_alias( EtcdConfig::class, 'EtcdConfig' );