wikimedia/mediawiki-extensions-Wikibase

View on GitHub
lib/includes/Store/Sql/Terms/DatabaseTermInLangIdsAcquirer.php

Summary

Maintainability
C
7 hrs
Test Coverage
<?php

namespace Wikibase\Lib\Store\Sql\Terms;

use MediaWiki\MediaWikiServices;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use Wikibase\Lib\Rdbms\RepoDomainDb;
use Wikibase\Lib\Store\Sql\Terms\Util\ReplicaPrimaryAwareRecordIdsAcquirer;

/**
 * A {@link TermInLangIdsAcquirer} implementation using the database tables
 * wbt_term_in_lang, wbt_text_in_lang, and wbt_text.
 *
 * Because the wbt_text.wbx_text column can only hold up to 255 bytes,
 * terms longer than that (typically non-Latin descriptions)
 * will be truncated, and different terms that only differ after the first
 * 255 bytes will get the same term in lang ID (and thus same other ids too).
 *
 * @see @ref docs_storage_terms
 * @license GPL-2.0-or-later
 */
class DatabaseTermInLangIdsAcquirer implements TermInLangIdsAcquirer {

    /**
     * @var RepoDomainDb
     */
    private $repoDb;

    /**
     * @var TypeIdsAcquirer
     */
    private $typeIdsAcquirer;

    /** @var LoggerInterface */
    private $logger;

    public function __construct(
        RepoDomainDb $repoDb,
        TypeIdsAcquirer $typeIdsAcquirer,
        LoggerInterface $logger = null
    ) {
        $this->repoDb = $repoDb;
        $this->typeIdsAcquirer = $typeIdsAcquirer;
        $this->logger = $logger ?? new NullLogger();
    }

    public function acquireTermInLangIds( array $termsArray, $callback = null ): array {
        if ( $termsArray === [] ) {
            if ( $callback !== null ) {
                ( $callback )( [] );
            }
            return [];
        }

        $termInLangIds = $this->mapTermsArrayToTermIds( $termsArray );

        if ( $callback !== null ) {
            ( $callback )( $termInLangIds );
        }

        $this->restoreCleanedUpIds( $termsArray, $termInLangIds );

        return $termInLangIds;
    }

    /**
     * replace root keys containing type names in termsArray
     * with their respective ids in wbt_type table
     *
     * @param array $termsArray terms per type per language:
     *    [
     *        'type1' => [ ... ],
     *        'type2' => [ ... ],
     *        ...
     *    ]
     *
     * @return array
     *    [
     *        <typeId1> => [ ... ],
     *        <typeId2> => [ ... ],
     *        ...
     *    ]
     */
    private function mapToTypeIds( array $termsArray ) {
        $typeIds = $this->typeIdsAcquirer->acquireTypeIds( array_keys( $termsArray ) );

        $termsArrayByTypeId = [];
        foreach ( $typeIds as $type => $typeId ) {
            $termsArrayByTypeId[$typeId] = $termsArray[$type];
        }

        return $termsArrayByTypeId;
    }

    /**
     * replace text at termsArray leaves with their ids in wbt_text table
     * and return resulting array
     *
     * @param array $termsArray terms per type per language:
     *    [
     *        'type' => [
     *            [ 'language' => 'term' | [ 'term1', 'term2', ... ] ], ...
     *        ], ...
     *    ]
     * @param ReplicaPrimaryAwareRecordIdsAcquirer $textIdsAcquirer
     *
     * @return array
     *    [
     *        'type' => [
     *            [ 'language' => [ <textId1>, <textId2>, ... ] ], ...
     *        ], ...
     *    ]
     */
    private function mapToTextIds(
        array $termsArray,
        ReplicaPrimaryAwareRecordIdsAcquirer $textIdsAcquirer
    ) {
        $texts = [];

        array_walk_recursive( $termsArray, function ( $text ) use ( &$texts ) {
            $texts[] = $text;
        } );

        $textIds = $this->acquireTextIds( $texts, $textIdsAcquirer );

        array_walk_recursive( $termsArray, function ( &$text ) use ( $textIds ) {
            $text = $textIds[$text];
        } );

        return $termsArray;
    }

