GetDKAN/dkan

View on GitHub
modules/datastore/src/Plugin/QueueWorker/ImportJob.php

Summary

Maintainability
A
0 mins
Test Coverage
A
99%
<?php

namespace Drupal\datastore\Plugin\QueueWorker;

use Contracts\ParserInterface;
use Drupal\common\Storage\DatabaseTableInterface;
use Procrastinator\Job\AbstractPersistentJob;
use Procrastinator\Result;
use ForceUTF8\Encoding;

/**
 * Procrastinator job for importing to the datastore.
 */
class ImportJob extends AbstractPersistentJob {

  /**
   * The maximum length of a MySQL table column name.
   *
   * @var int
   */
  protected const MAX_COLUMN_LENGTH = 64;

  /**
   * List of reserved words in MySQL 5.6-8 and MariaDB.
   *
   * @var string[]
   */
  protected const RESERVED_WORDS = [
    'accessible', 'add', 'all', 'alter', 'analyze', 'and', 'as', 'asc',
    'asensitive', 'before', 'between', 'bigint', 'binary', 'blob', 'both', 'by',
    'call', 'cascade', 'case', 'change', 'char', 'character', 'check',
    'collate', 'column', 'condition', 'constraint', 'continue', 'convert',
    'create', 'cross', 'cube', 'cume_dist', 'current_date', 'current_role',
    'current_time', 'current_timestamp', 'current_user', 'cursor', 'database',
    'databases', 'day_hour', 'day_microsecond', 'day_minute', 'day_second',
    'dec', 'decimal', 'declare', 'default', 'delayed', 'delete', 'dense_rank',
    'desc', 'describe', 'deterministic', 'distinct', 'distinctrow', 'div',
    'do_domain_ids', 'double', 'drop', 'dual', 'each', 'else', 'elseif',
    'empty', 'enclosed', 'escaped', 'except', 'exists', 'exit', 'explain',
    'false', 'fetch', 'first_value', 'float', 'float4', 'float8', 'for',
    'force', 'foreign', 'from', 'fulltext', 'function', 'general', 'generated',
    'get', 'grant', 'group', 'grouping', 'groups', 'having', 'high_priority',
    'hour_microsecond', 'hour_minute', 'hour_second', 'if', 'ignore',
    'ignore_domain_ids', 'ignore_server_ids', 'in', 'index', 'infile', 'inner',
    'inout', 'insensitive', 'insert', 'int', 'int1', 'int2', 'int3', 'int4',
    'int8', 'integer', 'intersect', 'interval', 'into', 'io_after_gtids',
    'io_before_gtids', 'is', 'iterate', 'join', 'json_table', 'key', 'keys',
    'kill', 'lag', 'last_value', 'lateral', 'lead', 'leading', 'leave', 'left',
    'like', 'limit', 'linear', 'lines', 'load', 'localtime', 'localtimestamp',
    'lock', 'long', 'longblob', 'longtext', 'loop', 'low_priority',
    'master_bind', 'master_heartbeat_period', 'master_ssl_verify_server_cert',
    'match', 'maxvalue', 'mediumblob', 'mediumint', 'mediumtext', 'middleint',
    'minute_microsecond', 'minute_second', 'mod', 'modifies', 'natural', 'not',
    'no_write_to_binlog', 'nth_value', 'ntile', 'null', 'numeric', 'of',
    'offset', 'on', 'optimize', 'optimizer_costs', 'option', 'optionally', 'or',
    'order', 'out', 'outer', 'outfile', 'over', 'page_checksum',
    'parse_vcol_expr', 'partition', 'percent_rank', 'position', 'precision',
    'primary', 'procedure', 'purge', 'range', 'rank', 'read', 'reads',
    'read_write', 'real', 'recursive', 'references', 'ref_system_id', 'regexp',
    'release', 'rename', 'repeat', 'replace', 'require', 'resignal', 'restrict',
    'return', 'returning', 'revoke', 'right', 'rlike', 'row', 'row_number',
    'rows', 'schema', 'schemas', 'second_microsecond', 'select', 'sensitive',
    'separator', 'set', 'show', 'signal', 'slow', 'smallint', 'spatial',
    'specific', 'sql', 'sql_big_result', 'sql_calc_found_rows', 'sqlexception',
    'sql_small_result', 'sqlstate', 'sqlwarning', 'ssl', 'starting',
    'stats_auto_recalc', 'stats_persistent', 'stats_sample_pages', 'stored',
    'straight_join', 'system', 'table', 'terminated', 'then', 'tinyblob',
    'tinyint', 'tinytext', 'to', 'trailing', 'trigger', 'true', 'undo', 'union',
    'unique', 'unlock', 'unsigned', 'update', 'usage', 'use', 'using',
    'utc_date', 'utc_time', 'utc_timestamp', 'values', 'varbinary', 'varchar',
    'varcharacter', 'varying', 'virtual', 'when', 'where', 'while', 'window',
    'with', 'write', 'xor', 'year_month', 'zerofill',
  ];

