includes/libs/rdbms/platform/SQLPlatform.php
<?php
/**
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
* http://www.gnu.org/copyleft/gpl.html
*
* @file
*/
namespace Wikimedia\Rdbms\Platform;
use InvalidArgumentException;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use RuntimeException;
use Throwable;
use Wikimedia\Assert\Assert;
use Wikimedia\Rdbms\Database\DbQuoter;
use Wikimedia\Rdbms\DatabaseDomain;
use Wikimedia\Rdbms\DBLanguageError;
use Wikimedia\Rdbms\IExpression;
use Wikimedia\Rdbms\LikeMatch;
use Wikimedia\Rdbms\LikeValue;
use Wikimedia\Rdbms\Query;
use Wikimedia\Rdbms\QueryBuilderFromRawSql;
use Wikimedia\Rdbms\Subquery;
use Wikimedia\Timestamp\ConvertibleTimestamp;
/**
* Sql abstraction object.
* Neither this class nor any of its subclasses should create a db connection.
* It also should not become stateful. The constructor should only rely on addQuotes() method in Database.
* Later that should be replaced with an implementation that doesn't use db connections.
* @since 1.39
*/
class SQLPlatform implements ISQLPlatform {
/** @var array[] Current map of (table => (dbname, schema, prefix) map) */
protected $tableAliases = [];
/** @var string[] Current map of (index alias => index) */
protected $indexAliases = [];
/** @var DatabaseDomain|null */
protected $currentDomain;
/** @var array|null Current variables used for schema element placeholders */
protected $schemaVars;
/** @var DbQuoter */
protected $quoter;
/** @var LoggerInterface */
protected $logger;
/** @var callable Error logging callback */
protected $errorLogger;
/**
 * Wire up the collaborators of the platform object.
 *
 * The nullable parameters are declared with explicit "?Type" syntax; relying on the
 * implicit nullability of a typed parameter with a null default is deprecated in PHP 8.4.
 *
 * @param DbQuoter $quoter Object used to quote values placed into SQL
 * @param LoggerInterface|null $logger Logger for warnings; a NullLogger is used when omitted
 * @param DatabaseDomain|null $currentDomain Current DB domain; an unspecified domain
 *  (DatabaseDomain::newUnspecified()) is used when omitted
 * @param callable|null $errorLogger Callback taking a Throwable; when omitted, the default
 *  callback raises an E_USER_WARNING carrying the exception class and message
 */
public function __construct(
	DbQuoter $quoter,
	?LoggerInterface $logger = null,
	?DatabaseDomain $currentDomain = null,
	$errorLogger = null
) {
	$this->quoter = $quoter;
	$this->logger = $logger ?? new NullLogger();
	// Only null should trigger the fallback, so use the null-coalescing operator
	$this->currentDomain = $currentDomain ?? DatabaseDomain::newUnspecified();
	$this->errorLogger = $errorLogger ?? static function ( Throwable $e ) {
		trigger_error( get_class( $e ) . ': ' . $e->getMessage(), E_USER_WARNING );
	};
}
/**
 * Build a bitwise NOT expression around the given SQL operand.
 *
 * @param string $field SQL operand; inserted as-is, no quoting is applied here
 * @return string
 */
public function bitNot( $field ) {
	return '(~' . $field . ')';
}
/**
 * Build a bitwise AND expression from two SQL operands.
 *
 * @param string $fieldLeft Left operand; inserted as-is
 * @param string $fieldRight Right operand; inserted as-is
 * @return string
 */
public function bitAnd( $fieldLeft, $fieldRight ) {
	return '(' . $fieldLeft . ' & ' . $fieldRight . ')';
}
/**
 * Build a bitwise OR expression from two SQL operands.
 *
 * @param string $fieldLeft Left operand; inserted as-is
 * @param string $fieldRight Right operand; inserted as-is
 * @return string
 */
public function bitOr( $fieldLeft, $fieldRight ) {
	return '(' . $fieldLeft . ' | ' . $fieldRight . ')';
}
/**
 * Wrap an identifier in the platform's identifier quote character.
 *
 * @param string $s Unquoted identifier
 * @return string Quoted identifier
 * @throws DBLanguageError If the identifier contains a NUL byte, a double quote,
 *  a backtick, a single quote or a dot
 */
public function addIdentifierQuotes( $s ) {
	// strcspn() yields the length of the leading run that is free of the listed
	// characters; anything shorter than the full string means one of them is present
	$cleanLength = strcspn( $s, "\0\"`'." );
	if ( $cleanLength !== strlen( $s ) ) {
		throw new DBLanguageError(
			"Identifier must not contain quote, dot or null characters"
		);
	}

	$q = $this->getIdentifierQuoteChar();

	return "{$q}{$s}{$q}";
}
/**
 * Character placed on both sides of an identifier when quoting it.
 *
 * This base implementation uses the double quote.
 *
 * @return string
 */
protected function getIdentifierQuoteChar() {
	$quoteChar = '"';

	return $quoteChar;
}
/**
 * @inheritDoc
 */
public function buildGreatest( $fields, $values ) {
	// Shared superlative builder, parameterized with the GREATEST function name
	$sqlFunction = 'GREATEST';

	return $this->buildSuperlative( $sqlFunction, $fields, $values );
}
/**
 * @inheritDoc
 */
public function buildLeast( $fields, $values ) {
	// Shared superlative builder, parameterized with the LEAST function name
	$sqlFunction = 'LEAST';

	return $this->buildSuperlative( $sqlFunction, $fields, $values );
}
/**
 * Assemble a call to a superlative SQL function over columns and literal values
 *
 * Entries of $fields with an integer key are column names and get identifier-quoted.
 * Entries with a string key are used verbatim as expressions (the caller must have
 * quoted them already); the key itself is only a descriptive alias and never reaches
 * the SQL.
 *
 * Integer and float entries of $values are emitted unquoted; strings go through the quoter.
 *
 * @param string $sqlfunc Name of a SQL function
 * @param string|string[] $fields Name(s) of column(s) with values to compare
 * @param string|int|float|string[]|int[]|float[] $values Values to compare
 * @return string
 * @throws DBLanguageError On a null value or a value that is not int, float or string
 */
protected function buildSuperlative( $sqlfunc, $fields, $values ) {
	$fieldList = is_array( $fields ) ? $fields : [ $fields ];
	$valueList = is_array( $values ) ? $values : [ $values ];

	$arguments = [];
	foreach ( $fieldList as $key => $column ) {
		// String keys flag raw expressions; integer keys flag plain column names
		$arguments[] = is_int( $key ) ? $this->addIdentifierQuotes( $column ) : $column;
	}

	foreach ( $valueList as $item ) {
		if ( $item === null ) {
			throw new DBLanguageError( 'Null value in superlative' );
		}
		if ( is_string( $item ) ) {
			$arguments[] = $this->quoter->addQuotes( $item );
		} elseif ( is_int( $item ) || is_float( $item ) ) {
			$arguments[] = $item;
		} else {
			throw new DBLanguageError( 'Unexpected value type in superlative' );
		}
	}

	return sprintf( '%s(%s)', $sqlfunc, implode( ',', $arguments ) );
}
/**
 * Build a lexicographic ("tuple") comparison over several fields.
 *
 * @param string $op One of '>', '>=', '<', '<='
 * @param array $conds Map of (field => value), most significant field first
 * @return string SQL condition
 * @throws InvalidArgumentException On an unknown operator, empty input,
 *  or an integer-keyed (non-associative) $conds
 */
public function buildComparison( string $op, array $conds ): string {
	$allowedOps = [ '>', '>=', '<', '<=' ];
	if ( !in_array( $op, $allowedOps ) ) {
		throw new InvalidArgumentException( "Comparison operator must be one of '>', '>=', '<', '<='" );
	}
	if ( $conds === [] ) {
		throw new InvalidArgumentException( "Empty input" );
	}

	// The condition is grown from the least significant field outwards, wrapping what
	// has been built so far in an "equal on this field AND (...)" branch each time.
	//
	// For $conds = [ 'a' => 4, 'b' => 7, 'c' => 1 ] and '>', the output is:
	//
	//   a > 4 OR (a = 4 AND (b > 7 OR (b = 7 AND (c > 1))))
	//
	// which means the same as:
	//
	//   a > 4
	//   OR a = 4 AND b > 7
	//   OR a = 4 AND b = 7 AND c > 1
	//
	// and the same as the tuple form (a, b, c) > (4, 7, 1), which is the most
	// intuitive spelling but apparently performs worse.

	// Only the innermost (least significant) comparison may keep the '=' of '>=' / '<=';
	// for the outer fields equality is covered by the explicit "field = value" branch.
	$strictOp = rtrim( $op, '=' );

	$sql = '';
	foreach ( array_reverse( $conds ) as $field => $value ) {
		if ( is_int( $field ) ) {
			throw new InvalidArgumentException(
				'Non-associative array passed to buildComparison() (typo?)'
			);
		}
		$encValue = $this->quoter->addQuotes( $value );
		$sql = ( $sql === '' )
			? "$field $op $encValue"
			: "$field $strictOp $encValue OR ($field = $encValue AND ($sql))";
	}

	return $sql;
}
/**
 * Turn an array into an SQL list fragment, formatted according to $mode.
 *
 * Items are separated by ' AND ' for LIST_AND, ' OR ' for LIST_OR and ',' for every
 * other mode. How each (key => value) item is rendered depends on the mode:
 *  - LIST_AND / LIST_OR with a numeric key: the value is a raw SQL condition (or an
 *    IExpression rendered via toSql()) and is wrapped in parentheses.
 *  - LIST_SET with a numeric key: the value is appended verbatim.
 *  - LIST_AND / LIST_OR with an array value: rendered as "field = x" for a single
 *    element or "field IN (...)" for several; null elements become an extra
 *    "field IS NULL" alternative.
 *  - A null value: "field IS NULL" for LIST_AND / LIST_OR, "field = NULL" for LIST_SET,
 *    and a bare NULL otherwise.
 *  - Anything else: prefixed with "field = " for LIST_AND / LIST_OR / LIST_SET; the
 *    value is passed through the quoter, except in LIST_NAMES mode where it is verbatim.
 *
 * @param array $a Items to render
 * @param int $mode One of the self::LIST_* constants
 * @return string
 * @throws InvalidArgumentException If an array value is empty in LIST_AND / LIST_OR mode
 */
public function makeList( array $a, $mode = self::LIST_COMMA ) {
$first = true;
$list = '';
foreach ( $a as $field => $value ) {
// Emit the mode-specific separator before every item except the first
if ( $first ) {
$first = false;
} else {
if ( $mode == self::LIST_AND ) {
$list .= ' AND ';
} elseif ( $mode == self::LIST_OR ) {
$list .= ' OR ';
} else {
$list .= ',';
}
}
if ( ( $mode == self::LIST_AND || $mode == self::LIST_OR ) && is_numeric( $field ) ) {
// Numeric key: the value is itself a complete condition
if ( $value instanceof IExpression ) {
$list .= "(" . $value->toSql( $this->quoter ) . ")";
} else {
$list .= "($value)";
}
} elseif ( $mode == self::LIST_SET && is_numeric( $field ) ) {
// Numeric key in SET mode: the value is a raw assignment expression
$list .= "$value";
} elseif (
( $mode == self::LIST_AND || $mode == self::LIST_OR ) && is_array( $value )
) {
// Remove null from array to be handled separately if found
$includeNull = false;
foreach ( array_keys( $value, null, true ) as $nullKey ) {
$includeNull = true;
unset( $value[$nullKey] );
}
if ( count( $value ) == 0 && !$includeNull ) {
throw new InvalidArgumentException(
__METHOD__ . ": empty input for field $field" );
} elseif ( count( $value ) == 0 ) {
// only check if $field is null
$list .= "$field IS NULL";
} else {
// IN clause contains at least one valid element
if ( $includeNull ) {
// Group subconditions to ensure correct precedence
$list .= '(';
}
if ( count( $value ) == 1 ) {
// Special-case single values, as IN isn't terribly efficient
// Don't necessarily assume the single key is 0; we don't
// enforce linear numeric ordering on other arrays here.
$value = array_values( $value )[0];
$list .= $field . " = " . $this->quoter->addQuotes( $value );
} else {
// Recurse in the default comma mode so that each element gets quoted
$list .= $field . " IN (" . $this->makeList( $value ) . ") ";
}
// if null present in array, append IS NULL
if ( $includeNull ) {
$list .= " OR $field IS NULL)";
}
}
} elseif ( $value === null ) {
// Null scalar: pick the comparison/assignment prefix by mode, then a literal NULL
if ( $mode == self::LIST_AND || $mode == self::LIST_OR ) {
$list .= "$field IS ";
} elseif ( $mode == self::LIST_SET ) {
$list .= "$field = ";
}
$list .= 'NULL';
} else {
if (
$mode == self::LIST_AND || $mode == self::LIST_OR || $mode == self::LIST_SET
) {
$list .= "$field = ";
}
// LIST_NAMES values are emitted verbatim; every other mode quotes the value
$list .= $mode == self::LIST_NAMES ? $value : $this->quoter->addQuotes( $value );
}
}
return $list;
}
/**
 * Build a condition from a two-level array, matching a base field against the outer
 * keys and a sub field against the keys of each inner array.
 *
 * Outer entries whose inner array is empty are skipped.
 *
 * @param array $data Map of (base value => (sub value => anything))
 * @param string $baseKey Field name compared against the outer keys
 * @param string $subKey Field name compared against the inner keys
 * @return string SQL fragment; the per-base conditions are OR-ed together
 * @throws InvalidArgumentException If no outer entry has a non-empty inner array
 */
public function makeWhereFrom2d( $data, $baseKey, $subKey ) {
	$alternatives = [];
	foreach ( $data as $base => $sub ) {
		if ( !count( $sub ) ) {
			continue;
		}
		// Inner keys are cast to strings before being handed to makeList()
		$subValues = array_map( 'strval', array_keys( $sub ) );
		$alternatives[] = $this->makeList(
			[ $baseKey => $base, $subKey => $subValues ],
			self::LIST_AND
		);
	}

	if ( $alternatives === [] ) {
		throw new InvalidArgumentException( "Data for $baseKey and $subKey must be non-empty" );
	}

	return $this->makeList( $alternatives, self::LIST_OR );
}
/**
 * Combine a list of condition maps into one OR-ed SQL condition.
 *
 * Condition maps are grouped by their exact, ordered list of field names; each group
 * is rendered by factorCondsWithCommonFields() and the groups are joined with OR.
 *
 * @param array[] $condsArray Non-empty list of non-empty (field => value) maps
 * @return string SQL condition
 * @throws InvalidArgumentException If the list or any of its maps is empty
 */
public function factorConds( $condsArray ) {
	if ( !count( $condsArray ) ) {
		throw new InvalidArgumentException(
			__METHOD__ . ": empty condition array" );
	}

	$groups = [];
	foreach ( $condsArray as $condMap ) {
		if ( count( $condMap ) === 0 ) {
			throw new InvalidArgumentException(
				__METHOD__ . ": empty condition subarray" );
		}
		// Maps sharing the same ordered field list land in the same group
		$signature = implode( ',', array_keys( $condMap ) );
		$groups[$signature][] = $condMap;
	}

	$sql = '';
	foreach ( $groups as $group ) {
		$prefix = ( $sql === '' ) ? '' : ' OR ';
		$sql .= $prefix . $this->factorCondsWithCommonFields( $group );
	}

	return $sql;
}
/**
 * Variant of factorConds() for condition maps that all share the same set of
 * array keys. The caller is responsible for validating the input.
 *
 * With a single field, the values are collected into one list condition. With several
 * fields, the maps are grouped by the value of the first field and the remaining
 * fields are factored recursively inside each group.
 *
 * @param array $condsArray
 * @return string
 */
private function factorCondsWithCommonFields( $condsArray ) {
	$sample = $condsArray[array_key_first( $condsArray )];
	$leadField = array_key_first( $sample );

	if ( count( $sample ) === 1 ) {
		// Only one field: a plain "field = x" / "field IN (...)" condition is enough
		$values = [];
		foreach ( $condsArray as $condMap ) {
			$values[] = $condMap[$leadField];
		}

		return $this->makeList( [ $leadField => $values ], self::LIST_AND );
	}

	// Split on the value of the leading field; null needs IS NULL and is kept apart
	$restByLeadValue = [];
	$restForNull = [];
	foreach ( $condsArray as $condMap ) {
		$leadValue = $condMap[$leadField];
		unset( $condMap[$leadField] );
		if ( $leadValue === null ) {
			$restForNull[] = $condMap;
		} else {
			$restByLeadValue[$leadValue][] = $condMap;
		}
	}

	$branches = [];
	foreach ( $restByLeadValue as $leadValue => $rest ) {
		$inner = $this->factorCondsWithCommonFields( $rest );
		$branches[] = "($leadField = " . $this->quoter->addQuotes( $leadValue ) .
			" AND $inner)";
	}
	if ( $restForNull ) {
		$inner = $this->factorCondsWithCommonFields( $restForNull );
		$branches[] = "($leadField IS NULL AND $inner)";
	}

	$joined = implode( ' OR ', $branches );

	// Outer parentheses are only needed when several branches were OR-ed together
	return count( $branches ) > 1 ? "($joined)" : $joined;
}
/**
 * @inheritDoc
 * @stable to override
 */
public function buildConcat( $stringList ) {
	// The operands are joined with commas as-is; no quoting is applied here
	return sprintf( 'CONCAT(%s)', implode( ',', $stringList ) );
}
/**
 * Append a LIMIT clause, with an optional offset, to an SQL statement.
 *
 * The "LIMIT offset,limit" form produced here works in MySQL and SQLite. It will
 * very likely need to be overridden for most other RDBMS subclasses.
 *
 * @param string $sql SQL statement to extend
 * @param int|string $limit Maximum number of rows; must be numeric
 * @param int|string|false $offset Rows to skip; ignored unless numeric and non-zero
 * @return string The statement followed by the LIMIT clause and a trailing space
 * @throws DBLanguageError If $limit is not numeric
 */
public function limitResult( $sql, $limit, $offset = false ) {
	if ( !is_numeric( $limit ) ) {
		throw new DBLanguageError(
			"Invalid non-numeric limit passed to " . __METHOD__
		);
	}

	$offsetPart = '';
	if ( is_numeric( $offset ) && $offset != 0 ) {
		$offsetPart = "{$offset},";
	}

	return "{$sql} LIMIT {$offsetPart}{$limit} ";
}
/**
 * Escape the LIKE wildcards and the escape character itself within a string.
 *
 * @stable to override
 * @param string $s String to escape
 * @param string $escapeChar Character put in front of each escaped character
 * @return string
 */
public function escapeLikeInternal( $s, $escapeChar = '`' ) {
	// Order matters: the escape character is doubled first, so the escape characters
	// added for '%' and '_' afterwards are not doubled again
	$search = [ $escapeChar, '%', '_' ];
	$replacement = [
		"{$escapeChar}{$escapeChar}",
		"{$escapeChar}%",
		"{$escapeChar}_",
	];

	return str_replace( $search, $replacement, $s );
}
/**
 * Build a " LIKE ..." clause from a sequence of pattern parts.
 *
 * The parts may be passed either as one array or as separate arguments; they are
 * forwarded unchanged to the LikeValue constructor.
 *
 * @param array|mixed $param Array of all the parts, or the first part
 * @param mixed ...$params Remaining parts; ignored when $param is an array
 * @return string ' LIKE ' followed by the SQL rendered by LikeValue
 */
public function buildLike( $param, ...$params ) {
	// An array first argument carries the whole pattern; otherwise every argument is a part
	$params = is_array( $param ) ? $param : func_get_args();

	// @phan-suppress-next-line PhanParamTooFewUnpack
	$likeValue = new LikeValue( ...$params );

	return ' LIKE ' . $likeValue->toSql( $this->quoter );
}
/**
 * Get a LikeMatch token wrapping the '_' wildcard.
 *
 * @return LikeMatch
 */
public function anyChar() {
	$wildcard = '_';

	return new LikeMatch( $wildcard );
}
/**
 * Get a LikeMatch token wrapping the '%' wildcard.
 *
 * @return LikeMatch
 */
public function anyString() {
	$wildcard = '%';

	return new LikeMatch( $wildcard );
}
/**
 * @inheritDoc
 * @stable to override
 */
public function unionSupportsOrderAndLimit() {
	// True for almost every DB supported
	$supported = true;

	return $supported;
}
/**
 * Combine several SELECT statements into one UNION query.
 *
 * Every statement is wrapped in parentheses. ORDER BY, LIMIT and OFFSET from $options
 * are only applied when unionSupportsOrderAndLimit() reports support.
 *
 * @param string[] $sqls SQL statements to combine
 * @param bool $all Use UNION ALL instead of UNION
 * @param array $options Query options; 'ORDER BY', 'LIMIT' and 'OFFSET' are read
 * @return string
 */
public function unionQueries( $sqls, $all, $options = [] ) {
	$separator = $all ? ') UNION ALL (' : ') UNION (';
	$union = '(' . implode( $separator, $sqls ) . ')';

	if ( $this->unionSupportsOrderAndLimit() ) {
		$union .= $this->makeOrderBy( $options );
		// A missing or null LIMIT leaves the query unlimited
		if ( isset( $options['LIMIT'] ) ) {
			$union = $this->limitResult(
				$union,
				$options['LIMIT'],
				$options['OFFSET'] ?? false
			);
		}
	}

	return $union;
}
/**
 * Build a CASE WHEN ... THEN ... ELSE ... END expression.
 *
 * @param string|array|IExpression $cond Condition; an array is AND-ed via makeList(),
 *  an IExpression is rendered via toSql(), anything else is inserted as-is
 * @param string $caseTrueExpression SQL expression used when the condition holds
 * @param string $caseFalseExpression SQL expression used otherwise
 * @return string
 */
public function conditional( $cond, $caseTrueExpression, $caseFalseExpression ) {
	if ( is_array( $cond ) ) {
		$condSql = $this->makeList( $cond, self::LIST_AND );
	} elseif ( $cond instanceof IExpression ) {
		$condSql = $cond->toSql( $this->quoter );
	} else {
		$condSql = $cond;
	}

	return "(CASE WHEN $condSql THEN $caseTrueExpression ELSE $caseFalseExpression END)";
}
/**
 * Build a REPLACE() function call.
 *
 * All three arguments are inserted into the SQL as-is; no quoting is applied here.
 *
 * @param string $orig SQL expression for the subject string
 * @param string $old SQL expression for the substring to look for
 * @param string $new SQL expression for the replacement
 * @return string
 */
public function strreplace( $orig, $old, $new ) {
	return 'REPLACE(' . $orig . ', ' . $old . ', ' . $new . ')';
}
/**
 * Convert a timestamp to the TS_MW format via ConvertibleTimestamp.
 *
 * @param mixed $ts Timestamp in any form accepted by the ConvertibleTimestamp constructor
 * @return string
 */
public function timestamp( $ts = 0 ) {
	// Let errors bubble up to avoid putting garbage in the DB
	return ( new ConvertibleTimestamp( $ts ) )->getTimestamp( TS_MW );
}
/**
 * Like timestamp(), except that null is passed through unchanged.
 *
 * @param mixed $ts Timestamp accepted by timestamp(), or null
 * @return string|null
 */
public function timestampOrNull( $ts = null ) {
	return $ts === null ? null : $this->timestamp( $ts );
}
/**
 * Get the string this platform uses to represent an infinite expiry.
 *
 * @return string
 */
public function getInfinity() {
	$infinity = 'infinity';

	return $infinity;
}
/**
 * Encode an expiry time for storage.
 *
 * @param string $expiry Timestamp, or a value loosely equal to '', 'infinity' or
 *  the platform's own infinity string
 * @return string The platform's infinity string for infinite expiries, otherwise
 *  the result of timestamp()
 */
public function encodeExpiry( $expiry ) {
	$isInfinite = $expiry == '' || $expiry == 'infinity' || $expiry == $this->getInfinity();
	if ( $isInfinite ) {
		return $this->getInfinity();
	}

	return $this->timestamp( $expiry );
}
/**
 * Decode an expiry time read from storage.
 *
 * @param string $expiry Stored expiry; values loosely equal to '', 'infinity' or the
 *  platform's own infinity string count as infinite
 * @param int $format Output format passed to ConvertibleTimestamp::convert()
 * @return string|false 'infinity' for infinite expiries, otherwise whatever
 *  ConvertibleTimestamp::convert() returns
 */
public function decodeExpiry( $expiry, $format = TS_MW ) {
	$isInfinite = $expiry == '' || $expiry == 'infinity' || $expiry == $this->getInfinity();

	return $isInfinite
		? 'infinity'
		: ConvertibleTimestamp::convert( $format, $expiry );
}
/**
 * @inheritDoc
 * @stable to override
 */
public function buildSubstring( $input, $startPosition, $length = null ) {
	$this->assertBuildSubstringParams( $startPosition, $length );

	// The FOR part is left out entirely when no length was requested
	$forClause = ( $length === null ) ? '' : " FOR $length";

	return "SUBSTRING({$input} FROM {$startPosition}{$forClause})";
}
/**
 * Validate the type and range of the parameters given to self::buildSubstring()
 *
 * The substring functions of all supported databases agree for a positive
 * $startPosition and a non-negative $length, but disagree once $startPosition or
 * $length is negative. Those values are therefore simply rejected.
 *
 * @param int $startPosition
 * @param int|null $length
 * @throws InvalidArgumentException If $startPosition is 0, negative or not an integer,
 *  or if $length is neither null nor a non-negative integer
 * @since 1.31 in Database, moved to SQLPlatform in 1.39
 */
protected function assertBuildSubstringParams( $startPosition, $length ) {
	// Zero gets its own message: the DBMSs we support use 1-based indexing here.
	if ( $startPosition === 0 ) {
		throw new InvalidArgumentException( 'Use 1 as $startPosition for the beginning of the string' );
	}

	$startIsValid = is_int( $startPosition ) && $startPosition >= 0;
	if ( !$startIsValid ) {
		throw new InvalidArgumentException(
			'$startPosition must be a positive integer'
		);
	}

	if ( $length !== null && !( is_int( $length ) && $length >= 0 ) ) {
		throw new InvalidArgumentException(
			'$length must be null or an integer greater than or equal to 0'
		);
	}
}
/**
 * Build an expression casting a field to a character type.
 *
 * In theory this should work for any standards-compliant SQL implementation,
 * although it may not be the best way to do it.
 *
 * @param string $field SQL expression; inserted as-is
 * @return string
 */
public function buildStringCast( $field ) {
	return 'CAST( ' . $field . ' AS CHARACTER )';
}
/**
 * Build an expression casting a field to an integer type.
 *
 * @param string $field SQL expression; inserted as-is
 * @return string
 */
public function buildIntegerCast( $field ) {
	return "CAST( {$field} AS INTEGER )";
}
/**
 * This base implementation always returns true.
 *
 * @return bool
 */
public function implicitOrderby() {
	$implicit = true;

	return $implicit;
}
/**
 * Map an index name through the configured index aliases, for queries where the
 * index naming is not consistent across DBMS
 *
 * TODO: Make it protected once all the code is moved over.
 *
 * @param string $index
 * @return string The aliased index name if one is configured, otherwise $index itself
 */
public function indexName( $index ) {
	if ( isset( $this->indexAliases[$index] ) ) {
		return $this->indexAliases[$index];
	}

	return $index;
}
/**
 * Replace the whole table alias map.
 *
 * @param array[] $aliases Map of (table => (dbname, schema, prefix) map)
 */
public function setTableAliases( array $aliases ) {
	// The previous map is discarded, not merged
	$this->tableAliases = $aliases;
}
/**
 * Replace the whole index alias map.
 *
 * @param string[] $aliases Map of (index alias => index)
 */
public function setIndexAliases( array $aliases ) {
	// The previous map is discarded, not merged
	$this->indexAliases = $aliases;
}
/**
 * Get the table alias map currently in effect.
 *
 * @return array[]
 */
public function getTableAliases() {
	$aliases = $this->tableAliases;

	return $aliases;
}
/**
 * Change the table prefix of the current domain.
 *
 * A new DatabaseDomain is created that keeps the database and schema of the
 * current one and carries the new prefix.
 *
 * @param string $prefix New table prefix
 */
public function setPrefix( $prefix ) {
	$previous = $this->currentDomain;
	$this->currentDomain = new DatabaseDomain(
		$previous->getDatabase(),
		$previous->getSchema(),
		$prefix
	);
}
/**
 * Replace the current database domain.
 *
 * @param DatabaseDomain $currentDomain
 */
public function setCurrentDomain( DatabaseDomain $currentDomain ) {
	// Stored as given; no copy is made
	$this->currentDomain = $currentDomain;
}
/**
 * Build the SQL text of a SELECT query.
 *
 * @param string|array|null|false $table Table name, array of tables (see
 *  tableNamesWithIndexClauseOrJOIN()), or ''/null/false for a query without FROM
 * @param string|array $vars Field list; an array is run through fieldNamesWithAlias()
 *  and joined with commas, a string is used verbatim
 * @param string|array|IExpression|null|false $conds WHERE conditions; an array is AND-ed
 *  via makeList(), an IExpression is rendered via toSql(), a string is used verbatim.
 *  null/false yields no WHERE clause but logs a warning.
 * @param string $fname Caller name, only used in log and error messages
 * @param string|array $options Query options; cast to an array
 * @param array $join_conds Join conditions, passed to tableNamesWithIndexClauseOrJOIN()
 * @return string SQL query text
 * @throws DBLanguageError If $table or $conds has an unsupported type
 */
public function selectSQLText(
$table, $vars, $conds = '', $fname = __METHOD__, $options = [], $join_conds = []
) {
// Normalize $table into a (possibly empty) array of tables
if ( is_array( $table ) ) {
$tables = $table;
} elseif ( $table === '' || $table === null || $table === false ) {
$tables = [];
} elseif ( is_string( $table ) ) {
$tables = [ $table ];
} else {
throw new DBLanguageError( __METHOD__ . ' called with incorrect table parameter' );
}
if ( is_array( $vars ) ) {
$fields = implode( ',', $this->fieldNamesWithAlias( $vars ) );
} else {
$fields = $vars;
}
$options = (array)$options;
// A non-array USE INDEX is only unambiguous when there is at most one table; with
// more tables the problem is reported through the error logger and the option is left as-is
$useIndexByTable = $options['USE INDEX'] ?? [];
if ( !is_array( $useIndexByTable ) ) {
if ( count( $tables ) <= 1 ) {
$useIndexByTable = [ reset( $tables ) => $useIndexByTable ];
} else {
$e = new DBLanguageError( __METHOD__ . " got ambiguous USE INDEX ($fname)" );
( $this->errorLogger )( $e );
}
}
// Same handling for IGNORE INDEX
$ignoreIndexByTable = $options['IGNORE INDEX'] ?? [];
if ( !is_array( $ignoreIndexByTable ) ) {
if ( count( $tables ) <= 1 ) {
$ignoreIndexByTable = [ reset( $tables ) => $ignoreIndexByTable ];
} else {
$e = new DBLanguageError( __METHOD__ . " got ambiguous IGNORE INDEX ($fname)" );
( $this->errorLogger )( $e );
}
}
if (
$this->selectOptionsIncludeLocking( $options ) &&
$this->selectFieldsOrOptionsAggregate( $vars, $options )
) {
// Some DB types (e.g. postgres) disallow FOR UPDATE with aggregate
// functions. Discourage use of such queries to encourage compatibility.
$this->logger->warning(
__METHOD__ . ": aggregation used with a locking SELECT ($fname)"
);
}
if ( count( $tables ) ) {
$from = ' FROM ' . $this->tableNamesWithIndexClauseOrJOIN(
$tables,
$useIndexByTable,
$ignoreIndexByTable,
$join_conds
);
} else {
$from = '';
}
[ $startOpts, $preLimitTail, $postLimitTail ] = $this->makeSelectOptions( $options );
// Turn $conds into the text of the WHERE clause
if ( is_array( $conds ) ) {
$where = $this->makeList( $conds, self::LIST_AND );
} elseif ( $conds instanceof IExpression ) {
$where = $conds->toSql( $this->quoter );
} elseif ( $conds === null || $conds === false ) {
// Tolerated (no WHERE clause), but logged as incorrect usage
$where = '';
$this->logger->warning(
__METHOD__
. ' called from '
. $fname
. ' with incorrect parameters: $conds must be a string or an array',
[ 'db_log_category' => 'sql' ]
);
} elseif ( is_string( $conds ) ) {
$where = $conds;
} else {
throw new DBLanguageError( __METHOD__ . ' called with incorrect parameters' );
}
// Keep historical extra spaces after FROM to avoid testing failures
// Both an empty string and '*' mean "no WHERE clause"
if ( $where === '' || $where === '*' ) {
$sql = "SELECT $startOpts $fields $from $preLimitTail";
} else {
$sql = "SELECT $startOpts $fields $from WHERE $where $preLimitTail";
}
if ( isset( $options['LIMIT'] ) ) {
$sql = $this->limitResult( $sql, $options['LIMIT'], $options['OFFSET'] ?? false );
}
// The post-limit tail (locking clauses) goes after the LIMIT clause
$sql = "$sql $postLimitTail";
if ( isset( $options['EXPLAIN'] ) ) {
$sql = 'EXPLAIN ' . $sql;
}
return $sql;
}
/**
 * Check whether the SELECT options contain a row locking option, that is the
 * value 'FOR UPDATE' or 'LOCK IN SHARE MODE'.
 *
 * @param string|array $options
 * @return bool
 */
private function selectOptionsIncludeLocking( $options ) {
	$optionList = (array)$options;

	return in_array( 'FOR UPDATE', $optionList, true )
		|| in_array( 'LOCK IN SHARE MODE', $optionList, true );
}
/**
 * Check whether a SELECT looks like an aggregating query.
 *
 * That is the case when the options contain a GROUP BY or HAVING key or a
 * DISTINCT/DISTINCTROW flag, or when a field starts with a call to one of a fixed
 * list of aggregate functions.
 *
 * @param array|string $fields
 * @param array|string $options
 * @return bool
 */
private function selectFieldsOrOptionsAggregate( $fields, $options ) {
	foreach ( (array)$options as $key => $value ) {
		if ( is_string( $key ) ) {
			// Keyed option: only the key is inspected
			$matched = preg_match( '/^(?:GROUP BY|HAVING)$/i', $key );
		} else {
			// Flag option: only string values are inspected
			$matched = is_string( $value )
				&& preg_match( '/^(?:DISTINCT|DISTINCTROW)$/i', $value );
		}
		if ( $matched ) {
			return true;
		}
	}

	$aggregatePattern = '/^(?:COUNT|MIN|MAX|SUM|GROUP_CONCAT|LISTAGG|ARRAY_AGG)\s*\\(/i';
	foreach ( (array)$fields as $candidate ) {
		if ( !is_string( $candidate ) ) {
			continue;
		}
		if ( preg_match( $aggregatePattern, $candidate ) ) {
			return true;
		}
	}

	return false;
}
/**
 * Render each entry of a field array through fieldNameWithAlias()
 *
 * Entries with a numeric key have no alias of their own; the field itself is passed
 * as the alias for them.
 *
 * @param array $fields [ [alias] => field ]
 * @return string[] See fieldNameWithAlias()
 */
protected function fieldNamesWithAlias( $fields ) {
	$rendered = [];
	foreach ( $fields as $key => $field ) {
		$alias = is_numeric( $key ) ? $field : $key;
		$rendered[] = $this->fieldNameWithAlias( $field, $alias );
	}

	return $rendered;
}
/**
 * Render a field together with its alias,
 * e.g. fieldName AS newFieldName
 *
 * @stable to override
 * @param string $name Field name
 * @param string|false $alias Alias (optional)
 * @return string SQL name for aliased field. A field is never aliased to its own name,
 *  and a falsy alias is ignored
 */
public function fieldNameWithAlias( $name, $alias = false ) {
	$needsAlias = $alias && (string)$alias !== (string)$name;
	if ( $needsAlias ) {
		// PostgreSQL needs AS
		return $name . ' AS ' . $this->addIdentifierQuotes( $alias );
	}

	return $name;
}
/**
 * Get the aliased table name clause for a FROM clause
 * which might have a JOIN and/or USE INDEX or IGNORE INDEX clause
 *
 * Tables that have an entry in $join_conds are emitted as explicit joins
 * ("<join type> <table> [index clauses] [ON (...)]"); all other tables are emitted
 * as a comma-separated list placed before the explicit joins. An array given as a
 * table is a parenthesized group, rendered by calling this method recursively.
 *
 * @param array $tables ( [alias] => table )
 * @param array $use_index Same as for select()
 * @param array $ignore_index Same as for select()
 * @param array $join_conds Same as for select()
 * @return string
 */
protected function tableNamesWithIndexClauseOrJOIN(
$tables,
$use_index = [],
$ignore_index = [],
$join_conds = []
) {
// $ret collects the comma-separated tables, $retJOIN the explicit JOIN clauses
$ret = [];
$retJOIN = [];
$use_index = (array)$use_index;
$ignore_index = (array)$ignore_index;
$join_conds = (array)$join_conds;
foreach ( $tables as $alias => $table ) {
if ( !is_string( $alias ) ) {
// No alias? Set it equal to the table name
$alias = $table;
}
if ( is_array( $table ) ) {
// A parenthesized group
if ( count( $table ) > 1 ) {
$joinedTable = '(' .
$this->tableNamesWithIndexClauseOrJOIN(
$table, $use_index, $ignore_index, $join_conds ) . ')';
} else {
// Degenerate case
// A group with a single table is rendered like a plain table, without parentheses
$innerTable = reset( $table );
$innerAlias = key( $table );
$joinedTable = $this->tableNameWithAlias(
$innerTable,
is_string( $innerAlias ) ? $innerAlias : $innerTable
);
}
} else {
$joinedTable = $this->tableNameWithAlias( $table, $alias );
}
// Is there a JOIN clause for this table?
if ( isset( $join_conds[$alias] ) ) {
Assert::parameterType( 'array', $join_conds[$alias], "join_conds[$alias]" );
[ $joinType, $conds ] = $join_conds[$alias];
$tableClause = $this->normalizeJoinType( $joinType );
$tableClause .= ' ' . $joinedTable;
if ( isset( $use_index[$alias] ) ) { // has USE INDEX?
$use = $this->useIndexClause( implode( ',', (array)$use_index[$alias] ) );
if ( $use != '' ) {
$tableClause .= ' ' . $use;
}
}
if ( isset( $ignore_index[$alias] ) ) { // has IGNORE INDEX?
$ignore = $this->ignoreIndexClause(
implode( ',', (array)$ignore_index[$alias] ) );
if ( $ignore != '' ) {
$tableClause .= ' ' . $ignore;
}
}
// The ON clause is left out when the join conditions render to an empty string
$on = $this->makeList( (array)$conds, self::LIST_AND );
if ( $on != '' ) {
$tableClause .= ' ON (' . $on . ')';
}
$retJOIN[] = $tableClause;
} elseif ( isset( $use_index[$alias] ) ) {
// Is there an INDEX clause for this table?
$tableClause = $joinedTable;
$tableClause .= ' ' . $this->useIndexClause(
implode( ',', (array)$use_index[$alias] )
);
$ret[] = $tableClause;
} elseif ( isset( $ignore_index[$alias] ) ) {
// Is there an INDEX clause for this table?
$tableClause = $joinedTable;
$tableClause .= ' ' . $this->ignoreIndexClause(
implode( ',', (array)$ignore_index[$alias] )
);
$ret[] = $tableClause;
} else {
$tableClause = $joinedTable;
$ret[] = $tableClause;
}
}
// We can't separate explicit JOIN clauses with ',', use ' ' for those
$implicitJoins = implode( ',', $ret );
$explicitJoins = implode( ' ', $retJOIN );
// Compile our final table clause
return implode( ' ', [ $implicitJoins, $explicitJoins ] );
}
/**
 * Normalize a join type
 *
 * Known join types are matched case-insensitively and mapped to a canonical keyword;
 * any other value is returned unchanged. Subclasses may override this to add
 * supported join types.
 *
 * @param string $joinType
 * @return string
 */
protected function normalizeJoinType( string $joinType ) {
	$canonical = [
		'JOIN' => 'JOIN',
		'INNER JOIN' => 'JOIN',
		'LEFT JOIN' => 'LEFT JOIN',
		// MySQL only; reduced to a plain JOIN here
		'STRAIGHT_JOIN' => 'JOIN',
		'STRAIGHT JOIN' => 'JOIN',
	];

	return $canonical[strtoupper( $joinType )] ?? $joinType;
}
/**
 * Render a table, or a subquery, together with its alias
 *
 * A table name is resolved through tableName(); a Subquery is cast to a string.
 * The alias is identifier-quoted and appended after a space.
 *
 * @see Database::tableName()
 * @param string|Subquery $table Table name or Subquery object
 * @param string|false $alias Table alias (optional)
 * @return string SQL name for aliased table. Will not alias a table to its own name
 * @throws InvalidArgumentException If $table is neither a string nor a Subquery,
 *  or if a Subquery is given without an alias
 */
protected function tableNameWithAlias( $table, $alias = false ) {
	$isSubquery = $table instanceof Subquery;

	if ( $isSubquery ) {
		$quotedTable = (string)$table;
	} elseif ( is_string( $table ) ) {
		$quotedTable = $this->tableName( $table );
	} else {
		throw new InvalidArgumentException( "Table must be a string or Subquery" );
	}

	$hasAlias = $alias !== false && $alias !== $table;
	if ( $hasAlias ) {
		return $quotedTable . ' ' . $this->addIdentifierQuotes( $alias );
	}

	// A derived table cannot be referenced without an alias
	if ( $isSubquery ) {
		throw new InvalidArgumentException( "Subquery table missing alias" );
	}

	return $quotedTable;
}
/**
 * Resolve a table name into its qualified, dot-separated form.
 *
 * @param string $name Table name, see qualifiedTableComponents()
 * @param string $format 'quoted' to identifier-quote every component; any other
 *  value leaves the components unquoted
 * @return string
 */
public function tableName( string $name, $format = 'quoted' ) {
	// Work out the database, schema and table identifiers that are needed
	$components = $this->qualifiedTableComponents( $name );

	if ( $format === 'quoted' ) {
		$components = array_map(
			function ( $component ) {
				return $this->addIdentifierQuotes( $component );
			},
			$components
		);
	}

	return implode( '.', $components );
}
/**
 * Get the table components needed for a query given the currently selected database/schema
 *
 * The resulting array will take one of the following forms:
 *  - <table identifier>
 *  - <database identifier>.<table identifier> (e.g. non-Postgres)
 *  - <schema identifier>.<table identifier> (e.g. Postgres-only)
 *  - <database identifier>.<schema identifier>.<table identifier> (e.g. Postgres-only)
 *
 * If the provided table name only consists of an unquoted table identifier that has an
 * entry in ({@link getTableAliases()}), then the resulting components will be determined
 * from the alias configuration. If such alias configuration does not specify the table
 * prefix, then the current DB domain prefix will be prepended to the table identifier.
 *
 * In all other cases where the provided table name only consists of an unquoted table
 * identifier, the current DB domain prefix will be prepended to the table identifier.
 *
 * Empty database/schema identifiers are omitted from the resulting array.
 *
 * @param string $name Table name as database.schema.table, database.table, or table
 * @return string[] Non-empty list of unquoted identifiers that form the qualified table name
 * @throws DBLanguageError If the name has more than three components
 */
public function qualifiedTableComponents( $name ) {
	$parts = $this->extractTableNameComponents( $name );
	if ( count( $parts ) > 3 ) {
		throw new DBLanguageError( "Too many components in table name '$name'" );
	}

	// Table alias config and prefixes only apply to unquoted single-identifier names
	if ( count( $parts ) == 1 && !$this->isQuotedIdentifier( $parts[0] ) ) {
		$table = $parts[0];
		$aliasConfig = $this->tableAliases[$table] ?? null;
		if ( $aliasConfig !== null ) {
			// This is an "alias" table that uses a different db/schema/prefix scheme;
			// non-string schema/prefix settings fall back to the current domain
			$database = $aliasConfig['dbname'];
			$schema = is_string( $aliasConfig['schema'] )
				? $aliasConfig['schema']
				: $this->relationSchemaQualifier();
			$prefix = is_string( $aliasConfig['prefix'] )
				? $aliasConfig['prefix']
				: $this->currentDomain->getTablePrefix();
		} else {
			// Use the current database domain to resolve the schema and prefix
			$database = '';
			$schema = $this->relationSchemaQualifier();
			$prefix = $this->currentDomain->getTablePrefix();
		}
		$qualifiers = [ $database, $schema ];
		$tableIdentifier = $prefix . $table;
	} else {
		// Explicitly qualified or quoted name: take the components as given
		$qualifiers = array_slice( $parts, 0, -1 );
		$tableIdentifier = end( $parts );
	}

	// Strips the surrounding quote characters from an identifier that looks quoted
	$unquote = function ( $identifier ) {
		return $this->isQuotedIdentifier( $identifier )
			? substr( $identifier, 1, -1 )
			: $identifier;
	};

	$components = [];
	foreach ( $qualifiers as $qualifier ) {
		if ( $qualifier === null || $qualifier === '' ) {
			continue;
		}
		$components[] = $unquote( $qualifier );
	}
	$components[] = $unquote( $tableIdentifier );

	return $components;
}
/**
 * Split a table name on dots into its components, keeping any identifier quotes
 *
 * @param string $name Table name, possibly qualified with db or db+schema
 * @return string[] Non-empty list of the identifiers included in the provided table name
 * @throws DBLanguageError If a component still contains the identifier quote character
 *  once its surrounding quotes (if any) are ignored
 */
public function extractTableNameComponents( string $name ) {
	$quoteChar = $this->getIdentifierQuoteChar();
	$components = explode( '.', $name );

	foreach ( $components as $component ) {
		// Only the inside of a quoted component is checked for stray quote characters
		$inner = $this->isQuotedIdentifier( $component )
			? substr( $component, 1, -1 )
			: $component;
		if ( str_contains( $inner, $quoteChar ) ) {
			throw new DBLanguageError(
				'Table name component contains unexpected quote or dot character' );
		}
	}

	return $components;
}
/**
 * Get the schema of the current domain.
 *
 * @stable to override
 * @return string|null Schema to use to qualify relations in queries
 */
protected function relationSchemaQualifier() {
	$domain = $this->currentDomain;

	return $domain->getSchema();
}
/**
 * Resolve several table names at once, keyed by the names as given.
 *
 * Calling this method triggers a deprecation notice for version 1.39.
 *
 * @param string ...$tables Table names
 * @return string[] Map of (given name => result of tableName())
 */
public function tableNames( ...$tables ) {
	wfDeprecated( __METHOD__, '1.39' );

	$resolved = [];
	foreach ( $tables as $table ) {
		$resolved[$table] = $this->tableName( $table );
	}

	return $resolved;
}
/**
 * Resolve several table names at once, as a numerically indexed list.
 *
 * @param string ...$tables Table names
 * @return string[] Results of tableName(), in the order the names were given
 */
public function tableNamesN( ...$tables ) {
	$resolved = [];
	foreach ( $tables as $table ) {
		$resolved[] = $this->tableName( $table );
	}

	return $resolved;
}
/**
 * Tell whether the given identifier looks quoted, going by the database
 * convention for quoting identifiers
 *
 * An identifier looks quoted when it is longer than one character and both starts
 * and ends with the identifier quote character.
 *
 * @note Do not use this to determine if untrusted input is safe.
 *   A malicious user can trick this function.
 * @param string $name
 * @return bool
 */
public function isQuotedIdentifier( $name ) {
	$quoteChar = $this->getIdentifierQuoteChar();

	// A single character cannot be both the opening and the closing quote
	if ( strlen( $name ) <= 1 ) {
		return false;
	}

	return $name[0] === $quoteChar && $name[-1] === $quoteChar;
}
/**
 * USE INDEX clause.
 *
 * Index hints serve as an optimisation for queries on tables with multiple indexes,
 * for when the database does not pick the most optimal one by default.
 * The "right" index might vary between database backends and versions thereof,
 * as such in practice this is biased toward specifically improving performance
 * of large wiki farms that use MySQL or MariaDB (like Wikipedia).
 *
 * This base implementation emits no clause at all.
 *
 * @stable to override
 * @param string $index
 * @return string
 */
public function useIndexClause( $index ) {
	$clause = '';

	return $clause;
}
/**
 * IGNORE INDEX clause.
 *
 * The inverse of Database::useIndexClause. This base implementation emits no
 * clause at all.
 *
 * @param string $index
 * @return string
 */
public function ignoreIndexClause( $index ) {
	$clause = '';

	return $clause;
}
/**
 * Translate SELECT options into the three SQL fragments that surround the query:
 * options placed right after SELECT, a tail placed before the LIMIT clause, and a
 * tail placed after it.
 *
 * Options with a numeric key are flags, recognized by their value. GROUP BY, HAVING
 * and ORDER BY are rendered by makeGroupByWithHaving() and makeOrderBy().
 *
 * @see Database::select()
 *
 * @stable to override
 * @param array $options Associative array of options to be turned into
 *   an SQL query, valid keys are listed in the function.
 * @return string[] (START OPTIONS, PRE-LIMIT TAIL, POST-LIMIT TAIL)
 */
protected function makeSelectOptions( array $options ) {
	// Collect the flag-style options (numeric key) for quick lookup by name
	$flags = [];
	foreach ( $options as $key => $option ) {
		if ( is_numeric( $key ) ) {
			$flags[$option] = true;
		}
	}

	$preLimitTail = $this->makeGroupByWithHaving( $options ) . $this->makeOrderBy( $options );

	// Row locking clauses go after the LIMIT clause
	$postLimitTail = '';
	$lockClauses = [
		'FOR UPDATE' => ' FOR UPDATE',
		'LOCK IN SHARE MODE' => ' LOCK IN SHARE MODE',
	];
	foreach ( $lockClauses as $flag => $clause ) {
		if ( isset( $flags[$flag] ) ) {
			$postLimitTail .= $clause;
		}
	}

	// DISTINCTROW is emitted as a plain DISTINCT
	$startOpts = ( isset( $flags['DISTINCT'] ) || isset( $flags['DISTINCTROW'] ) )
		? 'DISTINCT'
		: '';

	# Various MySQL extensions
	$mysqlHints = [
		'STRAIGHT_JOIN' => ' /*! STRAIGHT_JOIN */',
		'SQL_BIG_RESULT' => ' SQL_BIG_RESULT',
		'SQL_BUFFER_RESULT' => ' SQL_BUFFER_RESULT',
		'SQL_SMALL_RESULT' => ' SQL_SMALL_RESULT',
		'SQL_CALC_FOUND_ROWS' => ' SQL_CALC_FOUND_ROWS',
	];
	foreach ( $mysqlHints as $flag => $hint ) {
		if ( isset( $flags[$flag] ) ) {
			$startOpts .= $hint;
		}
	}

	return [ $startOpts, $preLimitTail, $postLimitTail ];
}
/**
 * Returns an optional GROUP BY with an optional HAVING
 *
 * An array given for 'GROUP BY' is joined with commas. For 'HAVING', an IExpression
 * is rendered via toSql(), an array is AND-ed together via makeList(), and any other
 * value is used verbatim. Handling IExpression here keeps HAVING consistent with the
 * conditions accepted by selectSQLText() and conditional().
 *
 * @param array $options Associative array of options; only the 'GROUP BY' and
 *  'HAVING' keys are read
 * @return string Empty string if neither option is set; otherwise the clause(s),
 *  each starting with a space
 * @see Database::select()
 * @since 1.21
 */
protected function makeGroupByWithHaving( $options ) {
	$sql = '';

	if ( isset( $options['GROUP BY'] ) ) {
		$gb = is_array( $options['GROUP BY'] )
			? implode( ',', $options['GROUP BY'] )
			: $options['GROUP BY'];
		$sql .= ' GROUP BY ' . $gb;
	}

	if ( isset( $options['HAVING'] ) ) {
		$having = $options['HAVING'];
		if ( $having instanceof IExpression ) {
			// An expression object cannot be concatenated; it must be rendered first
			$having = $having->toSql( $this->quoter );
		} elseif ( is_array( $having ) ) {
			$having = $this->makeList( $having, self::LIST_AND );
		}
		$sql .= ' HAVING ' . $having;
	}

	return $sql;
}
/**
 * Build the " ORDER BY ..." fragment requested by the options
 *
 * An array 'ORDER BY' is joined with commas; any other value is used verbatim.
 *
 * @param array $options Associative array of options
 * @return string Empty string if no 'ORDER BY' option is set
 * @see Database::select()
 * @since 1.21
 */
protected function makeOrderBy( $options ) {
	$orderBy = $options['ORDER BY'] ?? null;
	if ( $orderBy === null ) {
		return '';
	}

	if ( is_array( $orderBy ) ) {
		$orderBy = implode( ',', $orderBy );
	}

	return ' ORDER BY ' . $orderBy;
}
public function buildGroupConcatField(
	$delim, $table, $field, $conds = '', $join_conds = []
) {
	// The delimiter is a literal, so it goes through the quoter
	$quotedDelim = $this->quoter->addQuotes( $delim );
	$aggregate = 'GROUP_CONCAT(' . $field . ' SEPARATOR ' . $quotedDelim . ')';
	$innerSql = $this->selectSQLText( $table, $aggregate, $conds, __METHOD__, [], $join_conds );

	// Parenthesized so the result can be used as a field expression
	return "($innerSql)";
}
public function buildSelectSubquery(
	$table, $vars, $conds = '', $fname = __METHOD__,
	$options = [], $join_conds = []
) {
	// Render the SELECT first, then wrap the SQL text in a Subquery object
	$selectSql = $this->selectSQLText( $table, $vars, $conds, $fname, $options, $join_conds );

	return new Subquery( $selectSql );
}
public function insertSqlText( $table, array $rows ) {
	$encTable = $this->tableName( $table );
	[ $columnList, $tupleList ] = $this->makeInsertLists( $rows );

	// Both statements share everything up to the VALUES payload
	$statementHead = 'INSERT INTO ' . $encTable . ' (' . $columnList . ') VALUES ';

	return [
		// Real statement
		$statementHead . $tupleList,
		// Generalized statement with the values replaced by a placeholder
		$statementHead . "'?'"
	];
}
/**
 * Make SQL lists of columns, row tuples, and column aliases for INSERT/VALUES expressions
 *
 * The tuple column order is that of the columns of the first provided row.
 * The provided rows must have exactly the same keys and ordering thereof.
 *
 * @param array[] $rows Non-empty list of (column => value) maps
 * @param string $aliasPrefix Optional prefix to prepend to the magic alias names
 * @param string[] $typeByColumn Optional map of (column => data type); not used by this
 *  implementation
 * @return array (comma-separated columns, comma-separated tuples, comma-separated aliases)
 * @throws DBLanguageError If $rows has no row at index 0, if that row is empty or not an
 *  array, or if any row's columns differ from those of the first row
 * @since 1.35
 */
public function makeInsertLists( array $rows, $aliasPrefix = '', array $typeByColumn = [] ) {
	// Use "??" so that an empty (or non-list) $rows reaches the exception below
	// without first raising an "undefined array key" warning
	$firstRow = $rows[0] ?? null;
	if ( !is_array( $firstRow ) || !$firstRow ) {
		throw new DBLanguageError( 'Got an empty row list or empty row' );
	}
	// List of columns that define the value tuple ordering
	$tupleColumns = array_keys( $firstRow );

	$valueTuples = [];
	foreach ( $rows as $row ) {
		$rowColumns = array_keys( $row );
		// VALUES(...) requires a uniform correspondence of (column => value)
		if ( $rowColumns !== $tupleColumns ) {
			throw new DBLanguageError(
				'Got row columns (' . implode( ', ', $rowColumns ) . ') ' .
				'instead of expected (' . implode( ', ', $tupleColumns ) . ')'
			);
		}
		// Make the value tuple that defines this row
		$valueTuples[] = '(' . $this->makeList( $row, self::LIST_COMMA ) . ')';
	}

	// Alias names are the column names with the optional prefix prepended
	$magicAliasFields = [];
	foreach ( $tupleColumns as $column ) {
		$magicAliasFields[] = $aliasPrefix . $column;
	}

	return [
		$this->makeList( $tupleColumns, self::LIST_NAMES ),
		implode( ',', $valueTuples ),
		$this->makeList( $magicAliasFields, self::LIST_NAMES )
	];
}
public function insertNonConflictingSqlText( $table, array $rows ) {
	$encTable = $this->tableName( $table );
	[ $columnList, $tupleList ] = $this->makeInsertLists( $rows );
	[ $verb, $conflictClause ] = $this->makeInsertNonConflictingVerbAndOptions();

	$statementHead = "$verb $encTable ($columnList) VALUES";
	$realSql = "$statementHead $tupleList $conflictClause";
	// Generalized statement with the values replaced by a placeholder
	$generalizedSql = "$statementHead '?' $conflictClause";

	// rtrim() drops the trailing space left behind when the conflict clause is empty
	return [ rtrim( $realSql ), rtrim( $generalizedSql ) ];
}
/**
* Get the SQL verb and trailing clause used to build INSERT statements that skip
* conflicting rows.
*
* This base implementation uses the "INSERT IGNORE INTO" verb and no trailing clause.
*
* @stable to override
* @return string[] ("INSERT"-style SQL verb, "ON CONFLICT"-style clause or "")
* @since 1.35
*/
protected function makeInsertNonConflictingVerbAndOptions() {
return [ 'INSERT IGNORE INTO', '' ];
}
public function insertSelectNativeSqlText(
	$destTable,
	$srcTable,
	array $varMap,
	$conds,
	$fname,
	array $insertOptions,
	array $selectOptions,
	$selectJoinConds
) {
	// The IGNORE flag switches to the conflict-skipping verb/clause pair
	if ( $this->isFlagInOptions( 'IGNORE', $insertOptions ) ) {
		[ $verb, $trailingClause ] = $this->makeInsertNonConflictingVerbAndOptions();
	} else {
		$verb = 'INSERT INTO';
		$trailingClause = '';
	}

	$encDstTable = $this->tableName( $destTable );
	// Keys of $varMap are the destination columns; values are the source expressions
	$dstColumnList = implode( ',', array_keys( $varMap ) );
	$srcExpressions = array_values( $varMap );
	$selectSql = $this->selectSQLText(
		$srcTable,
		$srcExpressions,
		$conds,
		$fname,
		$selectOptions,
		$selectJoinConds
	);

	// rtrim() drops the trailing space left behind when the trailing clause is empty
	return rtrim( "$verb $encDstTable ($dstColumnList) $selectSql $trailingClause" );
}
/**
 * Check whether an option is present as a boolean flag, that is, as a value stored
 * under an integer key. The value comparison is strict.
 *
 * @param string $option Query option flag (e.g. "IGNORE" or "FOR UPDATE")
 * @param array $options Combination option/value map and boolean option list
 * @return bool Whether the option appears as an integer-keyed value in the options
 * @since 1.35
 */
public function isFlagInOptions( $option, array $options ) {
	foreach ( $options as $key => $value ) {
		if ( is_int( $key ) && $value === $option ) {
			return true;
		}
	}

	return false;
}
/**
 * Build an SQL condition to find rows with matching key values to those in $rows.
 *
 * A single-column key yields one condition over the list of that column's values; a
 * composite key yields one AND-group per row, OR-ed together when there are several.
 *
 * @param array[] $rows Non-empty list of rows
 * @param string[] $uniqueKey List of columns that define a single unique index
 * @return string
 * @throws DBLanguageError If $rows or $uniqueKey is empty, or a row lacks a key column
 */
public function makeKeyCollisionCondition( array $rows, array $uniqueKey ) {
	if ( !$rows ) {
		throw new DBLanguageError( "Empty row array" );
	}
	if ( !$uniqueKey ) {
		throw new DBLanguageError( "Empty unique key array" );
	}

	$keyColumnCount = count( $uniqueKey );

	if ( $keyColumnCount === 1 ) {
		// Use a simple IN(...) clause
		$column = reset( $uniqueKey );
		$values = array_column( $rows, $column );
		// array_column() skips rows that lack the column, so a shorter list means gaps
		if ( count( $values ) !== count( $rows ) ) {
			throw new DBLanguageError( "Missing values for unique key ($column)" );
		}

		return $this->makeList( [ $column => $values ], self::LIST_AND );
	}

	// Composite key: one AND-group of (column => value) per row
	$keyColumnSet = array_fill_keys( $uniqueKey, null );
	$rowConds = [];
	foreach ( $rows as $row ) {
		$keyValues = array_intersect_key( $row, $keyColumnSet );
		if ( count( $keyValues ) != $keyColumnCount ) {
			throw new DBLanguageError(
				"Missing values for unique key (" . implode( ',', $uniqueKey ) . ")"
			);
		}
		$rowConds[] = $this->makeList( $keyValues, self::LIST_AND );
	}

	if ( count( $rowConds ) > 1 ) {
		return $this->makeList( $rowConds, self::LIST_OR );
	}

	return $rowConds[0];
}
/**
 * Build a DELETE statement that removes the rows of one table whose column value
 * appears in a column of another table, optionally restricted by conditions.
 *
 * @param string $delTable Table to delete rows from; passed through tableName()
 * @param string $joinTable Table read by the subquery; passed through tableName()
 * @param string $delVar Column of $delTable, inserted into the SQL as-is
 * @param string $joinVar Column of $joinTable, inserted into the SQL as-is
 * @param array|string $conds Conditions for the subquery (combined with LIST_AND), or
 *  self::ALL_ROWS / [ self::ALL_ROWS ] to leave the subquery unrestricted
 * @return string
 * @throws DBLanguageError If $conds is empty
 */
public function deleteJoinSqlText( $delTable, $joinTable, $delVar, $joinVar, $conds ) {
	if ( !$conds ) {
		throw new DBLanguageError( __METHOD__ . ' called with empty $conds' );
	}

	$delTable = $this->tableName( $delTable );
	$joinTable = $this->tableName( $joinTable );
	$sql = "DELETE FROM $delTable WHERE $delVar IN (SELECT $joinVar FROM $joinTable ";
	// Compare strictly: a loose "!=" would treat values such as boolean true as the
	// "all rows" marker and silently drop the WHERE clause. The array form of the
	// marker is accepted too, as in deleteSqlText() and updateSqlText().
	if ( $conds !== self::ALL_ROWS && $conds !== [ self::ALL_ROWS ] ) {
		$sql .= 'WHERE ' . $this->makeList( $conds, self::LIST_AND );
	}
	$sql .= ')';

	return $sql;
}
/**
 * Build a DELETE query, along with a generalized copy of its SQL in which the
 * condition values are replaced by placeholders.
 *
 * @param string $table
 * @param string|array $conds Non-empty conditions; self::ALL_ROWS or [ self::ALL_ROWS ]
 *  omits the WHERE clause
 * @return Query
 * @throws DBLanguageError If $conds is empty or neither a string nor an array
 */
public function deleteSqlText( $table, $conds ) {
	$hasUsableType = is_string( $conds ) || is_array( $conds );
	if ( !$hasUsableType || !$conds ) {
		throw new DBLanguageError( __METHOD__ . ' called with empty conditions' );
	}

	$deleteSql = 'DELETE FROM ' . $this->tableName( $table );

	$whereSql = '';
	$generalizedWhereSql = '';
	$targetsAllRows = ( $conds === self::ALL_ROWS || $conds === [ self::ALL_ROWS ] );
	if ( !$targetsAllRows ) {
		// Scrub first: it needs the conditions in their original (unrendered) form
		$generalizedWhereSql = ' WHERE ' . $this->scrubArray( $conds );
		$renderedConds = is_array( $conds )
			? $this->makeList( $conds, self::LIST_AND )
			: $conds;
		$whereSql = ' WHERE ' . $renderedConds;
	}

	return new Query(
		$deleteSql . $whereSql,
		self::QUERY_CHANGE_ROWS,
		'DELETE',
		$table,
		$deleteSql . $generalizedWhereSql
	);
}
/**
 * Make a generalized version of a condition/assignment list in which values are
 * replaced by placeholders.
 *
 * Each IExpression value is replaced by its toGeneralizedSql() output and every other
 * value by '?'; keys are kept and the result is rendered through makeList(). A
 * non-array input yields a lone '?'.
 *
 * @param mixed $array
 * @param int $listType One of the LIST_* constants accepted by makeList()
 * @return string
 */
private function scrubArray( $array, $listType = self::LIST_AND ) {
	if ( !is_array( $array ) ) {
		return '?';
	}

	$placeholders = [];
	foreach ( $array as $key => $value ) {
		$placeholders[$key] = ( $value instanceof IExpression )
			? $value->toGeneralizedSql()
			: '?';
	}

	return $this->makeList( $placeholders, $listType );
}
/**
 * Build an UPDATE query, along with a generalized copy of its SQL in which the
 * assigned and condition values are replaced by placeholders.
 *
 * @param string $table
 * @param array $set Assignments, rendered with LIST_SET
 * @param string|array $conds Non-empty conditions; self::ALL_ROWS or [ self::ALL_ROWS ]
 *  omits the WHERE clause
 * @param string|array $options Passed to makeUpdateOptions()
 * @return Query
 * @throws DBLanguageError If $conds is empty or neither a string nor an array
 */
public function updateSqlText( $table, $set, $conds, $options ) {
	$hasUsableType = is_string( $conds ) || is_array( $conds );
	if ( !$hasUsableType || !$conds ) {
		throw new DBLanguageError( __METHOD__ . ' called with empty conditions' );
	}

	$encTable = $this->tableName( $table );
	$optionSql = $this->makeUpdateOptions( $options );
	$updateSql = 'UPDATE ' . $optionSql . ' ' . $encTable;

	$tailSql = ' SET ' . $this->makeList( $set, self::LIST_SET );
	$generalizedTailSql = ' SET ' . $this->scrubArray( $set, self::LIST_SET );

	$targetsAllRows = ( $conds === self::ALL_ROWS || $conds === [ self::ALL_ROWS ] );
	if ( $conds && !$targetsAllRows ) {
		// Scrub first: it needs the conditions in their original (unrendered) form
		$generalizedTailSql .= ' WHERE ' . $this->scrubArray( $conds );
		$renderedConds = is_array( $conds )
			? $this->makeList( $conds, self::LIST_AND )
			: $conds;
		$tailSql .= ' WHERE ' . $renderedConds;
	}

	return new Query(
		$updateSql . $tailSql,
		self::QUERY_CHANGE_ROWS,
		'UPDATE',
		$table,
		$updateSql . $generalizedTailSql
	);
}
/**
 * Make the UPDATE options string for the Database::update function
 *
 * The entries from makeUpdateOptionsArray() are joined with single spaces.
 *
 * @param array $options The options passed to Database::update
 * @return string
 */
protected function makeUpdateOptions( $options ) {
	return implode( ' ', $this->makeUpdateOptionsArray( $options ) );
}
/**
 * Make UPDATE options array for Database::makeUpdateOptions
 *
 * The only option recognized here is 'IGNORE', which is matched by strict comparison
 * against the values of the normalized option list.
 *
 * @stable to override
 * @param array $options
 * @return array List of SQL keywords to place after "UPDATE"
 */
protected function makeUpdateOptionsArray( $options ) {
	$options = $this->normalizeOptions( $options );

	$opts = [];
	// Strict comparison: a loose in_array() would also match unrelated option values,
	// e.g. boolean true (since true == 'IGNORE')
	if ( in_array( 'IGNORE', $options, true ) ) {
		$opts[] = 'IGNORE';
	}

	return $opts;
}
/**
 * Normalize an options argument to array form
 *
 * Arrays pass through unchanged, the empty string becomes an empty array, and any
 * other string becomes a one-element list.
 *
 * @param string|array $options
 * @return array Combination option/value map and boolean option list
 * @throws DBLanguageError If $options is neither a string nor an array
 * @since 1.35, moved to SQLPlatform in 1.39
 */
final public function normalizeOptions( $options ) {
	if ( is_array( $options ) ) {
		return $options;
	}

	if ( !is_string( $options ) ) {
		throw new DBLanguageError( __METHOD__ . ': expected string or array' );
	}

	if ( $options === '' ) {
		return [];
	}

	return [ $options ];
}
/**
* Build a "DROP TABLE ... CASCADE" statement for the given table.
*
* @param string $table Table name; passed through tableName()
* @return string
*/
public function dropTableSqlText( $table ) {
// https://mariadb.com/kb/en/drop-table/
// https://dev.mysql.com/doc/refman/8.0/en/drop-table.html
// https://www.postgresql.org/docs/9.2/sql-droptable.html
return "DROP TABLE " . $this->tableName( $table ) . " CASCADE";
}
/**
* Get the verb of an SQL query, as determined by QueryBuilderFromRawSql.
*
* Calling this method triggers a deprecation notice via wfDeprecated().
*
* @param string $sql SQL query
* @return string|null
* @deprecated Since 1.42
*/
public function getQueryVerb( $sql ) {
wfDeprecated( __METHOD__, '1.42' );
// Build a Query object from the raw SQL (second argument 0) just to read its verb
return QueryBuilderFromRawSql::buildQuery( $sql, 0 )->getVerb();
}
/**
 * Determine whether a SQL statement is sensitive to isolation level.
 *
 * A SQL statement is considered transactable if its result could vary
 * depending on the transaction isolation level. Operational commands
 * such as 'SET' and 'SHOW' are not considered to be transactable.
 *
 * Main purpose: Used by query() to decide whether to begin a transaction
 * before the current query (in DBO_TRX mode, on by default).
 *
 * @param Query $sql Query whose verb is checked against the non-transactable verbs
 * @return bool
 */
public function isTransactableQuery( Query $sql ) {
	// Verbs that are not considered transactable (each listed once)
	$nonTransactableVerbs = [
		'BEGIN',
		'ROLLBACK',
		'ROLLBACK TO SAVEPOINT',
		'COMMIT',
		'SET',
		'SHOW',
		'CREATE',
		'ALTER',
		'USE'
	];

	return !in_array( $sql->getVerb(), $nonTransactableVerbs, true );
}
public function buildExcludedValue( $column ) {
	/* @see Database::upsert() */
	// This can be treated like a single value since __VALS is a single row table.
	// The selected field is the column name prefixed with two underscores.
	return '(SELECT __' . $column . ' FROM __VALS)';
}
public function savepointSqlText( $identifier ) {
	// The savepoint name is quoted as an identifier
	$quotedName = $this->addIdentifierQuotes( $identifier );

	return "SAVEPOINT $quotedName";
}
public function releaseSavepointSqlText( $identifier ) {
	// The savepoint name is quoted as an identifier
	$quotedName = $this->addIdentifierQuotes( $identifier );

	return "RELEASE SAVEPOINT $quotedName";
}
public function rollbackToSavepointSqlText( $identifier ) {
	// The savepoint name is quoted as an identifier
	$quotedName = $this->addIdentifierQuotes( $identifier );

	return "ROLLBACK TO SAVEPOINT $quotedName";
}
/**
* Get the SQL text of a plain ROLLBACK statement.
*
* @return string
*/
public function rollbackSqlText() {
return 'ROLLBACK';
}
public function dispatchingInsertSqlText( $table, $rows, $options ) {
	$rows = $this->normalizeRowArray( $rows );
	if ( !$rows ) {
		// Nothing to insert
		return false;
	}

	$options = $this->normalizeOptions( $options );
	// The IGNORE flag selects the conflict-skipping form of the statement
	$sqlPair = $this->isFlagInOptions( 'IGNORE', $options )
		? $this->insertNonConflictingSqlText( $table, $rows )
		: $this->insertSqlText( $table, $rows );
	[ $realSql, $generalizedSql ] = $sqlPair;

	return new Query( $realSql, self::QUERY_CHANGE_ROWS, 'INSERT', $table, $generalizedSql );
}
/**
 * Normalize a row or list of rows to a list of rows
 *
 * An array with an entry at index 0 is taken to be a list of rows already; any other
 * non-empty array is taken to be a single row and gets wrapped in a list.
 *
 * @param array $rowOrRows A single (field => value) map or a list of such maps
 * @return array[] List of (field => value) maps
 * @throws DBLanguageError If any resulting row is not an array or is empty
 * @since 1.35
 */
final protected function normalizeRowArray( array $rowOrRows ) {
	if ( isset( $rowOrRows[0] ) ) {
		$rows = $rowOrRows;
	} else {
		$rows = $rowOrRows ? [ $rowOrRows ] : [];
	}

	foreach ( $rows as $row ) {
		if ( !is_array( $row ) ) {
			throw new DBLanguageError( "Got non-array in row array" );
		}
		if ( !$row ) {
			throw new DBLanguageError( "Got empty array in row array" );
		}
	}

	return $rows;
}
/**
 * Validate and normalize parameters to upsert() or replace()
 *
 * The rows are normalized first (and written back through the reference), then the
 * unique key is normalized and the rows are checked against it.
 *
 * @param string|string[]|string[][] $uniqueKeys Unique indexes (only one is allowed)
 * @param array[] &$rows The row array, which will be replaced with a normalized version.
 * @return string[] List of columns that defines a single unique index
 * @throws DBLanguageError If no unique key is given
 * @since 1.35
 */
final public function normalizeUpsertParams( $uniqueKeys, &$rows ) {
	$rows = $this->normalizeRowArray( $rows );

	if ( !$uniqueKeys ) {
		throw new DBLanguageError( 'No unique key specified for upsert/replace' );
	}

	$keyColumns = $this->normalizeUpsertKeys( $uniqueKeys );
	$this->assertValidUpsertRowArray( $rows, $keyColumns );

	return $keyColumns;
}
/**
 * Normalize a conditions argument to array form
 *
 * The empty string yields an empty array. Null and false also yield an empty array,
 * but log a warning first. Any other non-array value is wrapped in a list.
 *
 * @param array|string $conds
 * @param string $fname Caller name, used in the warning message
 * @return array
 * @since 1.31
 */
final public function normalizeConditions( $conds, $fname ) {
	if ( $conds === '' ) {
		return [];
	}

	if ( $conds === null || $conds === false ) {
		$message = __METHOD__
			. ' called from '
			. $fname
			. ' with incorrect parameters: $conds must be a string or an array';
		$this->logger->warning( $message, [ 'db_log_category' => 'sql' ] );

		return [];
	}

	if ( is_array( $conds ) ) {
		return $conds;
	}

	return [ $conds ];
}
/**
 * Normalize the unique key argument of upsert()/replace() to a list of columns
 *
 * Accepted forms: a single column name, a one-element list holding a list of columns,
 * or (logged as deprecated) a one-element list holding a single column name.
 *
 * @param string|string[]|string[][] $uniqueKeys Unique indexes (only one is allowed)
 * @return string[] List of columns that defines a single unique index
 * @throws DBLanguageError If the argument has any other form
 * @since 1.35
 */
private function normalizeUpsertKeys( $uniqueKeys ) {
	if ( is_string( $uniqueKeys ) ) {
		return [ $uniqueKeys ];
	}

	if ( !is_array( $uniqueKeys ) ) {
		throw new DBLanguageError( 'Invalid unique key array' );
	}

	$holdsSingleIndex = count( $uniqueKeys ) === 1 && isset( $uniqueKeys[0] );
	if ( !$holdsSingleIndex ) {
		throw new DBLanguageError(
			"The unique key array should contain a single unique index" );
	}

	$soleEntry = $uniqueKeys[0];
	if ( is_array( $soleEntry ) ) {
		return $soleEntry;
	}

	if ( !is_string( $soleEntry ) ) {
		throw new DBLanguageError( 'Invalid unique key array entry' );
	}

	// Passing a list of strings for single-column unique keys is too
	// easily confused with passing the columns of composite unique key
	$this->logger->warning( __METHOD__ .
		" called with deprecated parameter style: " .
		"the unique key array should be a string or array of string arrays",
		[
			'exception' => new RuntimeException(),
			'db_log_category' => 'sql',
		] );

	return $uniqueKeys;
}
/**
 * Assert that every row has a non-NULL value for each column of the unique key
 *
 * @param array<int,array> $rows Normalized list of rows to insert
 * @param string[] $uniqueKey Columns of the unique key to UPSERT upon
 * @throws DBLanguageError If a key column is NULL or absent in some row
 * @since 1.37
 */
final protected function assertValidUpsertRowArray( array $rows, array $uniqueKey ) {
	foreach ( $rows as $row ) {
		foreach ( $uniqueKey as $keyColumn ) {
			// isset() is false for both missing keys and NULL values
			if ( isset( $row[$keyColumn] ) ) {
				continue;
			}

			$keyColumnList = implode( ',', $uniqueKey );
			throw new DBLanguageError(
				"NULL/absent values for unique key (" . $keyColumnList . ")"
			);
		}
	}
}
/**
 * Assert that the SET array of an upsert does not change any unique key column
 *
 * String keys are column names; an assignment to a key column is rejected unless there
 * is exactly one row and the assigned value is identical to that row's value, in which
 * case a warning is logged instead. Values under non-string keys are checked only when
 * they start with "<unquoted alphanumeric column> =".
 *
 * @param array $set Combined column/literal assignment map and SQL assignment list
 * @param string[] $uniqueKey Columns of the unique key to UPSERT upon
 * @param array<int,array> $rows List of rows to upsert
 * @throws DBLanguageError If $set is empty or reassigns a unique key column
 * @since 1.37
 */
final public function assertValidUpsertSetArray(
	array $set,
	array $uniqueKey,
	array $rows
) {
	if ( !$set ) {
		throw new DBLanguageError( "Update assignment list can't be empty for upsert" );
	}

	// Sloppy callers might construct the SET array using the ROW array, leaving redundant
	// column definitions for unique key columns. Detect this for backwards compatibility.
	$soleRow = null;
	if ( count( $rows ) == 1 ) {
		$soleRow = reset( $rows );
	}

	// Disallow value changes for any columns in the unique key. This avoids additional
	// insertion order dependencies that are unwieldy and difficult to implement efficiently
	// in PostgreSQL.
	foreach ( $set as $key => $value ) {
		if ( !is_string( $key ) ) {
			// Value is of the form "<unquoted alphanumeric column> = <SQL expression>"
			if (
				preg_match( '/^([a-zA-Z0-9_]+)\s*=/', $value, $match ) &&
				in_array( $match[1], $uniqueKey, true )
			) {
				throw new DBLanguageError(
					"Cannot reassign column '{$match[1]}' since it belongs to the provided unique key"
				);
			}
			continue;
		}

		// Key is a column name and value is a literal (e.g. string, int, null, ...)
		if ( !in_array( $key, $uniqueKey, true ) ) {
			continue;
		}

		$isRedundant = $soleRow && array_key_exists( $key, $soleRow ) && $soleRow[$key] === $value;
		if ( !$isRedundant ) {
			throw new DBLanguageError(
				"Cannot reassign column '$key' since it belongs to the provided unique key"
			);
		}

		$this->logger->warning(
			__METHOD__ . " called with redundant assignment to column '$key'",
			[
				'exception' => new RuntimeException(),
				'db_log_category' => 'sql',
			]
		);
	}
}
/**
 * Get the single column from a select()-style field parameter
 *
 * A non-array value is returned as-is. For a one-element array, the entry at index 0
 * is returned if set, else the first value.
 *
 * @param array|string $var Field parameter in the style of select()
 * @return string|null Column name or null (for an empty array); ignores aliases
 * @throws DBLanguageError If the array holds more than one entry
 */
final public function extractSingleFieldFromList( $var ) {
	if ( !is_array( $var ) ) {
		return $var;
	}

	if ( !$var ) {
		return null;
	}

	if ( count( $var ) != 1 ) {
		throw new DBLanguageError( __METHOD__ . ': got multiple columns' );
	}

	return $var[0] ?? reset( $var );
}
public function setSchemaVars( $vars ) {
	// Anything that is not an array clears the stored variables
	if ( is_array( $vars ) ) {
		$this->schemaVars = $vars;
	} else {
		$this->schemaVars = null;
	}
}
/**
 * Get schema variables. If none are currently stored (see setSchemaVars()), the
 * defaults from getDefaultSchemaVars() are used instead.
 *
 * @return array
 */
protected function getSchemaVars() {
	if ( $this->schemaVars !== null ) {
		return $this->schemaVars;
	}

	return $this->getDefaultSchemaVars();
}
/**
* Get schema variables to use if none have been set via setSchemaVars().
*
* Override this in derived classes to provide variables for tables.sql
* and SQL patch files.
*
* This base implementation provides no variables (an empty array).
*
* @stable to override
* @return array
*/
protected function getDefaultSchemaVars() {
return [];
}
/**
* Database-independent variable replacement. Replaces a set of variables
* in an SQL statement with their contents as given by $this->getSchemaVars().
*
* Supports '{$var}' `{$var}` and / *$var* / (without the spaces) style variables.
*
* - '{$var}' should be used for text and is passed through the database's
* addQuotes method.
* - `{$var}` should be used for identifiers (e.g. table and database names).
* It is passed through the database's addIdentifierQuotes method which
* can be overridden if the database uses something other than backticks.
* - / *_* / or / *$wgDBprefix* / passes the name that follows through the
* database's tableName method.
* - / *i* / passes the name that follows through the database's indexName method.
* - In all other cases, / *$var* / is left unencoded. Except for table options,
* its use should be avoided. In 1.24 and older, string encoding was applied.
*
* A '{$var}', `{$var}` or / *$var* / placeholder whose variable name is not a key of
* the schema variables is left in the statement unchanged.
*
* @param string $ins SQL statement to replace variables in
* @return string The new SQL statement with variables replaced
*/
public function replaceVars( $ins ) {
$vars = $this->getSchemaVars();
// One pass over the statement; each alternative of the pattern fills a different
// capture group, which tells the callback what kind of placeholder was matched
return preg_replace_callback(
'!
/\* (\$wgDBprefix|[_i]) \*/ (\w*) | # 1-2. tableName, indexName
\'\{\$ (\w+) }\' | # 3. addQuotes
`\{\$ (\w+) }` | # 4. addIdentifierQuotes
/\*\$ (\w+) \*/ # 5. leave unencoded
!x',
function ( $m ) use ( $vars ) {
// Note: Because of <https://bugs.php.net/bug.php?id=51881>,
// check for both nonexistent keys *and* the empty string.
if ( isset( $m[1] ) && $m[1] !== '' ) {
// Group 1 is the marker and group 2 is the name that follows it
if ( $m[1] === 'i' ) {
return $this->indexName( $m[2] );
} else {
return $this->tableName( $m[2] );
}
} elseif ( isset( $m[3] ) && $m[3] !== '' && array_key_exists( $m[3], $vars ) ) {
// Text variable: quoted as a literal
return $this->quoter->addQuotes( $vars[$m[3]] );
} elseif ( isset( $m[4] ) && $m[4] !== '' && array_key_exists( $m[4], $vars ) ) {
// Identifier variable: quoted as an identifier
return $this->addIdentifierQuotes( $vars[$m[4]] );
} elseif ( isset( $m[5] ) && $m[5] !== '' && array_key_exists( $m[5], $vars ) ) {
// Raw variable: inserted without any encoding
return $vars[$m[5]];
} else {
// Unknown variable name: keep the matched text as it was
return $m[0];
}
},
$ins
);
}
/**
* Base implementation: always throws; the message says locking must be implemented
* in subclasses.
*
* @param mixed $lockName Not used by this implementation
* @param mixed $timeout Not used by this implementation
* @throws RuntimeException Always
*/
public function lockSQLText( $lockName, $timeout ) {
throw new RuntimeException( 'locking must be implemented in subclasses' );
}
/**
* Base implementation: always throws; the message says locking must be implemented
* in subclasses.
*
* @param mixed $lockName Not used by this implementation
* @throws RuntimeException Always
*/
public function lockIsFreeSQLText( $lockName ) {
throw new RuntimeException( 'locking must be implemented in subclasses' );
}
/**
* Base implementation: always throws; the message says locking must be implemented
* in subclasses.
*
* @param mixed $lockName Not used by this implementation
* @throws RuntimeException Always
*/
public function unlockSQLText( $lockName ) {
throw new RuntimeException( 'locking must be implemented in subclasses' );
}
}