    /**
     * Since the wbx_text column can hold at most 255 bytes, we truncate the
     * the texts to that length before sending them to the acquirer.
     * Additional mappings ensure that we can still return a map from full,
     * untruncated texts to text IDs (though multiple texts may share the same
     * ID if they only differ after more than 255 bytes).
     *
     * @param string[] $texts
     * @param ReplicaPrimaryAwareRecordIdsAcquirer $textIdsAcquirer
     * @return string[]
     */
    private function acquireTextIds(
        array $texts,
        ReplicaPrimaryAwareRecordIdsAcquirer $textIdsAcquirer
    ) {
        $truncatedTexts = [];
        $contLang = MediaWikiServices::getInstance()->getContentLanguage();
        foreach ( $texts as $text ) {
            $truncatedText = $contLang->truncateForDatabase( $text, 255, '' );
            $truncatedTexts[$text] = $truncatedText;
        }

        $truncatedTextRecords = [];
        foreach ( $truncatedTexts as $truncatedText ) {
            $truncatedTextRecords[] = [ 'wbx_text' => $truncatedText ];
        }
        $truncatedTextRecords = $this->filterUniqueRecords( $truncatedTextRecords );

        $truncatedTextRecordsWithIds = $textIdsAcquirer->acquireIds( $truncatedTextRecords );
        $truncatedTextIds = [];
        foreach ( $truncatedTextRecordsWithIds as $truncatedTextRecordWithId ) {
            $truncatedText = $truncatedTextRecordWithId['wbx_text'];
            $truncatedTextId = $truncatedTextRecordWithId['wbx_id'];
            $truncatedTextIds[$truncatedText] = $truncatedTextId;
        }

        $textIds = [];
        foreach ( $truncatedTexts as $text => $truncatedText ) {
            $textIds[$text] = $truncatedTextIds[$truncatedText];
        }

        return $textIds;
    }

    /**
     * replace ( lang => [ textId, ... ] ) entries with their respective ids
     * in wbt_text_in_lang table and return resulting array
     *
     * @param array $termsArray text ids per type per language
     *    [
     *        'type' => [
     *            [ 'language' => [ <textId1>, <textId2>, ... ] ], ...
     *        ], ...
     *    ]
     * @param ReplicaPrimaryAwareRecordIdsAcquirer $textInLangIdsAcquirer
     *
     * @return array
     *    [
     *        'type' => [ <textInLangId1>, <textInLangId2>, ... ],
     *        ...
     *    ]
     */
    private function mapToTextInLangIds(
        array $termsArray,
        ReplicaPrimaryAwareRecordIdsAcquirer $textInLangIdsAcquirer
    ) {
        $flattenedLangTextIds = [];
        foreach ( $termsArray as $langTextIds ) {
            foreach ( $langTextIds as $lang => $textIds ) {
                if ( !isset( $flattenedLangTextIds[$lang] ) ) {
                    $flattenedLangTextIds[$lang] = [];
                }

                $flattenedLangTextIds[$lang] = array_unique(
                    array_merge(
                        (array)$textIds,
                        (array)$flattenedLangTextIds[$lang]
                    )
                );

            }
        }

        $textInLangIds = $this->acquireTextInLangIds(
            $flattenedLangTextIds,
            $textInLangIdsAcquirer
        );

        $newTermsArray = [];
        foreach ( $termsArray as $type => $langTextIds ) {
            $newTermsArray[$type] = [];
            foreach ( $langTextIds as $lang => $textIds ) {
                foreach ( (array)$textIds as $textId ) {
                    $newTermsArray[$type][] = $textInLangIds[$lang][$textId];
                }
            }
        }

        return $newTermsArray;
    }

