src/Extractors/UniqueKeyExtractorAbstract.php
<?php
/*
* This file is part of YaEtl
* (c) Fabrice de Stefanis / https://github.com/fab2s/YaEtl
* This source file is licensed under the MIT license which you will
* find in the LICENSE file or at https://opensource.org/licenses/MIT
*/
namespace fab2s\YaEtl\Extractors;
use fab2s\NodalFlow\NodalFlowException;
use fab2s\YaEtl\YaEtlException;
/**
* Abstract Class UniqueKeyExtractorAbstract
*/
abstract class UniqueKeyExtractorAbstract extends DbExtractorAbstract implements JoinableInterface
{
/**
* Records fetched in join mode, indexed by join key value.
* exec() reads an entry for the incoming record and unsets it once used;
* a `false` entry makes exec() skip the record.
*
* @var array
*/
protected $joinedRecords;
/**
* The composite key representation, built by configureUniqueKey()
* as a map of `cleanedKeyName => aliasAsInRecord`
*
* @var array|string
*/
protected $compositeKey;
/**
* The unique key name.
* Only set by configureUniqueKey() when the key is made of a single
* column, null otherwise
*
* @var string|null
*/
protected $uniqueKeyName;
/**
* The unique key alias, as exposed in records.
* Only set by configureUniqueKey() when the key is made of a single
* column, null otherwise. exec() uses it to read the key value from
* the incoming record
*
* @var string|null
*/
protected $uniqueKeyAlias;
/**
* List of unique key values, used to be joinable.
* In join mode, it is loaded from the joined extractor's record map and
* trimmed down to batchSize entries by enforceBatchSize()
*
* @var array
*/
protected $uniqueKeyValues = [];
/**
* Unique key value buffer, used to align batch sizes.
* Holds the key values left over after enforceBatchSize() kept
* batchSize of them in uniqueKeyValues
*
* @var array
*/
protected $uniqueKeyValueBuffer = [];
/**
* This Node's ON clause object if any.
* Note: the property is spelled "onClose" but holds the OnClauseInterface
* set through setOnClause() and returned by getOnClause()
*
* @var OnClauseInterface|null
*/
protected $onClose;
/**
* List of the ON clauses registered by joiners through
* registerJoinerOnClause(); genRecordMap() builds one map per distinct
* from key alias found in this list
*
* @var array of OnClauseInterface
*/
protected $joinerOnCloses = [];
/**
* The record map, as generated by genRecordMap():
* `[fromKeyAlias => [keyValue => keyValue, ...], ...]`
*
* @var array
*/
protected $recordMap;
/**
* The Joinable we may be joining against, set by setJoinFrom().
* When set, extract() and enforceBatchSize() operate in join mode
*
* @var JoinableInterface
*/
protected $joinFrom;
/**
 * Instantiate an extractor for tables with a unique (composite) key
 *
 * The unique key is configured first, then the `num_join` increment is
 * registered (starting at 0) and the parent constructor is invoked with
 * the extract query.
 *
 * @param string|null  $extractQuery   passed as is to the parent constructor
 * @param array|string $uniqueKeySetup can be either a unique key name as
 *                                     string
 *                                     `'(table.)compositeKeyName' // ('id' by default)`
 *
 *                                     or an array :
 *                                     `['(table.)compositeKey1'] // single unique key`
 *                                     `['(table.)compositeKey1', '(table.)compositeKey2', ] // composite unique key`
 *
 *                                     or an associative array in case you are using aliases :
 *                                     `[
 *                                         '(table.)compositeKey1' => 'aliasNameAsInRecord',
 *                                     ]`
 *
 *                                     and :
 *                                     `[
 *                                         '(table.)compositeKey1' => 'aliasNameAsInRecord1',
 *                                         '(table.)compositeKey2' => 'aliasNameAsInRecord2',
 *                                         // ...
 *                                     ]`
 *
 * @throws NodalFlowException
 */
public function __construct(?string $extractQuery = null, $uniqueKeySetup = 'id')
{
    $this->configureUniqueKey($uniqueKeySetup);

    // register the join counter alongside the inherited increments
    $joinIncrements       = ['num_join' => 0];
    $this->nodeIncrements = \array_replace($this->nodeIncrements, $joinIncrements);

    parent::__construct($extractQuery);
}
/**
 * Retrieve the ON clause attached to this Joiner (join mode only)
 *
 * @return OnClauseInterface|null null when no ON clause was set
 */
public function getOnClause(): ?OnClauseInterface
{
    return $this->onClose;
}
/**
 * Attach the ON clause this Joiner will use (join mode only)
 *
 * @param OnClauseInterface $onClause the clause later used by exec() to
 *                                    merge records and by joinExtract()
 *                                    to pick the from key alias
 *
 * @return $this
 */
public function setOnClause(OnClauseInterface $onClause): JoinableInterface
{
    $this->onClose = $onClause;

    return $this;
}
/**
 * Keep track of the ON clause of a joiner joining against this extractor
 *
 * Registered clauses are read by genRecordMap() to know which from key
 * aliases a record map must be generated for.
 *
 * @param OnClauseInterface $onClause
 *
 * @return $this
 */
public function registerJoinerOnClause(OnClauseInterface $onClause): JoinableInterface
{
    $this->joinerOnCloses[] = $onClause;

    return $this;
}
/**
 * Declare the extractor this one joins against and switch to join mode
 *
 * @param JoinableInterface $joinFrom must be an instance of this very
 *                                    class (or of a descendant)
 *
 * @throws YaEtlException when $joinFrom does not extend this class
 *
 * @return $this
 */
public function setJoinFrom(JoinableInterface $joinFrom): JoinableInterface
{
    // the joined extractor has to extend this very class
    // so that getRecordMap() type is enforced
    if (!($joinFrom instanceof self)) {
        throw new YaEtlException('The extractor joined against is not compatible, expected implementation of: ' . self::class . "\ngot: " . \get_class($joinFrom));
    }

    $this->joinFrom = $joinFrom;
    // a joiner returns a value ...
    $this->isAReturningVal = true;
    // ... and is not a traversable anymore
    $this->isATraversable = false;

    return $this;
}
/**
 * Get record map, used to allow joiners to join
 *
 * When an alias is requested and no map exists for it (map not generated
 * yet, or alias never registered through registerJoinerOnClause()), an
 * empty array is returned instead of reading an undefined offset.
 *
 * @param string|null $fromKeyAlias The from unique key to get the map against
 *                                  as exposed in the record
 *
 * @return array|null with an alias: `[keyValue => keyValue, ...]`;
 *                    without: the whole map indexed by alias, which is
 *                    null as long as no map was generated
 */
public function getRecordMap(?string $fromKeyAlias = null)
{
    if ($fromKeyAlias === null) {
        return $this->recordMap;
    }

    // never hand null to joiners: they feed this into array functions
    return $this->recordMap[$fromKeyAlias] ?? [];
}
/**
 * Trigger extract
 *
 * In join mode this delegates to joinExtract(). Otherwise it fetches the
 * next batch, unless the limit is reached, and generates the record map
 * when records were fetched.
 *
 * @param mixed $param not used by this implementation
 *
 * @throws YaEtlException
 *
 * @return bool true when records were extracted
 */
public function extract($param = null): bool
{
    if (isset($this->joinFrom)) {
        // join mode has its own extraction logic
        return $this->joinExtract();
    }

    if ($this->isLimitReached()) {
        // limit is set and already satisfied
        return false;
    }

    $this->enforceBatchSize();
    if (!$this->fetchRecords()) {
        return false;
    }

    $this->incrementOffset()
        ->genRecordMap();

    return true;
}
/**
 * Enforce batch size consistency
 *
 * Outside of join mode, this only calls the parent implementation.
 * In join mode, the current unique key values are appended to the buffer,
 * the first batchSize of them become the current unique key values and
 * the remainder stays in the buffer. This lets the joined extractor fetch
 * a large amount of records while the "where in" query size stays under
 * control by being split into several chunks.
 *
 * @return static
 */
public function enforceBatchSize(): ExtractorBatchLimitInterface
{
    if (!isset($this->joinFrom)) {
        parent::enforceBatchSize();

        return $this;
    }

    // buffered keys first, then the freshly provided ones
    $pendingKeyValues = \array_replace($this->uniqueKeyValueBuffer, $this->uniqueKeyValues);

    // current batch: at most batchSize keys
    $this->uniqueKeyValues = \array_slice($pendingKeyValues, 0, $this->batchSize, true);
    // whatever is left waits in the buffer
    $this->uniqueKeyValueBuffer = \array_slice($pendingKeyValues, $this->batchSize, null, true);

    return $this;
}
/**
 * Execute the Join
 *
 * Looks up the joined record matching the incoming record's unique key
 * value. When none is available, a join extraction is triggered and the
 * lookup is tried again.
 *
 * @param mixed $record the record to join, read at the unique key alias
 *
 * @throws YaEtlException when no joined record could be obtained for
 *                        the incoming record
 *
 * @return mixed The result of the join, or the untouched record when it
 *               is flagged to be skipped
 */
public function exec($record = null)
{
    $joinKeyValue = $record[$this->uniqueKeyAlias];

    if (!isset($this->joinedRecords[$joinKeyValue])) {
        if ($this->joinExtract()) {
            // a new batch was fetched, try again
            return $this->exec($record);
        }

        // something is wrong as uniqueKeyValueBuffer should
        // never run out until the fromer stops providing records,
        // which means we do not want to reach here
        throw new YaEtlException('Record map mismatch between Joiner ' . \get_class($this) . ' and Fromer ' . \get_class($this->joinFrom));
    }

    // each joined record is consumed once
    $joinedRecord = $this->joinedRecords[$joinKeyValue];
    unset($this->joinedRecords[$joinKeyValue]);

    if ($joinedRecord === false) {
        // no match: skip record
        $this->getCarrier()->continueFlow();

        return $record;
    }

    return $this->onClose-> /* @scrutinizer ignore-call */ merge($record, $joinedRecord);
}
/**
 * Trigger an extract in join mode
 *
 * Loads the key values from the joined extractor's record map, keeps one
 * batch of them and fetches the matching joined records.
 *
 * @throws YaEtlException
 *
 * @return bool false when there is no key value to join on or when
 *              fetchJoinedRecords() returned false
 */
protected function joinExtract(): bool
{
    // join mode, get record map for the alias we join on
    $fromKeyAlias          = $this->onClose-> /* @scrutinizer ignore-call */ getFromKeyAlias();
    $this->uniqueKeyValues = $this->joinFrom->getRecordMap($fromKeyAlias);

    // limit does not apply in join mode
    $this->enforceBatchSize();

    if (empty($this->uniqueKeyValues) || !$this->fetchJoinedRecords()) {
        return false;
    }

    $this->getCarrier()->getFlowMap()->incrementNode($this->getId(), 'num_join');

    // gen record map before we set defaults
    $this->genRecordMap()
        ->setDefaultExtracted();

    return true;
}
/**
 * Configure the unique key
 *
 * Builds the composite key map (`keyName => alias`). The unique key name
 * and alias are only set when the resulting map has exactly one entry.
 *
 * @param array|string $uniqueKeySetup can be either a unique key name as
 *                                     string
 *                                     `'(table.)compositeKeyName' // ('id' by default)`
 *
 *                                     or an array :
 *                                     `['(table.)compositeKey1'] // single unique key`
 *                                     `['(table.)compositeKey1', '(table.)compositeKey2', ] // composite unique key`
 *
 *                                     or an associative array in case you are using aliases :
 *                                     `[
 *                                         '(table.)compositeKey1' => 'aliasNameAsInRecord',
 *                                     ]`
 *
 *                                     and :
 *                                     `[
 *                                         '(table.)compositeKey1' => 'aliasNameAsInRecord1',
 *                                         '(table.)compositeKey2' => 'aliasNameAsInRecord2',
 *                                         // ...
 *                                     ]`
 *
 * @return static
 */
protected function configureUniqueKey($uniqueKeySetup): self
{
    if (!\is_array($uniqueKeySetup)) {
        $uniqueKeySetup = [$uniqueKeySetup];
    }

    $this->uniqueKeyName  = null;
    $this->uniqueKeyAlias = null;
    $this->compositeKey   = [];

    foreach ($uniqueKeySetup as $setupKey => $setupValue) {
        if (\is_numeric($setupKey)) {
            // no alias provided: the value is the key name and the alias
            // is its last dot separated segment
            $keyName     = $this->cleanUpKeyName($setupValue);
            $keySegments = \explode('.', $keyName);
            $keyAlias    = \end($keySegments);
        } else {
            // key name => alias pair
            $keyName  = $this->cleanUpKeyName($setupKey);
            $keyAlias = $this->cleanUpKeyName($setupValue);
        }

        $this->compositeKey[$keyName] = $keyAlias;
    }

    if (\count($this->compositeKey) === 1) {
        // single column key: expose its alias and name directly
        $this->uniqueKeyAlias = \reset($this->compositeKey);
        $this->uniqueKeyName  = \key($this->compositeKey);
    }

    return $this;
}
/**
 * Strip backticks and spaces from both ends of a key name
 *
 * @param string $keyName
 *
 * @return string
 */
protected function cleanUpKeyName($keyName): string
{
    $strippedChars = '` ';

    return \trim($keyName, $strippedChars);
}
/**
 * Prepare the joined record set to obey the join mode
 *
 * Every current unique key value gets a default entry: the ON clause
 * default record in left join mode (to be merged later), `false`
 * otherwise (which makes exec() skip the record when no match is found).
 * Records actually fetched override these defaults.
 *
 * @return static
 */
protected function setDefaultExtracted(): self
{
    $defaultRecord = false;
    if ($this->onClose-> /* @scrutinizer ignore-call */ isLeftJoin()) {
        $defaultRecord = $this->onClose-> /* @scrutinizer ignore-call */ getDefaultRecord();
    }

    $defaultsByKey = \array_fill_keys($this->uniqueKeyValues, $defaultRecord);

    // fetched records take precedence over defaults
    $this->joinedRecords = \array_replace($defaultsByKey, /* @scrutinizer ignore-type */ $this->joinedRecords);

    return $this;
}
/**
 * Generate record map
 *
 * Rebuilds the map from scratch: one `[keyValue => keyValue]` list per
 * distinct from key alias found among the registered joiner ON clauses,
 * filled from the currently extracted records.
 *
 * @throws YaEtlException when an extracted record has no value set for
 *                        a from key alias
 *
 * @return static
 */
protected function genRecordMap(): self
{
    // the record map must be ready for all joiners
    $this->recordMap = [];

    foreach ($this->joinerOnCloses as $joinerOnClause) {
        $fromKeyAlias = $joinerOnClause->getFromKeyAlias();
        if (isset($this->recordMap[$fromKeyAlias])) {
            // another joiner already uses this key, map is done
            continue;
        }

        $this->recordMap[$fromKeyAlias] = [];

        // defaults are not mapped here as we do not want
        // joiners to this to join on null
        foreach ($this->getExtracted() as $extractedRecord) {
            if (!isset($extractedRecord[$fromKeyAlias])) {
                // key alias existence is not enforced during init,
                // so it has to be checked here
                throw new YaEtlException("From Key Alias not found in record: $fromKeyAlias");
            }

            $keyValue                                  = $extractedRecord[$fromKeyAlias];
            $this->recordMap[$fromKeyAlias][$keyValue] = $keyValue;
        }
    }

    return $this;
}
/**
* Fetch records when joining against another extractor
*
* Fetched records should still be sent to setExtracted to be made
* available in map generation for eventual joiners to
* this joiner, and should also fill up joinedRecords as an
* associative array indexed by the proper join key
*
* @return bool joinExtract() only carries on with the join when
*              true is returned
*/
abstract protected function fetchJoinedRecords(): bool;
}