modules/datastore/src/DataDictionary/AlterTableQuery/MySQLQuery.php
<?php
namespace Drupal\datastore\DataDictionary\AlterTableQuery;
use Drupal\Core\Database\StatementInterface;
use Drupal\datastore\Plugin\QueueWorker\ImportJob;
use Drupal\datastore\DataDictionary\AlterTableQueryBase;
use Drupal\datastore\DataDictionary\AlterTableQueryInterface;
use Drupal\datastore\DataDictionary\IncompatibleTypeException;
/**
* MySQL table alter query.
*/
class MySQLQuery extends AlterTableQueryBase implements AlterTableQueryInterface {
/**
* Date time specific types.
*
* @var string[]
*/
protected const DATE_TIME_TYPES = [
'DATE',
'TIME',
'DATETIME',
];
/**
* Default type to use for fields if no data-dictionary type is specified.
*
* @var string
*/
protected const DEFAULT_FIELD_TYPE = 'TEXT';
/**
* Max total size of the MySQL decimal type.
*
* @var int
*/
protected const DECIMAL_MAX_SIZE = 65;
/**
* Max decimal size of the MySQL decimal type.
*
* @var int
*/
protected const DECIMAL_MAX_DECIMAL = 30;
/**
* Default index field length.
*
* @var int
*/
protected const DEFAULT_INDEX_FIELD_LENGTH = 50;
/**
* MySQL string field types.
*
* @var string[]
*/
protected const STRING_FIELD_TYPES = [
'CHAR',
'VARCHAR',
'TEXT',
'TINYTEXT',
'BINARY',
'VARBINARY',
'BLOB',
];
/**
* Mapping between frictionless types and SQL types.
*
* Frictionless type is key, SQL is value.
*
* @var string[]
*/
protected static $frictionlessTypes = [
'string' => 'TEXT',
'number' => 'DECIMAL',
'integer' => 'INT',
'date' => 'DATE',
'time' => 'TIME',
'datetime' => 'DATETIME',
'year' => 'YEAR',
'yearmonth' => 'TINYTEXT',
'boolean' => 'BOOL',
'object' => 'TEXT',
'geopoint' => 'TEXT',
'geojson' => 'TEXT',
'array' => 'TEXT',
'duration' => 'TINYTEXT',
];
/**
* {@inheritdoc}
*/
public function doExecute(): void {
// Sanitize field names to match database field names.
$this->fields = $this->sanitizeFields($this->fields);
// Filter out fields which are not present in the database table.
$this->fields = $this->mergeFields($this->fields, $this->table);
// Sanitize index field names to match database field names.
$this->indexes = $this->sanitizeIndexes($this->indexes);
// Filter out indexes with fields which are not present in the table.
$this->indexes = $this->mergeIndexes($this->indexes, $this->table);
// Build and execute SQL commands to prepare for table alter.
$pre_alter_commands = $this->buildPreAlterCommands($this->fields, $this->table);
array_map(fn ($cmd) => $cmd->execute(), $pre_alter_commands);
// Build SQL command to perform table alter.
$command = $this->buildAlterCommand($this->table, $this->fields, $this->indexes);
// Execute alter command.
$command->execute();
}
/**
* Sanitize field names.
*
* @param array $fields
* Query fields.
*
* @return array
* Query fields list with sanitized field names.
*/
protected function sanitizeFields(array $fields): array {
// Iterate through field list...
foreach (array_keys($fields) as $key) {
// Create reference to index field name.
$field_name = &$fields[$key]['name'];
// Sanitize field name.
$field_name = ImportJob::sanitizeHeader($field_name);
}
return $fields;
}
/**
* Sanitize index field names.
*
* @param array $indexes
* Query indexes.
*
* @return array
* Query indexes list with sanitized index field names.
*/
protected function sanitizeIndexes(array $indexes): array {
// Iterate through index list...
foreach (array_keys($indexes) as $index_key) {
// Create reference to index field list.
$index_fields = &$indexes[$index_key]['fields'];
// Iterate through index field list...
foreach (array_keys($index_fields) as $field_key) {
// Create reference to index field name.
$field_name = &$index_fields[$field_key]['name'];
// Sanitize field name.
$field_name = ImportJob::sanitizeHeader($field_name);
}
}
return $indexes;
}
/**
* Remove query fields not found in the given table and copy over names.
*
* @param array $fields
* Query fields.
* @param string $table
* MySQL table to filter against.
*
* @return array
* Filtered and updated list of applicable query fields.
*/
protected function mergeFields(array $fields, string $table): array {
$table_cols = $this->getTableColsAndComments($table);
$column_names = array_keys($table_cols);
// Filter out un-applicable query fields.
$filtered_fields = array_filter($fields, fn ($fields) => in_array($fields['name'], $column_names, TRUE));
// Fill missing field titles.
foreach ($table_cols as $column_name => $comment) {
if (isset($filtered_fields[$column_name])) {
$filtered_fields[$column_name]['title'] = $filtered_fields[$column_name]['title'] ?: $comment;
}
}
return $filtered_fields;
}
/**
* Remove query indexes with fields not found in the given table and copy over names.
*
* @param array $indexes
* Query indexes.
* @param string $table
* MySQL table to filter against.
*
* @return array
* Filtered list of applicable query indexes.
*/
protected function mergeIndexes(array $indexes, string $table): array {
$table_cols = $this->getTableColsAndComments($table);
$column_names = array_keys($table_cols);
// Filter out un-applicable query indexes.
$indexes = array_filter($indexes, function ($index) use ($column_names) {
$fields = array_column($index['fields'], 'name');
return empty(array_diff($fields, $column_names));
});
return $indexes;
}
/**
* Get list of MySQL table field details.
*
* @param string $table
* Table name.
*
* @return string[]
* List of column comments keyed by column names.
*/
protected function getTableColsAndComments(string $table): array {
return $this->connection->query("SHOW FULL COLUMNS FROM {{$table}};")->fetchAllKeyed(0, 8);
}
/**
* Build full MySQL equivalent of the given Frictionless "Table Schema" type.
*
* @param string $frictionless_type
* Frictionless "Table Schema" data type.
* @param string $column
* MySQL table column to get type for.
* @param string $table
* MySQL table to get type for.
*
* @return string
* Full MySQL data type.
*/
protected function getFieldType(string $frictionless_type, string $column, string $table): string {
// Determine MySQL base type.
$base_mysql_type = $this->getBaseType($frictionless_type);
// Build the MySQL type argument list.
$args = $this->buildTypeArgs($base_mysql_type, $column, $table);
$args_str = !empty($args) ? '(' . implode(', ', $args) . ')' : '';
// Build full MySQL type.
return $base_mysql_type . $args_str;
}
/**
* Get base MySQL equivalent of the given Frictionless "Table Schema" type.
*
* @param string $frictionless_type
* Frictionless "Table Schema" data type.
*
* @return string
* Base MySQL data type.
*/
protected function getBaseType(string $frictionless_type): string {
if ($sql_type = static::$frictionlessTypes[$frictionless_type] ?? FALSE) {
return $sql_type;
}
throw new \InvalidArgumentException($frictionless_type . ' is not a valid frictionless type.');
}
/**
* Build MySQL type arg list for MySQL type.
*
* @param string $type
* MySQL data type.
* @param string $column
* Column name.
* @param string $table
* Table name.
*
* @return array
* MySQL type arguments.
*
* @throws Drupal\datastore\DataDictionary\IncompatibleTypeException
* When incompatible data is found in the table for the specified type.
*/
protected function buildTypeArgs(string $type, string $column, string $table): array {
// If this field is a DECIMAL field, build decimal and size arguments.
if ($type === 'DECIMAL') {
$non_decimals = $this->connection->query("SELECT MAX(LENGTH(TRIM(LEADING '-' FROM SUBSTRING_INDEX({$column}, '.', 1)))) FROM {{$table}};")->fetchField();
$decimal = $this->connection->query("SELECT MAX(LENGTH(SUBSTRING_INDEX({$column}, '.', -1))) FROM {{$table}};")->fetchField();
$size = $non_decimals + $decimal;
if ($size > self::DECIMAL_MAX_SIZE || $decimal > self::DECIMAL_MAX_DECIMAL) {
throw new IncompatibleTypeException("Decimal values found in column too large for DECIMAL type; please use type 'string' for column '{$column}'");
}
return [$size, $decimal];
}
return [];
}
/**
* Build list of commands to prepare table for alter command.
*
* @param array $query_fields
* Query fields.
* @param string $table
* Mysql table name.
*
* @return \Drupal\Core\Database\StatementInterface[]
* Prep command statements.
*/
protected function buildPreAlterCommands(array $query_fields, string $table): array {
$pre_alter_cmds = [];
// Build pre-alter commands for each query field.
foreach ($query_fields as ['name' => $col, 'type' => $type, 'format' => $format]) {
// Determine base MySQL type for Frictionless column type.
$base_type = $this->getBaseType($type);
// Replace empty strings with NULL for non-text columns to prevent
// misc. errors (i.e. STR_TO_DATE function related and "Incorrect
// `type` value" errors).
if (!in_array($base_type, self::STRING_FIELD_TYPES, TRUE)) {
$pre_alter_cmds[] = $this->connection->update($table)->condition($col, '')->expression($col, 'NULL');
}
// Build pre-alter commands for date fields.
if (in_array($base_type, self::DATE_TIME_TYPES, TRUE)) {
$pre_alter_cmds = array_merge($pre_alter_cmds, $this->buildDatePreAlterCommands($table, $col, $format));
}
// Build pre-alter commands for boolean fields.
if ($base_type === 'BOOL') {
$pre_alter_cmds = array_merge($pre_alter_cmds, $this->buildBoolPreAlterCommands($table, $col));
}
}
return $pre_alter_cmds;
}
/**
* Build pre-alter commands for date fields.
*
* Update format of the date fields to ISO-8601 before importing into MySQL.
*
* @param string $table
* Table name.
* @param string $column
* Table column.
* @param string $format
* Field frictionless date format.
*
* @return \Drupal\Core\Database\Query\Update[]
* Pre-alter update DB queries.
*/
protected function buildDatePreAlterCommands(string $table, string $column, string $format): array {
$pre_alter_cmds = [];
// If a valid format is provided...
if (!empty($format) && $format !== 'default') {
$mysql_date_format = $this->dateFormatConverter->convert($format);
// Convert date formats for date column.
$pre_alter_cmds[] = $this->connection->update($table)->expression($column, "STR_TO_DATE({$column}, :date_format)", [
':date_format' => $mysql_date_format,
]);
}
return $pre_alter_cmds;
}
/**
* Build pre-alter commands for boolean fields.
*
* Convert strings 'true' and 'false' to '1' and '0' for boolean fields.
*
* @param string $table
* Table name.
* @param string $column
* Table column.
*
* @return \Drupal\Core\Database\Query\Update[]
* Pre-alter update DB queries.
*/
protected function buildBoolPreAlterCommands(string $table, string $column): array {
return [
$this->connection->update($table)->where("UPPER({$column}) = :value", [':value' => 'FALSE'])->expression($column, '0'),
$this->connection->update($table)->where("UPPER({$column}) = :value", [':value' => 'TRUE'])->expression($column, '1'),
];
}
/**
* Build alter command to modify table column data types.
*
* @param string $table
* MySQL table name.
* @param array $fields
* Query fields.
* @param array $indexes
* Query indexes.
*
* @return \Drupal\Core\Database\StatementInterface
* Prepared MySQL table alter command statement.
*/
protected function buildAlterCommand(string $table, array $fields, array $indexes): StatementInterface {
$mysql_type_map = $this->buildDatabaseTypeMap($fields, $table);
// Build alter options.
$alter_options = array_merge(
$this->buildModifyColumnOptions($fields, $mysql_type_map),
$this->buildAddIndexOptions($indexes, $table, $mysql_type_map)
);
return $this->connection->prepareStatement("ALTER TABLE {{$table}} " . implode(', ', $alter_options) . ';', []);
}
/**
* Build MySQL type map from Frictionless field definitions.
*
* @param array $fields
* Frictionless field definitions.
* @param string $table
* Table name.
*
* @return string[]
* Column name -> MySQL type map.
*/
protected function buildDatabaseTypeMap(array $fields, string $table): array {
$type_map = [];
foreach ($fields as ['name' => $field, 'type' => $type]) {
// Get MySQL type for column.
$type_map[$field] = $this->getFieldType($type, $field, $table);
}
return $type_map;
}
/**
* Build alter command modify column options.
*
* @param array $fields
* Query fields.
* @param string[] $type_map
* Field -> MySQL type map.
*
* @return string[]
* Modify column options.
*/
protected function buildModifyColumnOptions(array $fields, array $type_map): array {
$modify_column_options = [];
foreach ($fields as ['name' => $field, 'title' => $title]) {
$column_type = $type_map[$field];
// Escape characters in column title in preparation for it being used as
// a MySQL comment.
$comment = addslashes($title);
// Build modify line for alter command and add the appropriate arguments
// to the args list.
$modify_column_options[] = "MODIFY COLUMN {$field} {$column_type} COMMENT '{$comment}'";
}
return $modify_column_options;
}
/**
* Build alter command add index options.
*
* @param array $indexes
* Query indexes.
* @param string $table
* Table name.
* @param string[] $type_map
* Field -> MySQL type map.
*
* @return string[]
* Add index options.
*/
protected function buildAddIndexOptions(array $indexes, string $table, array $type_map): array {
$add_index_options = [];
foreach ($indexes as ['name' => $name, 'type' => $index_type, 'fields' => $fields, 'description' => $description]) {
// Translate Frictionless index type to MySQL.
$mysql_index_type = $this->getIndexType($index_type);
// Build field options.
$field_options = array_map(function ($field) use ($table, $type_map) {
$name = $field['name'];
$length = $field['length'];
$type = $type_map[$name] ?? self::DEFAULT_FIELD_TYPE;
return $this->buildIndexFieldOption($name, $length, $table, $type);
}, $fields);
$formatted_field_options = implode(', ', $field_options);
// Escape characters in index description in preparation for it being
// used as a MySQL comment.
$comment = addslashes($description);
// Build add index option list.
$add_index_options[] = "ADD {$mysql_index_type} INDEX {$name} ({$formatted_field_options}) COMMENT '{$comment}'";
}
return $add_index_options;
}
/**
* Convert Frictionless to MySQL index types.
*
* @param string $frictionless_type
* Frictionless index type.
*
* @return string
* MySQL index type.
*/
protected function getIndexType(string $frictionless_type): string {
return ([
'index' => '',
'fulltext' => 'FULLTEXT',
])[$frictionless_type];
}
/**
* Build formatted index field option.
*
* @param string $name
* Index field name.
* @param int|null $length
* Index field length.
* @param string $table
* Table name.
* @param string $type
* MySQL column type.
*
* @return string
* Formatted index field option string.
*/
protected function buildIndexFieldOption(string $name, ?int $length, string $table, string $type): string {
// Extract base type from full MySQL type ("DECIMAL(12, 3)" -> "DECIMAL").
$base_type = strtok($type, '(');
// If this field is a string type, determine what it's length should be...
if (in_array($base_type, self::STRING_FIELD_TYPES)) {
// Initialize length to the default index field length if not set.
$length ??= self::DEFAULT_INDEX_FIELD_LENGTH;
// Retrieve the max length for this table column.
$max_length = $this->getMaxColumnLength($name, $table);
// If the specified length is greater than the max length, defer to the
// max length.
$length = ($length > $max_length) ? $max_length : $length;
// Format the length.
$formatted_length = ' (' . strval($length) . ')';
}
// Otherwise, don't specify a length.
else {
$formatted_length = '';
}
return $name . $formatted_length;
}
/**
* Get the length of the largest value in the specified table column.
*
* @param string $column
* Table column name.
* @param string $table
* Table name.
*
* @return int
* Max table column length.
*/
protected function getMaxColumnLength(string $column, string $table): int {
$max_length = $this->connection->query("SELECT MAX(LENGTH({$column})) FROM {{$table}};")->fetchField();
return intval($max_length);
}
}