    private function acquireTextInLangIds(
        array $langTextIds,
        ReplicaPrimaryAwareRecordIdsAcquirer $textInLangIdsAcquirer
    ) {
        $textInLangRecords = [];
        foreach ( $langTextIds as $lang => $textIds ) {
            foreach ( $textIds as $textId ) {
                $textInLangRecords[] = [ 'wbxl_text_id' => $textId, 'wbxl_language' => $lang ];
            }
        }
        $textInLangRecords = $this->filterUniqueRecords( $textInLangRecords );

        $acquiredIds = $textInLangIdsAcquirer->acquireIds( $textInLangRecords );

        $textInLangIds = [];
        foreach ( $acquiredIds as $acquiredId ) {
            $textInLangIds[$acquiredId['wbxl_language']][$acquiredId['wbxl_text_id']]
                = $acquiredId['wbxl_id'];
        }

        return $textInLangIds;
    }

    /**
     * replace root ( type => [ textInLangId, ... ] ) entries with their respective ids
     * in wbt_term_in_lang table and return resulting array
     *
     * @param array $termsArray text in lang ids per type
     *    [
     *        'type' => [ <textInLangId1>, <textInLangId2>, ... ],
     *        ...
     *    ]
     * @param ReplicaPrimaryAwareRecordIdsAcquirer $termInLangIdsAcquirer
     * @param array $idsToRestore
     *
     * @return array
     *    [
     *        <termInLang1>,
     *        <termInLang2>,
     *        ...
     *    ]
     */
    private function mapToTermInLangIds(
        array $termsArray,
        ReplicaPrimaryAwareRecordIdsAcquirer $termInLangIdsAcquirer,
        array $idsToRestore = []
    ) {
        $flattenedTypeTextInLangIds = [];
        foreach ( $termsArray as $typeId => $textInLangIds ) {
            if ( !isset( $flattenedTypeTextInLangIds[$typeId] ) ) {
                $flattenedTypeTextInLangIds[$typeId] = [];
            }

            $flattenedTypeTextInLangIds[$typeId] = array_unique(
                array_merge(
                    (array)$textInLangIds,
                    (array)$flattenedTypeTextInLangIds[$typeId]
                )
            );
        }

        $termInLangIds = $this->acquireTermInLangIdsInner(
            $flattenedTypeTextInLangIds,
            $termInLangIdsAcquirer,
            $idsToRestore
        );

        $newTermsArray = [];
        foreach ( $termsArray as $typeId => $textInLangIds ) {
            foreach ( $textInLangIds as $textInLangId ) {
                $newTermsArray[] = $termInLangIds[$typeId][$textInLangId];
            }
        }

        return $newTermsArray;
    }

    private function acquireTermInLangIdsInner(
        array $typeTextInLangIds,
        ReplicaPrimaryAwareRecordIdsAcquirer $termInLangIdsAcquirer,
        array $idsToRestore = []
    ) {
        $termInLangRecords = [];
        foreach ( $typeTextInLangIds as $typeId => $textInLangIds ) {
            foreach ( $textInLangIds as $textInLangId ) {
                $termInLangRecords[] = [
                    'wbtl_text_in_lang_id' => $textInLangId,
                    'wbtl_type_id' => (string)$typeId,
                ];
            }
        }
        $termInLangRecords = $this->filterUniqueRecords( $termInLangRecords );
        $fname = __METHOD__;

        $acquiredIds = $termInLangIdsAcquirer->acquireIds(
            $termInLangRecords,
            function ( $recordsToInsert ) use ( $idsToRestore, $fname ) {
                if ( count( $idsToRestore ) <= 0 ) {
                    return $recordsToInsert;
                }

                // @todo Remove temporary monitoring for T233414 soon
                MediaWikiServices::getInstance()->getStatsdDataFactory()->increment(
                    'wikibase.repo.term_store.tmp.ids_needing_to_reinsert'
                );

                if ( count( $idsToRestore ) !== count( $recordsToInsert ) ) {
                    // This means the ids are not the same, this can happen due to duplicate entries
                    // or a record that exist in another language and as the result doesn't get to be in $recordsToInsert
                    // @todo Make it handle such cases properly
                    $this->logger->info(
                        $fname . ': Restoring record term in lang ids failed',
                        [
                            'idsToRestore' => $idsToRestore,
                            'recordsToInsert' => $recordsToInsert,
                        ]

                    );
                    return $recordsToInsert;
                }

                return array_map(
                    function ( $record, $idToRestore ) {
                        $record['wbtl_id'] = $idToRestore;
                        return $record;
                    },
                    $recordsToInsert,
                    $idsToRestore
                );
            } );

        $termInLangIds = [];
        foreach ( $acquiredIds as $acquiredId ) {
            $termInLangIds[$acquiredId['wbtl_type_id']][$acquiredId['wbtl_text_in_lang_id']]
                = $acquiredId['wbtl_id'];
        }

        return $termInLangIds;
    }