  /**
   * Storage class.
   *
   * @var \Drupal\common\Storage\DatabaseTableInterface
   */
  protected $dataStorage;

  /**
   * Parser object.
   *
   * @var \Contracts\ParserInterface
   */
  protected $parser;

  /**
   * Datastore resource.
   *
   * @var \Drupal\datastore\DatastoreResource
   */
  protected $resource;

  public const BYTES_PER_CHUNK = 8192;

  /**
   * Constructor method.
   *
   * @param string $identifier
   *   Job identifier.
   * @param mixed $storage
   *   Storage class.
   * @param array|null $config
   *   Configuration options.
   */
  protected function __construct(string $identifier, $storage, array $config = NULL) {
    parent::__construct($identifier, $storage, $config);

    $this->dataStorage = $config['storage'];

    if (!($this->dataStorage instanceof DatabaseTableInterface)) {
      throw new \Exception('Storage must be an instance of ' . DatabaseTableInterface::class);
    }

    $this->parser = $config['parser'];
    $this->resource = $config['resource'];
  }

  /**
   * Transform possible multiline string to single line for description.
   *
   * @param string $column
   *   Column name.
   *
   * @return string
   *   Column name on single line.
   */
  public static function sanitizeDescription(string $column) {
    $trimmed = array_filter(array_map('trim', explode("\n", $column)));
    return implode(" ", $trimmed);
  }

  /**
   * Sanitize table column name according to the MySQL supported characters.
   *
   * @param string $column
   *   The column name being sanitized.
   *
   * @returns string
   *   Sanitized column name.
   */
  public static function sanitizeHeader(string $column): string {
    // Replace all spaces and newline characters with underscores since they are
    // not supported.
    $column = preg_replace('/(?: |\r\n|\r|\n)/', '_', $column);
    // Strip unsupported characters from the header.
    $column = preg_replace('/[^A-Za-z0-9_]/', '', $column);
    // Trim underscores from the beginning and end of the column name.
    $column = trim($column, '_');
    // Convert the column name to lowercase.
    $column = strtolower($column);

    if (is_numeric($column) || in_array($column, ImportJob::RESERVED_WORDS)) {
      // Prepend "_" to column name that are not allowed in MySQL
      // This can be dropped after move to Drupal 9.
      // @see https://github.com/GetDKAN/dkan/issues/3606
      $column = '_' . $column;
    }

    return $column;
  }

  /**
   * Truncate column name if longer than the max column length for the database.
   *
   * @param string $column
   *   The column name being truncated.
   *
   * @returns string
   *   Truncated column name.
   */
  public static function truncateHeader(string $column): string {
    // If the supplied table column name is longer than the max column length,
    // truncate the column name to 5 characters under the max length and
    // substitute the truncated characters with a unique hash.
    if (strlen($column) > ImportJob::MAX_COLUMN_LENGTH) {
      $field = substr($column, 0, ImportJob::MAX_COLUMN_LENGTH - 5);
      $hash = substr(md5($column), 0, 4);
      $column = $field . '_' . $hash;
    }

    return $column;
  }

  /**
   * Get the storage object.
   */
  public function getStorage() {
    return $this->dataStorage;
  }

  /**
   * {@inheritdoc}
   */
  protected function runIt() {
    $filename = $this->resource->getFilePath();
    $size = @filesize($filename);
    if (!$size) {
      return $this->setResultError("Can't get size from file {$filename}");
    }

    if ($size <= $this->getBytesProcessed()) {
      return $this->getResult();
    }

    $maximum_execution_time = $this->getTimeLimit() ? (time() + $this->getTimeLimit()) : PHP_INT_MAX;

    try {
      $this->assertTextFile($filename);
      $this->parseAndStore($filename, $maximum_execution_time);
    }
    catch (\Exception $e) {
      return $this->setResultError($e->getMessage());
    }

    // Flush the parser.
    $this->store();

    if ($this->getBytesProcessed() >= $size) {
      $this->setStatus(Result::DONE);
    }
    else {
      $this->setStatus(Result::STOPPED);
    }

    return $this->getResult();
  }

