
View on GitHub


0 mins
Test Coverage

namespace Drupal\harvest;

use Contracts\FactoryInterface;
use Drupal\Core\DependencyInjection\ContainerInjectionInterface;
use Drupal\harvest\Entity\HarvestPlanRepository;
use Drupal\harvest\Entity\HarvestRunRepository;
use Drupal\harvest\Storage\HarvestHashesDatabaseTableFactory;
use Drupal\metastore\MetastoreService;
use Harvest\ETL\Factory;
use Harvest\Harvester;
use Psr\Log\LoggerInterface;
use Symfony\Component\DependencyInjection\ContainerInterface;

 * Main DKAN Harvester service.
 * Import groups of datasets from an external source, and manage existing
 * harvest plans and their dependent datasets.
class HarvestService implements ContainerInjectionInterface {

  use OrphanDatasetsProcessor;

   * Service to instantiate storage objects for Harvest plan storage.
   * @var \Contracts\FactoryInterface
   * @see \Drupal\harvest\Storage\DatabaseTableFactory
  private $storeFactory;

   * Harvest hash database table factory service.
   * @var \Contracts\FactoryInterface
  private HarvestHashesDatabaseTableFactory $hashesStoreFactory;

   * DKAN metastore service.
   * @var \Drupal\metastore\MetastoreService
  private MetastoreService $metastore;

   * Harvest plan storage repository service.
   * @var \Drupal\harvest\Entity\HarvestPlanRepository
  private HarvestPlanRepository $harvestPlanRepository;

   * Harvest run entity repository service.
   * @var \Drupal\harvest\Entity\HarvestRunRepository
  private HarvestRunRepository $runRepository;

   * DKAN logger channel.
   * @var \Psr\Log\LoggerInterface
  private LoggerInterface $logger;

