includes/ElasticsearchIntermediary.php
<?php
namespace CirrusSearch;
use CirrusSearch\Search\SearchMetricsProvider;
use Elastica\Exception\ExceptionInterface;
use Elastica\Exception\ResponseException;
use Elastica\Exception\RuntimeException;
use Elastica\Multi\ResultSet as MultiResultSet;
use Elastica\Multi\Search;
use ISearchResultSet;
use MediaWiki\Config\ConfigException;
use MediaWiki\Context\RequestContext;
use MediaWiki\Logger\LoggerFactory;
use MediaWiki\Status\Status;
use MediaWiki\User\UserIdentity;
use Wikimedia\Assert\Assert;
/**
* Base class with useful functions for communicating with Elasticsearch.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
* http://www.gnu.org/copyleft/gpl.html
*/
abstract class ElasticsearchIntermediary {
/**
* @var Connection
*/
protected $connection;
/**
* @var UserIdentity|null user for which we're performing this search or null in
* the case of requests kicked off by jobs
*/
protected $user;
/**
* @var RequestLog|null Log for in-progress search request
*/
protected $currentRequestLog = null;
/**
* @var int how many millis a request through this intermediary needs to
* take before it counts as slow. 0 means none count as slow.
*/
private $slowMillis;
/**
* @var array Metrics about a completed search
*/
private $searchMetrics = [];
/**
* @var int artificial extra backend latency in micro seconds
*/
private $extraBackendLatency;
/**
* @var RequestLogger
*/
protected static $requestLogger;
/**
* @param Connection $connection
* @param UserIdentity|null $user user for which this search is being performed.
* Attached to slow request logs. Note that null isn't for anonymous users
* - those are still User objects and should be provided if possible. Null
* is for when the action is being performed in some context where the user
* that caused it isn't available. Like when an action is being performed
* during a job.
* @param float|null $slowSeconds how many seconds a request through this
* intermediary needs to take before it counts as slow. 0 means none count
* as slow. Defaults to CirrusSearchSlowSearch config option.
* @param int $extraBackendLatency artificial backend latency.
*/
protected function __construct( Connection $connection, UserIdentity $user = null, $slowSeconds = null, $extraBackendLatency = 0 ) {
$this->connection = $connection;
$this->user = $user ?? RequestContext::getMain()->getUser();
$this->slowMillis = (int)( 1000 * ( $slowSeconds ?? $connection->getConfig()->get( 'CirrusSearchSlowSearch' ) ) );
$this->extraBackendLatency = $extraBackendLatency;
if ( self::$requestLogger === null ) {
self::$requestLogger = new RequestLogger;
}
// This isn't explicitly used, but we need to make sure it is
// instantiated so it has the opportunity to override global
// configuration for test buckets.
UserTestingStatus::getInstance();
}
/**
* This is set externally because we don't have complete control, from the
* SearchEngine interface, of what is actually sent to the user. Instead hooks
* receive the final results that will be sent to the user and set them here.
*
* Accepts two result sets because some places (Special:Search) perform multiple
* searches. This can be called multiple times, but only that last call wins. For
* API's that is correct, for Special:Search a hook catches the final results and
* sets them here.
*
* @param ISearchResultSet[] $matches
*/
public static function setResultPages( array $matches ) {
if ( self::$requestLogger === null ) {
// This could happen if Cirrus is not the active engine,
// but the hook is still loaded. In this case, do nothing.
return;
} else {
self::$requestLogger->setResultPages( $matches );
}
}
/**
* Report the types of queries that were issued
* within the current request.
*
* @return string[]
*/
public static function getQueryTypesUsed() {
if ( self::$requestLogger === null ) {
// This can happen when, for example, completion search is
// triggered against NS_SPECIAL, where searching is done strictly
// in PHP and never actually creates a SearchEngine.
return [];
} else {
return self::$requestLogger->getQueryTypesUsed();
}
}
/**
* @return bool True when query logs have been generated by the
* current php execution.
*/
public static function hasQueryLogs() {
if ( self::$requestLogger === null ) {
return false;
}
return self::$requestLogger->hasQueryLogs();
}
/**
* Mark the start of a request to Elasticsearch. Public so it can be
* called from pool counter methods.
*
* @param RequestLog $log
*/
public function start( RequestLog $log ) {
$this->currentRequestLog = $log;
$log->start();
if ( $this->extraBackendLatency ) {
usleep( $this->extraBackendLatency );
}
}
/**
* Log a successful request and return the provided result in a good
* Status. If you don't need the status just ignore the return. Public so
* it can be called from pool counter methods.
*
* @param mixed|null $result result of the request. defaults to null in case
* the request doesn't have a result
* @param Connection|null $connection The connection the succesful
* request was performed against. Will use $this->connection when not
* provided.
* @return Status wrapping $result
*/
public function success( $result = null, Connection $connection = null ) {
$this->finishRequest( $connection ?? $this->connection );
return Status::newGood( $result );
}
/**
* Log a successful request when the response comes from a cache outside
* elasticsearch. This is a combination of self::start() and self::success().
*
* @param RequestLog $log
*/
public function successViaCache( RequestLog $log ) {
if ( $this->extraBackendLatency ) {
usleep( $this->extraBackendLatency );
}
self::$requestLogger->addRequest( $log );
}
/**
* Log a failure and return an appropriate status. Public so it can be
* called from pool counter methods.
*
* @param ExceptionInterface|null $exception if the request failed
* @param Connection|null $connection The connection that the failed
* request was performed against. Will use $this->connection when not
* provided.
* @return Status representing a backend failure
*/
public function failure( ExceptionInterface $exception = null, Connection $connection = null ) {
$connection ??= $this->connection;
$log = $this->finishRequest( $connection );
if ( $log === null ) {
// Request was never started, likely trying to close a request
// a second time. If so that was already logged by finishRequest.
$context = [];
$logType = 'not_started';
} else {
$context = $log->getLogVariables();
$logType = $log->getDescription();
}
[ $status, $message ] = ElasticaErrorHandler::extractMessageAndStatus( $exception );
// This could be multiple MB if the failure is coming from an update
// script, as the whole update script is returned in the error
// including the parameters. Truncate to a reasonable level so
// downstream log processing doesn't truncate them (and then fail to
// parse the truncated json). Take the first 4k to leave plenty of room for
// whatever else.
$context['error_message'] = mb_substr( $message, 0, 4096 );
$stats = Util::getStatsFactory();
$type = ElasticaErrorHandler::classifyError( $exception );
$clusterName = $connection->getClusterName();
$context['cirrussearch_error_type'] = $type;
$stats->getCounter( "backend_failures_total" )
->setLabel( "search_cluster", $clusterName )
->setLabel( "type", $type )
->copyToStatsdAt( "CirrusSearch.$clusterName.backend_failure.$type" )
->increment();
LoggerFactory::getInstance( 'CirrusSearch' )->warning(
"Search backend error during {$logType} after {tookMs}: {error_message}",
$context
);
return $status;
}
/**
* Get the search metrics we have
* @return array
*/
public function getSearchMetrics() {
return $this->searchMetrics;
}
/**
* Log the completion of a request to Elasticsearch.
*
* @param Connection $connection
* @return RequestLog|null The log for the finished request, or null if no
* request was started.
*/
private function finishRequest( Connection $connection ) {
if ( !$this->currentRequestLog ) {
LoggerFactory::getInstance( 'CirrusSearch' )->warning(
'finishRequest called without staring a request'
);
return null;
}
$log = $this->currentRequestLog;
$this->currentRequestLog = null;
$log->finish();
$tookMs = $log->getTookMs();
$clusterName = $connection->getClusterName();
$this->searchMetrics['wgCirrusTookMs'] = $tookMs;
self::$requestLogger->addRequest( $log, $this->user, $this->slowMillis );
$type = $log->getQueryType();
$stats = Util::getStatsFactory();
$stats->getTiming( "request_time_seconds" )
->setLabel( "search_cluster", $clusterName )
->setLabel( "type", $type )
->copyToStatsdAt( [
"CirrusSearch.$clusterName.requestTimeMs.$type",
"CirrusSearch.$clusterName.requestTime"
] )
->observe( $tookMs );
if ( $log->getElasticTookMs() ) {
$this->searchMetrics['wgCirrusElasticTime'] = $log->getElasticTookMs();
}
return $log;
}
/**
* @param string $key
* @param string $value
*/
public static function appendLastLogPayload( $key, $value ) {
if ( self::$requestLogger !== null ) {
// Guard only for unit tests that heavily mock classes
self::$requestLogger->appendLastLogPayload( $key, $value );
} else {
Assert::invariant( defined( 'MW_PHPUNIT_TEST' ),
'appendLastLogPayload must only be called after self::$requestLogger has been set ' .
'or during unit tests' );
}
}
/**
* @param string $description A psr-3 compliant string describing the request
* @param string $queryType The type of search being performed such as
* fulltext, get, etc.
* @param array $extra A map of additional request-specific data
* @return RequestLog
*/
protected function startNewLog( $description, $queryType, array $extra = [] ) {
$log = $this->newLog( $description, $queryType, $extra );
$this->start( $log );
return $log;
}
/**
* @param string $description A psr-3 compliant string describing the request
* @param string $queryType The type of search being performed such as
* fulltext, get, etc.
* @param array $extra A map of additional request-specific data
* @return RequestLog
*/
abstract protected function newLog( $description, $queryType, array $extra = [] );
/**
* @param string $searchType
* @return string search retrieval timeout
*/
protected function getTimeout( $searchType = 'default' ) {
$timeout = $this->connection->getConfig()->getElement( 'CirrusSearchSearchShardTimeout', $searchType );
if ( $timeout !== null ) {
return $timeout;
}
$timeout = $this->connection->getConfig()->getElement( 'CirrusSearchSearchShardTimeout', 'default' );
if ( $timeout !== null ) {
return $timeout;
}
throw new ConfigException( "wgCirrusSearchSearchShardTimeout should have at least a 'default' entry configured" );
}
/**
* @param string $searchType
* @return int the client side timeout
*/
protected function getClientTimeout( $searchType = 'default' ) {
$timeout = $this->connection->getConfig()->getElement( 'CirrusSearchClientSideSearchTimeout', $searchType );
if ( $timeout !== null ) {
return $timeout;
}
$timeout = $this->connection->getConfig()->getElement( 'CirrusSearchClientSideSearchTimeout', 'default' );
if ( $timeout !== null ) {
return $timeout;
}
throw new ConfigException( "wgCirrusSearchClientSideSearchTimeout should have at least a 'default' entry configured" );
}
/**
* @param SearchMetricsProvider $provider
*/
protected function appendMetrics( SearchMetricsProvider $provider ) {
$this->searchMetrics += $provider->getMetrics();
}
/**
* check validity of the multisearch response
*
* @param MultiResultSet $multiResultSet
* @return bool
*/
public static function isMSearchResultSetOK( MultiResultSet $multiResultSet ): bool {
return !$multiResultSet->hasError() &&
// Catches HTTP errors (ex: 5xx) not reported
// by hasError()
$multiResultSet->getResponse()->isOk();
}
/**
* @param Search $search
* @param RequestLog $log
* @param Connection|null $connection
* @param callable|null $resultsTransformer that accepts a Multi/ResultSets
* @return Status
*/
protected function runMSearch(
Search $search,
RequestLog $log,
Connection $connection = null,
callable $resultsTransformer = null
): Status {
$connection = $connection ?: $this->connection;
$this->start( $log );
try {
$multiResultSet = $search->search();
$lastRequest = $connection->getClient()->getLastRequest();
if ( !$multiResultSet->getResponse()->isOk() ) {
// bad response from server. Should elastica be throwing an exception for this?
if ( $lastRequest !== null ) {
return $this->failure( new ResponseException( $lastRequest, $multiResultSet->getResponse() ), $connection );
} else {
return $this->failure( new RuntimeException( "Client::getLastRequest() should not be null" ), $connection );
}
}
foreach ( $multiResultSet->getResultSets() as $resultSet ) {
if ( $resultSet->getResponse()->hasError() ) {
if ( $lastRequest !== null ) {
return $this->failure( new ResponseException( $lastRequest, $resultSet->getResponse() ), $connection );
} else {
return $this->failure( new RuntimeException( "Client::getLastRequest() should not be null" ), $connection );
}
}
}
return $this->success( $resultsTransformer !== null ? $resultsTransformer( $multiResultSet ) : $multiResultSet, $connection );
} catch ( ExceptionInterface $e ) {
return $this->failure( $e, $connection );
}
}
}