  /**
   * Confirm this is a valid text file.
   *
   * Allow 'text/*' or 'application/*' per PHP 8.0 changes.
   *
   * @param string $filename
   *   Filename to test.
   *
   * @throws \Exception
   *   Will throw exception if not valid, do nothing if valid.
   */
  protected function assertTextFile(string $filename) {
    if ($mimeType = mime_content_type($filename)) {
      $mime_explode = explode('/', $mimeType);
      if (!in_array($mime_explode[0], ['text', 'application'])) {
        throw new \Exception("Invalid mime type: {$mimeType}");
      }
    }
  }

  /**
   * Add error message to result object.
   *
   * @param mixed $message
   *   Result message. Usually a string.
   *
   * @return \Procrastinator\Result
   *   Updated result object.
   */
  protected function setResultError($message): Result {
    // Use these two different call methods so that we only write the status to
    // the storage once.
    $this->getResult()->setStatus(Result::ERROR);
    $this->setError($message);
    return $this->getResult();
  }

  /**
   * Get current count of bytes processed of file.
   *
   * @return int
   *   Count of bytes processed.
   */
  protected function getBytesProcessed() {
    $chunksProcessed = $this->getStateProperty('chunksProcessed', 0);
    return $chunksProcessed * self::BYTES_PER_CHUNK;
  }

  /**
   * Parse a file and store results.
   *
   * @param string $filename
   *   The file name including path.
   * @param mixed $maximumExecutionTime
   *   Maximum time to parse for before exiting.
   */
  protected function parseAndStore($filename, $maximumExecutionTime) {
    $h = fopen($filename, 'r');
    fseek($h, $this->getBytesProcessed());

    $chunksProcessed = $this->getStateProperty('chunksProcessed', 0);
    while (time() < $maximumExecutionTime) {
      $chunk = fread($h, self::BYTES_PER_CHUNK);

      if (!$chunk) {
        $this->setStatus(Result::DONE);
        $this->parser->finish();
        break;
      }
      $chunk = Encoding::toUTF8($chunk);
      $this->parser->feed($chunk);
      $chunksProcessed++;

      $this->setStateProperty('chunksProcessed', $chunksProcessed);
      $this->store();
    }
    fclose($h);
  }

  /**
   * Drop all import jobs.
   */
  public function drop() {
    $results = $this->dataStorage->retrieveAll();
    foreach ($results as $id => $data) {
      $this->dataStorage->remove($id);
    }
    $this->setStatus(Result::STOPPED);
  }

  /**
   * Store the current instance of ImportJob.
   */
  protected function store() {
    $recordNumber = $this->getStateProperty('recordNumber', 0);
    $records = [];
    foreach ($this->parser->getRecords() as $record) {
      // Skip the first record. It is the header.
      if ($recordNumber != 0) {
        // @todo Identify if we need to pass an id to the storage.
        $records[] = json_encode($record);
      }
      else {
        $this->setStorageSchema($record);
      }
      $recordNumber++;
    }
    if (!empty($records)) {
      $this->dataStorage->storeMultiple($records);
    }
    $this->setStateProperty('recordNumber', $recordNumber);
  }

  /**
   * Set the schema for the datastore storage operation.
   *
   * @param array $header
   *   Array of header strings.
   */
  protected function setStorageSchema(array $header) {
    $schema = [];
    $this->assertUniqueHeaders($header);
    foreach ($header as $field) {
      $schema['fields'][$field] = [
        'type' => "text",
      ];
    }
    $this->dataStorage->setSchema($schema);
  }

  /**
   * Verify headers are unique.
   *
   * @param array $header
   *   List of strings.
   *
   * @throws \Exception
   */
  protected function assertUniqueHeaders(array $header) {
    if (count($header) != count(array_unique($header))) {
      $duplicates = array_keys(array_filter(array_count_values($header), function ($i) {
          return $i > 1;
      }));
      throw new \Exception("Duplicate headers error: " . implode(', ', $duplicates));
    }
  }

  /**
   * Get the parser object.
   *
   * @return \Contracts\ParserInterface
   *   Parser object.
   */
  public function getParser(): ParserInterface {
    return $this->parser;
  }

  /**
   * {@inheritdoc}
   */
  protected function serializeIgnoreProperties(): array {
    $ignore = parent::serializeIgnoreProperties();
    $ignore[] = "dataStorage";
    $ignore[] = "resource";
    return $ignore;
  }

}