   * Create.
   * @inheritdoc
  public static function create(ContainerInterface $container) {
    return new self(

   * Constructor.
  public function __construct(
    FactoryInterface $storeFactory,
    HarvestHashesDatabaseTableFactory $hashesStoreFactory,
    MetastoreService $metastore,
    HarvestPlanRepository $harvestPlansRepository,
    HarvestRunRepository $runRepository,
    LoggerInterface $loggerChannel
  ) {
    $this->storeFactory = $storeFactory;
    $this->hashesStoreFactory = $hashesStoreFactory;
    $this->metastore = $metastore;
    $this->harvestPlanRepository = $harvestPlansRepository;
    $this->runRepository = $runRepository;
    $this->logger = $loggerChannel;

   * Get harvest plan identifiers.
   * @param bool $has_run_record
   *   If true, return only harvest IDs that have been run at least once.
   * @return string[]
   *   Array of Harvest Plan IDs.
  public function getAllHarvestIds(bool $has_run_record = FALSE): array {
    // Some calling code is very particular about the output being an array,
    // both as a return value here and after json_encode(). Since the entity
    // query returns a keyed array, json_encode() will think it's an object. We
    // don't want that, so we use array_values().
    return array_values(
      $has_run_record ?
        $this->runRepository->getUniqueHarvestPlanIds() :

   * Return a harvest plan.
   * @param string $plan_id
   *   The harvest plan id.
   * @return string|null
   *   The harvest plan, if any, or NULL.
   * @throws \Exception
  public function getHarvestPlan($plan_id) {
    return $this->harvestPlanRepository->getPlanJson($plan_id);

   * Return a harvest plan object.
   * @param string $plan_id
   *   The harvest plan id.
   * @return object|null
   *   The harvest plan, if any, or NULL.
   * @throws \Exception
  public function getHarvestPlanObject($plan_id): ?object {
    return $this->harvestPlanRepository->getPlan($plan_id);

   * Register a new harvest plan.
   * @param object $plan
   *   The plan object. Must contain an 'identifier' propoerty. See
   *   components.schemas.harvestPlan within
   *   modules/harvest/docs/openapi_spec.json for the schema of a plan.
   * @return string
   *   Identifier.
   * @throws \Exception
   *   Exceptions may be thrown if validation fails.
  public function registerHarvest($plan) {
    return $this->harvestPlanRepository->storePlan($plan, $plan->identifier);

   * Deregister harvest.
   * @param string $plan_id
   *   Plan identifier.
   * @return bool
   *   Whether this happened successfully.
  public function deregisterHarvest(string $plan_id) {
    if (in_array($plan_id, $this->harvestPlanRepository->getAllHarvestPlanIds())) {
      // Remove all the support tables for this plan id.
      $this->storeFactory->getInstance('harvest_' . $plan_id . '_items')->destruct();
      // Remove the plan id from the harvest_plans table.
      return $this->harvestPlanRepository->remove($plan_id);
    return FALSE;

   * Public.
  public function revertHarvest($id) {
    $harvester = $this->getHarvester($id);
    return $harvester->revert();

   * Public.
  public function runHarvest($plan_id) {
    $harvester = $this->getHarvester($plan_id);

    $run_id = (string) time();
    $result = $harvester->harvest();

    if (empty($result['status']['extracted_items_ids'])) {
      throw new \Exception('No items found to extract, review your harvest plan.');
    $result['status']['orphan_ids'] =
      $this->getOrphanIdsFromResult($plan_id, $result['status']['extracted_items_ids']);

    $result['identifier'] = $run_id;
    $this->runRepository->storeRun($result, $plan_id, $run_id);

    return $result;

   * Get Harvest Run Info.
   * @param string $plan_id
   *   The harvest plan ID.
   * @param string $run_id
   *   The harvest run ID.
   * @return bool|string
   *   JSON-encoded run information for the given run, or FALSE if no matching
   *   runID is found.
  public function getHarvestRunInfo(string $plan_id, string $run_id): bool|string {
    if ($info = $this->runRepository->retrieveRunJson($plan_id, $run_id)) {
      return $info;
    return FALSE;

   * Get the results of a harvest run.
   * @param string $plan_id
   *   Harvest plan ID.
   * @param string $run_id
   *   Harvest run ID.
   * @return array
   *   Array of status info from the run.
  public function getHarvestRunResult(string $plan_id, string $run_id): array {
    if ($entity = $this->runRepository->loadEntity($plan_id, $run_id)) {
      return $entity->toResult();
    else {
      return [];

   * Retrieve all run results for a given plan.
   * @param string $plan_id
   *   The harvest plan identifier.
   * @return array
   *   JSON-encoded result arrays, keyed by harvest run identifier.
   * @deprecated Gather run IDs from getRunIdsForHarvest() and access specific
   *   information based on those IDs.
   * @see self::getRunIdsForHarvest()
   * @see self::getHarvestRunInfo()
  public function getAllHarvestRunInfo(string $plan_id): array {
    return $this->runRepository->retrieveAllRunsJson($plan_id);

   * Retrieve harvest run IDs for a given harvest plan.
   * @param string $plan_id
   *   The harvest plan identifier.
   * @return array
   *   Harvest run identifiers, keyed by identifier.
  public function getRunIdsForHarvest(string $plan_id): array {
    return $this->runRepository->retrieveAllRunIds($plan_id);

   * Get a harvest's most recent run identifier.
   * Since the run record id is a timestamp, we can sort on the id.
   * @param string $plan_id
   *   The harvest identifier.
   * @return string
   *   The most recent harvest run record identifier.
  public function getLastHarvestRunId(string $plan_id): string {
    $run_ids = $this->runRepository->retrieveAllRunIds($plan_id);
    return reset($run_ids);

   * Publish a harvest.
   * @param string $harvestId
   *   Harvest identifier.
   * @return array
   *   The uuids of the datasets to publish.
  public function publish(string $harvestId): array {
    return $this->bulkUpdateStatus($harvestId, 'publish');

   * Archive a harvest.
   * @param string $harvestId
   *   Harvest identifier.
   * @return array
   *   The uuids of the published datasets.
  public function archive(string $harvestId): array {
    return $this->bulkUpdateStatus($harvestId, 'archive');

   * Archive a harvest.
   * @param string $harvestId
   *   Harvest identifier.
   * @param string $method
   *   Metastore update status method - "archive" or "publish" available.
   * @return array
   *   The uuids of the published datasets.
  protected function bulkUpdateStatus(string $harvestId, string $method): array {
    if (!in_array($method, ['archive', 'publish'])) {
      throw new \OutOfRangeException("Method {$method} does not exist");

    $lastRunId = $this->getLastHarvestRunId($harvestId);
    $lastRunInfo = json_decode($this->getHarvestRunInfo($harvestId, $lastRunId));
    $status = $lastRunInfo->status ?? NULL;
    if (!isset($status->extracted_items_ids)) {
      return [];

    $updated = [];
    foreach ($status->extracted_items_ids as $datasetId) {
      // $this->publishHarvestedDataset() will return true if $datasetId
      // could be successfully published.
      $updated[] = $this->setDatasetStatus($status, $datasetId, $method) ? $datasetId : NULL;

    return array_values(array_filter($updated));

   * Use metastore service to publish a harvested item.
   * @param object $runInfoStatus
   *   Status object with run information.
   * @param string $datasetId
   *   ID to DKAN dataset.
   * @param string $method
   *   Metastore update status method - "archive" or "publish" available.
   * @return bool
   *   Whether status change action was successful.
  protected function setDatasetStatus($runInfoStatus, string $datasetId, string $method): bool {
    try {
      return isset($runInfoStatus->load) &&
        $runInfoStatus->load->{$datasetId} &&
        $runInfoStatus->load->{$datasetId} != 'FAILURE' &&
        $this->metastore->$method('dataset', $datasetId);
    catch (\Exception $e) {
      $this->logger->error("Error applying method {$method} to dataset {$datasetId}: {$e->getMessage()}");
      return FALSE;

   * Proxy to Etl Factory to validate harvest plan.
   * @param object $plan
   *   Plan.
   * @return bool
   *   TRUE if harvest plan validates. Throws exception otherwise.
  public function validateHarvestPlan($plan): bool {
    return Factory::validateHarvestPlan($plan);

   * Get a DKAN harvester instance.
   * @param string $plan_id
   *   Harvester ID.
   * @return \Harvest\Harvester
   *   Harvester object.
  private function getHarvester(string $plan_id): Harvester {
    return $this->getDkanHarvesterInstance(
      $this->storeFactory->getInstance('harvest_' . $plan_id . '_items'),

   * Get the harvester from the harvester library.
  protected function getDkanHarvesterInstance($harvestPlan, $item_store, $hash_store): Harvester {
    return new Harvester(new Factory($harvestPlan, $item_store, $hash_store));