    private function restoreCleanedUpIds( array $termsArray, array $termInLangIds = [] ) {
        $uniqueTermIds = array_values( array_unique( $termInLangIds ) );

        $dbMaster = $this->repoDb->connections()->getWriteConnection();
        $persistedTermIds = $dbMaster->newSelectQueryBuilder()
            ->select( 'wbtl_id' )
            ->from( 'wbt_term_in_lang' )
            ->where( [ 'wbtl_id' => $termInLangIds ] )
            ->caller( __METHOD__ )->fetchFieldValues();

        sort( $uniqueTermIds );
        sort( $persistedTermIds );
        $idsToRestore = array_diff( $uniqueTermIds, $persistedTermIds );

        if ( $idsToRestore ) {
            $this->mapTermsArrayToTermIds( $termsArray, $idsToRestore, true );
        }
    }

    private function mapTermsArrayToTermIds(
        array $termsArray,
        array $termInLangIdsToRestore = [],
        $ignoreReplica = false
    ): array {
        $textIdsAcquirer = new ReplicaPrimaryAwareRecordIdsAcquirer(
            $this->repoDb, 'wbt_text', 'wbx_id',
            $ignoreReplica ? ReplicaPrimaryAwareRecordIdsAcquirer::FLAG_IGNORE_REPLICA : 0x0 );
        $textInLangIdsAcquirer = new ReplicaPrimaryAwareRecordIdsAcquirer(
            $this->repoDb, 'wbt_text_in_lang', 'wbxl_id',
            $ignoreReplica ? ReplicaPrimaryAwareRecordIdsAcquirer::FLAG_IGNORE_REPLICA : 0x0 );
        $termInLangIdsAcquirer = new ReplicaPrimaryAwareRecordIdsAcquirer(
            $this->repoDb, 'wbt_term_in_lang', 'wbtl_id',
            $ignoreReplica ? ReplicaPrimaryAwareRecordIdsAcquirer::FLAG_IGNORE_REPLICA : 0x0 );

        $termsArray = $this->mapToTextIds( $termsArray, $textIdsAcquirer );
        $termsArray = $this->mapToTextInLangIds( $termsArray, $textInLangIdsAcquirer );
        $termsArray = $this->mapToTypeIds( $termsArray );

        return $this->mapToTermInLangIds( $termsArray, $termInLangIdsAcquirer, $termInLangIdsToRestore );
    }

    private function calcRecordHash( array $record ) {
        ksort( $record );
        return md5( serialize( $record ) );
    }

    private function filterUniqueRecords( array $records ) {
        $uniqueRecords = [];
        foreach ( $records as $record ) {
            $recordHash = $this->calcRecordHash( $record );
            $uniqueRecords[$recordHash] = $record;
        }

        return array_values( $uniqueRecords );
    }

}