sgammon/GUST

View on GitHub
java/gust/backend/model/PersistenceDriver.java

Summary

Maintainability
A
0 mins
Test Coverage
/*
 * Copyright © 2020, The Gust Framework Authors. All rights reserved.
 *
 * The Gust/Elide framework and tools, and all associated source or object computer code, except where otherwise noted,
 * are licensed under the Zero Prosperity license, which is enclosed in this repository, in the file LICENSE.txt. Use of
 * this code in object or source form requires and implies consent and agreement to that license in principle and
 * practice. Source or object code not listing this header, or unless specified otherwise, remain the property of
 * Elide LLC and its suppliers, if any. The intellectual and technical concepts contained herein are proprietary to
 * Elide LLC and its suppliers and may be covered by U.S. and Foreign Patents, or patents in process, and are protected
 * by trade secret and copyright law. Dissemination of this information, or reproduction of this material, in any form,
 * is strictly forbidden except in adherence with assigned license requirements.
 */
package gust.backend.model;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import com.google.protobuf.Descriptors.FieldDescriptor;
import com.google.protobuf.FieldMask;
import com.google.protobuf.Message;
import gust.backend.runtime.Logging;
import gust.backend.runtime.ReactiveFuture;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import tools.elide.core.DatapointType;
import tools.elide.core.FieldType;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.OverridingMethodsMustInvokeSuper;
import javax.annotation.concurrent.Immutable;
import javax.annotation.concurrent.ThreadSafe;
import java.util.*;
import java.util.concurrent.*;

import static java.lang.String.format;
import static gust.backend.model.ModelMetadata.*;


/**
 * Describes the surface of a generic persistence driver, which is capable of accepting arbitrary structured and typed
 * business data (also called "data models"), and managing them with regard to persistent storage, which includes
 * storing them when asked, and recalling them when subsequently asked to do so.
 *
 * <p>Persistence driver implementations do not always guarantee <i>durability</i> of data. For example,
 * {@link CacheDriver} implementations are also {@link PersistenceDriver}s, and that entire class of implementations
 * does not guarantee data will be there when you ask for it <i>at all</i> (relying on cache state is generally
 * considered to be a very bad practice).</p>
 *
 * <p>Other implementation trees exist (notably, {@link DatabaseDriver}) which go the other way, and are expected to
 * guarantee durability of data across restarts, distributed systems and networks, and failure cases, as applicable.
 * Database driver implementations also support richer data storage features like querying and indexing.</p>
 *
 * @see CacheDriver <pre>`CacheDriver`</pre> for persistence drivers with volatile durability guarantees
 * @see DatabaseDriver <pre>`DatabaseDriver`</pre> for drivers with rich features and/or strong durability guarantees.
 * @param <Key> Key record type (must be annotated with model role {@code OBJECT_KEY}).
 * @param <Model> Message/model type which this persistence driver is specialized for.
 * @param <ReadIntermediate> Intermediate record format used by the underlying driver implementation during model
 *                           de-serialization.
 * @param <WriteIntermediate> Intermediate record format used by the underlying driver implementation during model
 *                           serialization.
 */
@Immutable
@ThreadSafe
@SuppressWarnings({"unused", "UnstableApiUsage"})
public interface PersistenceDriver<Key extends Message, Model extends Message, ReadIntermediate, WriteIntermediate> {
  /** Default timeout to apply when otherwise unspecified. */
  long DEFAULT_TIMEOUT = 30;

  /** Time units for {@link #DEFAULT_TIMEOUT}. */
  TimeUnit DEFAULT_TIMEOUT_UNIT = TimeUnit.SECONDS;

  /** Default timeout to apply when fetching from the cache. */
  long DEFAULT_CACHE_TIMEOUT = 5;

  /** Time units for {@link #DEFAULT_CACHE_TIMEOUT}. */
  TimeUnit DEFAULT_CACHE_TIMEOUT_UNIT = TimeUnit.SECONDS;

  /** Default model adapter internals. */
  @SuppressWarnings("SameParameterValue")
  final class Internals {
    /** Log pipe for default model adapter. */
    static final Logger logging = Logging.logger(PersistenceDriver.class);

    private Internals() { /* Disallow instantiation. */ }

    /** Runnable that might throw async exceptions. */
    @FunctionalInterface
    interface DriverRunnable {
      /**
       * Run some operation that may throw async-style exceptions.
       *
       * @throws TimeoutException The operation timed out.
       * @throws InterruptedException The operation was interrupted during execution.
       * @throws ExecutionException An execution error halted async execution.
       */
      void run() throws TimeoutException, InterruptedException, ExecutionException;
    }

    /**
     * Swallow any exceptions that occur
     *
     * @param operation Operation to run and wrap.
     */
    static void swallowExceptions(@Nonnull DriverRunnable operation) {
      try {
        operation.run();

      } catch (Exception exc) {
        Throwable inner = exc.getCause() != null ? exc.getCause() : exc;
        logging.warn(format(
          "Encountered unidentified exception '%s'. Message: '%s'.",
          exc.getClass().getSimpleName(), exc.getMessage()));

      }
    }

    /**
     * Convert async exceptions into persistence layer exceptions, according to the failure that occurred. Also print a
     * descriptive log statement.
     *
     * @param operation Operation to execute and wrap with protection.
     * @param <R> Return type for the callable operation, if applicable.
     * @return Return value of the async operation.
     */
    @CanIgnoreReturnValue
    static <R> R convertAsyncExceptions(@Nonnull Callable<R> operation) {
      try {
        return operation.call();
      } catch (InterruptedException ixe) {
        logging.warn(format("Interrupted. Message: '%s'.",
          ixe.getMessage()));
        throw PersistenceOperationFailed.forErr(PersistenceFailure.INTERRUPTED);

      } catch (ExecutionException exe) {
        Throwable inner = exe.getCause() != null ? exe.getCause() : exe;
        logging.warn(format("Encountered async exception '%s'. Message: '%s'.",
          inner.getClass().getSimpleName(), inner.getMessage()));
        throw PersistenceOperationFailed.forErr(PersistenceFailure.INTERNAL);

      } catch (TimeoutException txe) {
        throw PersistenceOperationFailed.forErr(PersistenceFailure.TIMEOUT);
      } catch (Exception exc) {
        logging.warn(format(
          "Encountered unidentified exception '%s'. Message: '%s'.",
          exc.getClass().getSimpleName(), exc.getMessage()));
        throw PersistenceOperationFailed.forErr(PersistenceFailure.INTERNAL,
          exc.getCause() != null ? exc.getCause() : exc);

      }
    }

    /**
     * Enforce that a particular model operation have the provided value present, and equal to the expected value. If
     * these expectations are violated, an exception is thrown.
     *
     * @param value Value in the option set for this method.
     * @param expected Expected value from the option set.
     * @param expectation Message to throw if the expectation is violated.
     * @param <R> Return value type - same as {@code value} and {@code expected}.
     * @return Expected value if it is equal to {@code value}.
     */
    @CanIgnoreReturnValue
    static <R> R enforceOption(@Nullable R value, @Nonnull R expected, @Nonnull String expectation) {
      if (value != null && value.equals(expected)) {
        return value;
      }
      throw new IllegalArgumentException("Operation failed: " + expectation);
    }
  }

  // -- API: Execution -- //
  /**
   * Resolve an executor service for use with this persistence driver. Operations will be executed against this as they
   * are received.
   *
   * @return Scheduled executor service.
   */
  @Nonnull ListeningScheduledExecutorService executorService();

  // -- API: Codec -- //
  /**
   * Acquire an instance of the codec used by this adapter. Codecs are either injected/otherwise provided during adapter
   * construction, or they are specified statically if the adapter depends on a specific codec.
   *
   * @return Model codec currently in use by this adapter.
   */
  @Nonnull ModelCodec<Model, WriteIntermediate, ReadIntermediate> codec();

  // -- API: Key Generation -- //
  /**
   * Generate a semi-random opaque token, usable as an ID for a newly-created entity via the model layer. In this case,
   * the ID is returned directly, so it may be used to populate a key.
   *
   * @param instance Model instance to generate an ID for.
   * @return Generated opaque string ID.
   */
  default @Nonnull String generateId(@Nonnull Message instance) {
    return UUID.randomUUID().toString();
  }

  /**
   * Generate a key for a new entity, which must be stored by this driver, but does not yet have a key. If the driver
   * does not support key generation, {@link UnsupportedOperationException} is thrown.
   *
   * <p>Generated keys are expected to be best-effort unique. Generally, Java's built-in {@link java.util.UUID} should
   * do the trick just fine. In more complex or scalable circumstances, this method can be overridden to reach out to
   * the data engine to generate a key.</p>
   *
   * @param instance Default instance of the model type for which a key is desired.
   * @return Generated key for an entity to be stored.
   */
  default @Nonnull Key generateKey(@Nonnull Message instance) {
    // enforce role, key field presence
    var descriptor = instance.getDescriptorForType();
    enforceRole(descriptor, DatapointType.OBJECT);
    var keyType = keyField(descriptor);
    if (keyType.isEmpty()) throw new MissingAnnotatedField(descriptor, FieldType.KEY);

    // convert to builder, grab field builder for key (keys must be top-level fields)
    var builder = instance.newBuilderForType();
    var keyBuilder = builder.getFieldBuilder(keyType.get().getField());
    spliceIdBuilder(keyBuilder, Optional.of(generateId(instance)));

    //noinspection unchecked
    Key obj = (Key)keyBuilder.build();
    if (Internals.logging.isDebugEnabled()) {
      Internals.logging.debug(format("Generated key for record: '%s'.", obj.toString()));
    }
    return obj;
  }

  // -- API: Projections & Field Masking -- //
  /**
   * Apply the fields from {@code source} to {@code target}, considering any provided {@link FieldMask}.
   *
   * <p>If the invoking developer chooses to provide {@code markedPaths}, they must also supply {@code markEffect}. For
   * each field encountered that matches a property path in {@code markedPaths}, {@code markEffect} is applied. This
   * happens recursively for the entire model tree of {@code source} (and, consequently, {@code target}).</p>
   *
   * <p>After all field computations are complete, the builder is built (and casted, if necessary), before being handed
   * back to invoking code.</p>
   *
   * @see FetchOptions.MaskMode Determines how "marked" fields are treated.
   * @param target Builder to set each field value on, as appropriate.
   * @param source Source instance to pull fields and field values from.
   * @param markedPaths "Marked" paths - each one will be treated, as encountered, according to {@code markEffect}.
   * @param markEffect Determines how to treat "marked" paths. See {@link FetchOptions.MaskMode} for more information.
   * @param stackPrefix Dotted stack of properties describing the path that got us to this point (via recursion).
   * @return Constructed model, after applying the provided field mask, as applicable.
   */
  default Message.Builder applyFieldsRecursive(@Nonnull Message.Builder target,
                                               @Nonnull Message source,
                                               @Nonnull Set<String> markedPaths,
                                               @Nonnull FetchOptions.MaskMode markEffect,
                                               @Nonnull String stackPrefix) {
    // otherwise, we must examine each field with a value on the `source`, checking against `markedPaths` (if present)
    // as we go. if it matches, we filter through `markEffect` before applying against `target`.
    for (Map.Entry<FieldDescriptor, Object> property : source.getAllFields().entrySet()) {
      FieldDescriptor field = property.getKey();
      boolean skip = false;
      Object value = property.getValue();
      FetchOptions.MaskMode effect = FetchOptions.MaskMode.INCLUDE.equals(markEffect) ?
        FetchOptions.MaskMode.EXCLUDE : FetchOptions.MaskMode.INCLUDE;

      String currentPath = stackPrefix.isEmpty() ? field.getName() : stackPrefix + "." + field.getName();

      boolean marked = markedPaths.contains(currentPath);
      if (!FieldDescriptor.Type.MESSAGE.equals(field.getType()) && marked) {
        // field is in the marked paths.
        effect = markEffect;
      } else if (FieldDescriptor.Type.MESSAGE.equals(field.getType())) {
        effect = FetchOptions.MaskMode.INCLUDE;  // always include messages
      }

      switch (effect) {
        case PROJECTION:
        case INCLUDE:
          if (Internals.logging.isDebugEnabled()) {
            Internals.logging.debug(format(
              "Field '%s' (%s) included because it did not violate expectation %s via field mask.",
              currentPath,
              field.getFullName(),
              markEffect.name()));
          }

          // handle recursive cases first
          if (FieldDescriptor.Type.MESSAGE.equals(field.getType())) {
            target.setField(
              field,
              applyFieldsRecursive(
                target.getFieldBuilder(field),
                (Message)value,
                markedPaths,
                markEffect,
                currentPath).build());

          } else {
            // it's a simple field value
            target.setField(field, value);
          }
          break;

        case EXCLUDE:
          if (Internals.logging.isDebugEnabled()) {
            Internals.logging.debug(format(
              "Excluded field '%s' (%s) because it did not meet expectation %s via field mask.",
              currentPath,
              field.getFullName(),
              markEffect.name()));
          }
      }
    }
    return target;
  }

  /**
   * Apply mask-related options to the provided instance. This may include re-building <i>without</i> certain fields, so
   * the instance returned may be different.
   *
   * @param instance Instance to filter based on any provided field mask.k
   * @param options Options to apply to the provided instance.
   * @return Model, post-filtering.
   */
  @VisibleForTesting
  default Model applyMask(@Nonnull Model instance, @Nonnull FetchOptions options) {
    // do we have a mask to apply? does it have fields?
    if (instance.isInitialized()
        && options.fieldMask().isPresent()
        && options.fieldMask().get().getPathsCount() > 0) {
      if (Internals.logging.isTraceEnabled())
        Internals.logging.trace(format("Found valid field mask, applying: '%s'.", options.fieldMask().get()));

      // resolve mask & mode
      FieldMask mask = options.fieldMask().get();
      FetchOptions.MaskMode maskMode = Objects.requireNonNull(options.fieldMaskMode(),
        "Cannot provide `null` for field mask mode.");

      //noinspection unchecked
      return (Model)applyFieldsRecursive(
        instance.newBuilderForType(),
        instance,
        ImmutableSet.copyOf(Objects.requireNonNull(mask.getPathsList())),
        maskMode,
        "" /* root path */).build();
    }
    if (Internals.logging.isTraceEnabled())
      Internals.logging.trace("No field mask found. Skipping mask application.");
    return instance;
  }

  // -- API: Fetch -- //
  /**
   * Synchronously retrieve a data model instance from underlying storage, addressed by its unique ID.
   *
   * <p>If the record cannot be located by the storage engine, {@code null} will be returned instead. For a safe variant
   * of this method (relying on {@link Optional}), see {@link #fetchSafe(Message)}.</p>
   *
   * <p><b>Note:</b> Asynchronous and reactive versions of this method also exist. You should always consider using
   * those if your requirements allow.</p>
   *
   * @see #fetchAsync(Message) For an async version of this method, which produces a {@link ListenableFuture}.
   * @see #fetchSafe(Message) For a safe version of this method, which uses {@link Optional} instead of null.
   * @see #fetchReactive(Message) For a reactive version of this method, which produces a {@link Publisher}.
   * @param key Key at which we should look for the requested entity, and return it if found.
   * @return Requested record, as a model instance, or {@code null} if one could not be found.
   * @throws PersistenceException If an unexpected failure occurs, of any kind, while fetching the requested instance.
   */
  default @Nullable Model fetch(@Nonnull Key key) throws PersistenceException {
    return fetch(key, FetchOptions.DEFAULTS);
  }

  /**
   * Synchronously retrieve a data model instance from underlying storage, addressed by its unique ID.
   *
   * <p>If the record cannot be located by the storage engine, {@code null} will be returned instead. For a safe
   * variant of this method (relying on {@link Optional}), see {@link #fetchSafe(Message)}}. This variant
   * additionally allows specification of {@link FetchOptions}.</p>
   *
   * <p><b>Note:</b> Asynchronous and reactive versions of this method also exist. You should always consider using
   * those if your requirements allow.</p>
   *
   * @see #fetchAsync(Message) For an async version of this method, which produces a {@link ListenableFuture}.
   * @see #fetchSafe(Message) For a safe version of this method, which uses {@link Optional} instead of null.
   * @see #fetchReactive(Message) For a reactive version of this method, which produces a {@link Publisher}.
   * @param key Key at which we should look for the requested entity, and return it if found.
   * @param options Options to apply to this individual retrieval operation.
   * @return Requested record, as a model instance, or {@code null} if one could not be found.
   * @throws InvalidModelType If the specified key type is not compatible with model-layer operations.
   * @throws PersistenceException If an unexpected failure occurs, of any kind, while fetching the requested instance.
   * @throws MissingAnnotatedField If the specified key record has no resolvable ID field.
   */
  default @Nullable Model fetch(@Nonnull Key key, @Nullable FetchOptions options) throws PersistenceException {
    Optional<Model> msg = fetchSafe(key, options);
    return msg.orElse(null);
  }

  /**
   * Safely (and synchronously) retrieve a data model instance from storage, returning {@link Optional#empty()} if it
   * cannot be located, rather than {@code null}.
   *
   * <p><b>Note:</b> Asynchronous and reactive versions of this method also exist. You should always consider using
   * those if your requirements allow. All of the reactive/async methods support null safety with {@link Optional}.</p>
   *
   * @see #fetch(Message) For a simpler, but {@code null}-unsafe version of this method.
   * @see #fetchAsync(Message) For an async version of this metho, which produces a {@link ListenableFuture}.
   * @see #fetchReactive(Message) For a reactive version of this method, which produces a {@link Publisher}.
   * @param key Key at which we should look for the requested entity, and return it if found.
   * @return Requested record, as a model instance, or {@link Optional#empty()} if it cannot be found.
   * @throws InvalidModelType If the specified key type is not compatible with model-layer operations.
   * @throws PersistenceException If an unexpected failure occurs, of any kind, while fetching the requested resource.
   * @throws MissingAnnotatedField If the specified key record has no resolvable ID field.
   */
  default @Nonnull Optional<Model> fetchSafe(@Nonnull Key key) throws PersistenceException {
    return fetchSafe(key, FetchOptions.DEFAULTS);
  }

  /**
   * Safely (and synchronously) retrieve a data model instance from storage, returning {@link Optional#empty()} if it
   * cannot be located, rather than {@code null}.
   *
   * <p>This variant additionally allows specification of {@link FetchOptions}.</p>
   *
   * <p><b>Note:</b> Asynchronous and reactive versions of this method also exist. You should always consider using
   * those if your requirements allow. All of the reactive/async methods support null safety with {@link Optional}.</p>
   *
   * @see #fetch(Message) For a simpler, but {@code null}-unsafe version of this method.
   * @see #fetchAsync(Message) For an async version of this metho, which produces a {@link ListenableFuture}.
   * @see #fetchReactive(Message) For a reactive version of this method, which produces a {@link Publisher}.
   * @param key Key at which we should look for the requested entity, and return it if found.
   * @param options Options to apply to this individual retrieval operation.
   * @return Requested record, as a model instance, or {@link Optional#empty()} if it cannot be found.
   * @throws InvalidModelType If the specified key type is not compatible with model-layer operations.
   * @throws PersistenceException If an unexpected failure occurs, of any kind, while fetching the requested resource.
   * @throws MissingAnnotatedField If the specified key record has no resolvable ID field.
   */
  @Nonnull
  default Optional<Model> fetchSafe(@Nonnull Key key, @Nullable FetchOptions options) throws PersistenceException {
    if (Internals.logging.isTraceEnabled())
      Internals.logging.trace(format("Synchronously fetching model with key '%s'. Options follow.\n%s",
        key, options));
    return Internals.convertAsyncExceptions(() -> {
      FetchOptions resolvedOptions = options != null ? options : FetchOptions.DEFAULTS;
      return this.fetchAsync(key, options).get(
        resolvedOptions.timeoutValue().orElse(DEFAULT_TIMEOUT),
        resolvedOptions.timeoutUnit().orElse(DEFAULT_TIMEOUT_UNIT));
    });
  }

  /**
   * Reactively retrieve a data model instance from storage, emitting it over a {@link Publisher} wrapped in an
   * {@link Optional}.
   *
   * <p>In other words, if the model cannot be located, exactly one {@link Optional#empty()} will be emitted over the
   * channel. If the model is successfully located and retrieved, it is emitted exactly once. See other method variants,
   * which allow specification of additional options.</p>
   *
   * <p><b>Exceptions:</b> Instead of throwing a {@link PersistenceException} as other methods do, this operation will
   * <i>emit</i> the exception over the {@link Publisher} channel instead, to enable reactive exception handling.</p>
   *
   * @see #fetch(Message) For a simple, synchronous ({@code null}-unsafe) version of this method.
   * @see #fetchAsync(Message) For an async version of this method, which produces a {@link ListenableFuture}.
   * @see #fetchReactive(Message, FetchOptions) For a variant of this method that allows specification of options.
   * @param key Key at which we should look for the requested entity, and emit it if found.
   * @return Publisher which will receive exactly-one emitted {@link Optional#empty()}, or wrapped object.
   * @throws InvalidModelType If the specified key type is not compatible with model-layer operations.
   * @throws PersistenceException If an unexpected failure occurs, of any kind, while fetching the requested resource.
   * @throws MissingAnnotatedField If the specified key record has no resolvable ID field.
   */
  default @Nonnull ReactiveFuture<Optional<Model>> fetchReactive(@Nonnull Key key) {
    return fetchReactive(key, FetchOptions.DEFAULTS);
  }

  /**
   * Reactively retrieve a data model instance from storage, emitting it over a {@link Publisher} wrapped in an
   * {@link Optional}.
   *
   * <p>In other words, if the model cannot be located, exactly one {@link Optional#empty()} will be emitted over the
   * channel. If the model is successfully located and retrieved, it is emitted exactly once. See other method variants,
   * which allow specification of additional options. This method variant additionally allows the specification of
   * {@link FetchOptions}.</p>
   *
   * <p><b>Exceptions:</b> Instead of throwing a {@link PersistenceException} as other methods do, this operation will
   * <i>emit</i> the exception over the {@link Publisher} channel instead, to enable reactive exception handling.</p>
   *
   * @see #fetch(Message) For a simple, synchronous ({@code null}-unsafe) version of this method.
   * @see #fetchAsync(Message) For an async version of this method, which produces a {@link ListenableFuture}.
   * @param key Key at which we should look for the requested entity, and emit it if found.
   * @param options Options to apply to this individual retrieval operation.
   * @return Publisher which will receive exactly-one emitted {@link Optional#empty()}, or wrapped object.
   * @throws InvalidModelType If the specified key type is not compatible with model-layer operations.
   * @throws PersistenceException If an unexpected failure occurs, of any kind, while fetching the requested resource.
   * @throws MissingAnnotatedField If the specified key record has no resolvable ID field.
   */
  default @Nonnull ReactiveFuture<Optional<Model>> fetchReactive(@Nonnull Key key, @Nullable FetchOptions options) {
    return this.fetchAsync(key, options);
  }

  /**
   * Asynchronously retrieve a data model instance from storage, which will populate the provided {@link Future} value.
   *
   * <p>All futures emitted via the persistence framework (and Gust writ-large) are {@link ListenableFuture}-compliant
   * implementations under the hood. If the requested record cannot be located, {@link Optional#empty()} is returned as
   * the future value, otherwise, the model is returned. See other method variants, which allow specification of
   * additional options.</p>
   *
   * <p><b>Exceptions:</b> Instead of throwing a {@link PersistenceException} as other methods do, this operation will
   * <i>emit</i> the exception over the {@link Future} channel instead, or raise the exception in the event
   * {@link Future#get()} is called to surface it in the invoking (or dependent) code.</p>
   *
   * @see #fetch(Message) For a simple, synchronous ({@code null}=unsafe) version of this method.
   * @see #fetchSafe(Message) For a simple, synchronous ({@code null}-safe) version of this method.
   * @see #fetchReactive(Message) For a reactive version of this method, which returns a {@link Publisher}.
   * @see #fetchAsync(Message, FetchOptions) For a variant of this method which supports {@link FetchOptions}.
   * @param key Key at which we should look for the requested entity, and emit it if found.
   * @return Future value, which resolves to the specified datamodel instance, or {@link Optional#empty()} if the record
   *         could not be located by the storage engine.
   * @throws InvalidModelType If the specified key type is not compatible with model-layer operations.
   * @throws PersistenceException If an unexpected failure occurs, of any kind, while fetching the requested resource.
   * @throws MissingAnnotatedField If the specified key record has no resolvable ID field.
   */
  default @Nonnull ReactiveFuture<Optional<Model>> fetchAsync(@Nonnull Key key) {
    return fetchAsync(key, FetchOptions.DEFAULTS);
  }

  /**
   * Asynchronously retrieve a data model instance from storage, which will populate the provided {@link Future} value.
   *
   * <p>All futures emitted via the persistence framework (and Gust writ-large) are {@link ListenableFuture}-compliant
   * implementations under the hood. If the requested record cannot be located, {@link Optional#empty()} is returned as
   * the future value, otherwise, the model is returned.</p>
   *
   * <p>This method additionally enables specification of custom {@link FetchOptions}, which are applied on a per-
   * operation basis to override global defaults.</p>
   *
   * <p><b>Exceptions:</b> Instead of throwing a {@link PersistenceException} as other methods do, this operation will
   * <i>emit</i> the exception over the {@link Future} channel instead, or raise the exception in the event
   * {@link Future#get()} is called to surface it in the invoking (or dependent) code.</p>
   *
   * @see #fetch(Message) For a simple, synchronous ({@code null}=unsafe) version of this method.
   * @see #fetchSafe(Message) For a simple, synchronous ({@code null}-safe) version of this method.
   * @see #fetchReactive(Message) For a reactive version of this method, which returns a {@link Publisher}.
   * @param key Key at which we should look for the requested entity, and emit it if found.
   * @param options Options to apply to this individual retrieval operation.
   * @return Future value, which resolves to the specified datamodel instance, or {@link Optional#empty()} if the record
   *         could not be located by the storage engine.
   * @throws InvalidModelType If the specified key type is not compatible with model-layer operations.
   * @throws PersistenceException If an unexpected failure occurs, of any kind, while fetching the requested resource.
   * @throws MissingAnnotatedField If the specified key record has no resolvable ID field.
   */
  @OverridingMethodsMustInvokeSuper
  default @Nonnull ReactiveFuture<Optional<Model>> fetchAsync(@Nonnull Key key, @Nullable FetchOptions options) {
    if (Internals.logging.isTraceEnabled())
      Internals.logging.trace(format("Fetching model with key '%s' asynchronously. Options follow.\n%s",
        key, options));
    return this.retrieve(key, options != null ? options : FetchOptions.DEFAULTS);
  }

  /**
   * Low-level record retrieval method. Effectively called by all other fetch variants. Asynchronously retrieve a data
   * model instance from storage, which will populate the provided {@link ReactiveFuture} value.
   *
   * <p>All futures emitted via the persistence framework (and Gust writ-large) are {@link ListenableFuture}-compliant
   * implementations under the hood. If the requested record cannot be located, {@link Optional#empty()} is returned as
   * the future value, otherwise, the model is returned.</p>
   *
   * <p>This method additionally enables specification of custom {@link FetchOptions}, which are applied on a per-
   * operation basis to override global defaults.</p>
   *
   * <p><b>Exceptions:</b> Instead of throwing a {@link PersistenceException} as other methods do, this operation will
   * <i>emit</i> the exception over the {@link Future} channel instead, or raise the exception in the event
   * {@link Future#get()} is called to surface it in the invoking (or dependent) code.</p>
   *
   * @see #fetch(Message) For a simple, synchronous ({@code null}=unsafe) version of this method.
   * @see #fetchSafe(Message) For a simple, synchronous ({@code null}-safe) version of this method.
   * @see #fetchAsync(Message) For an async variant of this method (identical, except options are optional).
   * @see #fetchReactive(Message) For a reactive version of this method, which returns a {@link Publisher}.
   * @param key Key at which we should look for the requested entity, and emit it if found.
   * @param options Options to apply to this individual retrieval operation.
   * @return Future value, which resolves to the specified datamodel instance, or {@link Optional#empty()} if the record
   *         could not be located by the storage engine.
   * @throws InvalidModelType If the specified key type is not compatible with model-layer operations.
   * @throws PersistenceException If an unexpected failure occurs, of any kind, while fetching the requested resource.
   * @throws MissingAnnotatedField If the specified key record has no resolvable ID field.
   */
  @Nonnull ReactiveFuture<Optional<Model>> retrieve(@Nonnull Key key, @Nonnull FetchOptions options);

  // -- API: Persist -- //
  /**
   * Create the record specified by {@code model} in underlying storage, provisioning a key or ID for the record if
   * needed. The persisted entity is returned or an error occurs.
   *
   * <p>This operation will enforce the option {@code MUST_NOT_EXIST} for the write - i.e., "creating" a record implies
   * that it must not exist beforehand. Additionally, if the record is missing a unique ID or key (one or the other must
   * be annotated on the record), then a semi-random value will be generated for the record.</p>
   *
   * <p>The returned record will be re-constituted, with the spliced-in ID or key value, as applicable, and with any
   * computed or framework-related properties filled in (i.e. automatic timestamping).</p>
   *
   * @param model Model to create in underlying storage. Requires a {@code ID} or {@code KEY}-annotated field.
   * @return Future value, which resolves to the stored model entity, affixed with an assigned ID or key.
   * @throws InvalidModelType If the specified model record is not usable with storage.
   * @throws PersistenceException If an unexpected failure occurs, of any kind, while creating the record.
   * @throws MissingAnnotatedField If a required annotated field cannot be located (i.e. {@code ID} or {@code KEY}).
   */
  default @Nonnull ReactiveFuture<Model> create(@Nonnull Model model) {
    //noinspection unchecked
    return create((Key)key(model).orElse(null), model);
  }

  /**
   * Create the record specified by {@code model} using the optional pre-fabricated {@code key}, in underlying storage.
   * If the provided key is empty or {@code null}, the engine will provision a key or ID for the record. The persisted
   * entity is returned or an error occurs.
   *
   * <p>This operation will enforce the option {@code MUST_NOT_EXIST} for the write - i.e., "creating" a record implies
   * that it must not exist beforehand. Additionally, if the record is missing a unique ID or key (one or the other must
   * be annotated on the record), then a semi-random value will be generated for the record.</p>
   *
   * <p>The returned record will be re-constituted, with the spliced-in ID or key value, as applicable, and with any
   * computed or framework-related properties filled in (i.e. automatic timestamping).</p>
   *
   * @param model Model to create in underlying storage. Requires a {@code ID} or {@code KEY}-annotated field.
   * @return Future value, which resolves to the stored model entity, affixed with an assigned ID or key.
   * @throws InvalidModelType If the specified model record is not usable with storage.
   * @throws PersistenceException If an unexpected failure occurs, of any kind, while creating the record.
   * @throws MissingAnnotatedField If a required annotated field cannot be located (i.e. {@code ID} or {@code KEY}).
   */
  default @Nonnull ReactiveFuture<Model> create(@Nullable Key key, @Nonnull Model model) {
    return create(key, model, new WriteOptions() {
      @Override
      public @Nonnull Optional<WriteDisposition> writeMode() {
        return Optional.of(WriteDisposition.MUST_NOT_EXIST);
      }
    });
  }

  /**
   * Create the record specified by {@code model} using the specified set of {@code options}, in underlying storage. If
   * the provided mode's key or ID is empty or {@code null}, the engine will provision a key or ID for the record. The
   * persisted entity is returned or an error occurs.
   *
   * <p>This operation will enforce the option {@code MUST_NOT_EXIST} for the write - i.e., "creating" a record implies
   * that it must not exist beforehand. Additionally, if the record is missing a unique ID or key (one or the other must
   * be annotated on the record), then a semi-random value will be generated for the record.</p>
   *
   * <p>The returned record will be re-constituted, with the spliced-in ID or key value, as applicable, and with any
   * computed or framework-related properties filled in (i.e. automatic timestamping).</p>
   *
   * @param model Model to create in underlying storage. Requires a {@code ID} or {@code KEY}-annotated field.
   * @return Future value, which resolves to the stored model entity, affixed with an assigned ID or key.
   * @throws InvalidModelType If the specified model record is not usable with storage.
   * @throws PersistenceException If an unexpected failure occurs, of any kind, while creating the record.
   * @throws MissingAnnotatedField If a required annotated field cannot be located (i.e. {@code ID} or {@code KEY}).
   */
  default @Nonnull ReactiveFuture<Model> create(@Nonnull Model model, @Nonnull WriteOptions options) {
    //noinspection unchecked
    return create((Key)key(model).orElse(null), model, options);
  }

  /**
   * Create the record specified by {@code model} using the optional pre-fabricated {@code key}, and making use of the
   * specified {@code options}, in underlying storage. If the provided key is empty or {@code null}, the engine will
   * provision a key or ID for the record. The persisted entity is returned or an error occurs.
   *
   * <p>This operation will enforce the option {@code MUST_NOT_EXIST} for the write - i.e., "creating" a record implies
   * that it must not exist beforehand. Additionally, if the record is missing a unique ID or key (one or the other must
   * be annotated on the record), then a semi-random value will be generated for the record.</p>
   *
   * <p>The returned record will be re-constituted, with the spliced-in ID or key value, as applicable, and with any
   * computed or framework-related properties filled in (i.e. automatic timestamping).</p>
   *
   * @param model Model to create in underlying storage. Requires a {@code ID} or {@code KEY}-annotated field.
   * @return Future value, which resolves to the stored model entity, affixed with an assigned ID or key.
   * @throws InvalidModelType If the specified model record is not usable with storage.
   * @throws PersistenceException If an unexpected failure occurs, of any kind, while creating the record.
   * @throws MissingAnnotatedField If a required annotated field cannot be located (i.e. {@code ID} or {@code KEY}).
   * @throws IllegalArgumentException If an incompatible {@link WriteOptions.WriteDisposition} value is specified.
   */
  @Nonnull
  default ReactiveFuture<Model> create(@Nullable Key key, @Nonnull Model model, @Nonnull WriteOptions options) {
    Internals.enforceOption(
      options.writeMode()
        .orElse(WriteOptions.WriteDisposition.MUST_NOT_EXIST),
      WriteOptions.WriteDisposition.MUST_NOT_EXIST,
      "Write options for `create` must specify `MUST_NOT_EXIST` write disposition.");
    return persist(key, model, options);
  }

  /**
   * Update the record specified by {@code model} in underlying storage, using the existing key or ID value affixed to
   * the model. The entity is returned in its updated form, or an error occurs.
   *
   * <p>This operation will enforce the option {@code MUST_EXIST} for the write - i.e., "updating" a record implies that
   * it must exist beforehand. This means, if the record is missing a unique ID or key (one or the other must be
   * annotated on the record), then an error occurs (specifically, either {@link MissingAnnotatedField}) for a  missing
   * schema field, or {@link IllegalStateException} for a missing required value).</p>
   *
   * <p>The returned record will be re-constituted, with the ID or key value unmodified, as applicable, and with any
   * computed or framework-related properties updated in (i.e. automatic update timestamping).</p>
   *
   * @param model Model to update in underlying storage. Requires a {@code ID} or {@code KEY}-annotated field and value.
   * @return Future value, which resolves to the stored model entity, after it has been updated.
   * @throws InvalidModelType If the specified model record is not usable with storage.
   * @throws PersistenceException If an unexpected failure occurs, of any kind, while updated the record.
   * @throws MissingAnnotatedField If a required annotated field cannot be located (i.e. {@code ID} or {@code KEY}).
   * @throws IllegalStateException If a required annotated field value cannot be resolved (i.e. an empty key or ID).
   */
  default @Nonnull ReactiveFuture<Model> update(@Nonnull Model model) {
    //noinspection unchecked
    return update(
      (Key)key(model).orElseThrow(() -> new IllegalStateException("Failed to resolve a key value for record.")),
      model);
  }

  /**
   * Update the record specified by {@code model} in underlying storage, making use of the specified {@code options},
   * using the existing key or ID value affixed to the model. The entity is returned in its updated form, or an error
   * occurs.
   *
   * <p>This operation will enforce the option {@code MUST_EXIST} for the write - i.e., "updating" a record implies that
   * it must exist beforehand. This means, if the record is missing a unique ID or key (one or the other must be
   * annotated on the record), then an error occurs (specifically, either {@link MissingAnnotatedField}) for a  missing
   * schema field, or {@link IllegalStateException} for a missing required value).</p>
   *
   * <p>The returned record will be re-constituted, with the ID or key value unmodified, as applicable, and with any
   * computed or framework-related properties updated in (i.e. automatic update timestamping).</p>
   *
   * @param model Model to update in underlying storage. Requires a {@code ID} or {@code KEY}-annotated field and value.
   * @return Future value, which resolves to the stored model entity, after it has been updated.
   * @throws InvalidModelType If the specified model record is not usable with storage.
   * @throws PersistenceException If an unexpected failure occurs, of any kind, while updated the record.
   * @throws MissingAnnotatedField If a required annotated field cannot be located (i.e. {@code ID} or {@code KEY}).
   * @throws IllegalStateException If a required annotated field value cannot be resolved (i.e. an empty key or ID).
   */
  default @Nonnull ReactiveFuture<Model> update(@Nonnull Model model, @Nonnull UpdateOptions options) {
    //noinspection unchecked
    return update(
      (Key)key(model).orElseThrow(() -> new IllegalStateException("Failed to resolve a key value for record.")),
      model,
      options);
  }

  /**
   * Update the record specified by {@code model}, and addressed by {@code key}, in underlying storage. The entity is
   * returned in its updated form, or an error occurs.
   *
   * <p>This operation will enforce the option {@code MUST_EXIST} for the write - i.e., "updating" a record implies that
   * it must exist beforehand. This means, if the record is missing a unique ID or key (one or the other must be
   * annotated on the record), then an error occurs (specifically, either {@link MissingAnnotatedField}) for a  missing
   * schema field, or {@link IllegalStateException} for a missing required value).</p>
   *
   * <p>The returned record will be re-constituted, with the ID or key value unmodified, as applicable, and with any
   * computed or framework-related properties updated in (i.e. automatic update timestamping).</p>
   *
   * @param model Model to update in underlying storage. Requires a {@code ID} or {@code KEY}-annotated field and value.
   * @return Future value, which resolves to the stored model entity, after it has been updated.
   * @throws InvalidModelType If the specified model record is not usable with storage.
   * @throws PersistenceException If an unexpected failure occurs, of any kind, while updated the record.
   * @throws MissingAnnotatedField If a required annotated field cannot be located (i.e. {@code ID} or {@code KEY}).
   * @throws IllegalStateException If a required annotated field value cannot be resolved (i.e. an empty key or ID).
   */
  default @Nonnull ReactiveFuture<Model> update(@Nonnull Key key, @Nonnull Model model) {
    return update(key, model, new UpdateOptions() {
      @Override
      public @Nonnull Optional<WriteDisposition> writeMode() {
        return Optional.of(WriteDisposition.MUST_EXIST);
      }
    });
  }

  /**
   * Update the record specified by {@code model}, and addressed by {@code key}, in underlying storage. The entity is
   * returned in its updated form, or an error occurs. This method variant additionally allows specification of custom
   * {@code options} for this individual operation.
   *
   * <p>This operation will enforce the option {@code MUST_EXIST} for the write - i.e., "updating" a record implies that
   * it must exist beforehand. This means, if the record is missing a unique ID or key (one or the other must be
   * annotated on the record), then an error occurs (specifically, either {@link MissingAnnotatedField}) for a  missing
   * schema field, or {@link IllegalStateException} for a missing required value).</p>
   *
   * <p>The returned record will be re-constituted, with the ID or key value unmodified, as applicable, and with any
   * computed or framework-related properties updated in (i.e. automatic update timestamping).</p>
   *
   * @param model Model to update in underlying storage. Requires a {@code ID} or {@code KEY}-annotated field and value.
   * @return Future value, which resolves to the stored model entity, after it has been updated.
   * @throws InvalidModelType If the specified model record is not usable with storage.
   * @throws PersistenceException If an unexpected failure occurs, of any kind, while updated the record.
   * @throws MissingAnnotatedField If a required annotated field cannot be located (i.e. {@code ID} or {@code KEY}).
   * @throws IllegalStateException If a required annotated field value cannot be resolved (i.e. an empty key or ID).
   * @throws IllegalArgumentException If an incompatible {@link WriteOptions.WriteDisposition} value is specified.
   */
  @Nonnull
  default ReactiveFuture<Model> update(@Nonnull Key key, @Nonnull Model model, @Nonnull UpdateOptions options) {
    Internals.enforceOption(
      options.writeMode().orElse(WriteOptions.WriteDisposition.MUST_EXIST),
      WriteOptions.WriteDisposition.MUST_EXIST,
      "Write options for `update` must specify `MUST_EXIST` write disposition.");
    return persist(key, model, options);
  }

  /**
   * Low-level record persistence method. Effectively called by all other create/put variants. Asynchronously write a
   * data model instance to storage, which will populate the provided {@link ReactiveFuture} value.
   *
   * <p>Optionally, a key may be provided as a nominated value to the storage engine. Whether the engine accepts
   * nominated keys is up to the implementation. In all cases, the engine must return the key used to store and address
   * the value henceforth. If the engine <i>does</i> support nominated keys, it <i>must</i> operate in an idempotent
   * manner with regard to those keys. In other words, repeated calls to create the same entity with the same key will
   * not cause spurious side-effects - only one record will be created, with the remaining calls being rejected by the
   * underlying engine.</p>
   *
   * <p>All futures emitted via the persistence framework (and Gust writ-large) are {@link ListenableFuture}-compliant
   * implementations under the hood, but {@link ReactiveFuture} allows a model-layer result to be used as a
   * {@link Future}, or a one-item reactive {@link Publisher}.</p>
   *
   * <p>This method additionally enables specification of custom {@link WriteOptions}, which are applied on a per-
   * operation basis to override global defaults.</p>
   *
   * <p><b>Exceptions:</b> Instead of throwing a {@link PersistenceException} as other methods do, this operation will
   * <i>emit</i> the exception over the {@link Future} channel instead, or raise the exception in the event
   * {@link Future#get()} is called to surface it in the invoking (or dependent) code.</p>
   *
   * @param key Key nominated by invoking code for storing this record. If no key is provided, the underlying storage
   *            engine is expected to allocate one. Where unsupported, {@link PersistenceException} will be thrown.
   * @param model Model to store at the specified key, if provided.
   * @param options Options to apply to this persist operation.
   * @return Reactive future, which resolves to the key where the provided model is now stored. In no case should this
   *         method return {@code null}. Instead, {@link PersistenceException} will be thrown.
   * @throws InvalidModelType If the specified key type is not compatible with model-layer operations.
   * @throws PersistenceException If an unexpected failure occurs, of any kind, while fetching the requested resource.
   * @throws MissingAnnotatedField If the specified key record has no resolvable ID field.
   */
  @Nonnull ReactiveFuture<Model> persist(@Nullable Key key, @Nonnull Model model, @Nonnull WriteOptions options);

  // -- API: Delete -- //
  /**
   * Delete and fully erase the record referenced by {@code key} from underlying storage, permanently. The resulting
   * future resolves to the provided key value once the operation completes. If any issue occurs (besides encountering
   * an already-deleted entity, which is not an error), an exception is raised.
   *
   * @param key Key referring to the record which should be deleted, permanently, from underlying storage.
   * @return Future, which resolves to the provided key when the operation is complete.
   * @throws InvalidModelType If the specified key type is not compatible with model-layer operations.
   * @throws PersistenceException If an unexpected failure occurs, of any kind, while deleting the requested resource.
   * @throws MissingAnnotatedField If the specified key record has no resolvable ID field.
   * @throws IllegalStateException If a required annotated field value cannot be resolved (i.e. an empty key or ID).
   */
  default @Nonnull ReactiveFuture<Key> delete(@Nonnull Key key) {
    return delete(key, DeleteOptions.DEFAULTS);
  }

  /**
   * Delete and fully erase the supplied {@code model} from underlying storage, permanently. The resulting future
   * resolves to the provided record's key value once the operation completes. If any issue occurs (besides encountering
   * an already-deleted entity, which is not an error), an exception is raised.
   *
   * @param model Model instance to delete from underlying storage.
   * @return Future, which resolves to the provided key when the operation is complete.
   * @throws InvalidModelType If the specified key type is not compatible with model-layer operations.
   * @throws PersistenceException If an unexpected failure occurs, of any kind, while deleting the requested resource.
   * @throws MissingAnnotatedField If the specified key record has no resolvable ID field.
   * @throws IllegalStateException If a required annotated field value cannot be resolved (i.e. an empty key or ID).
   */
  default @Nonnull ReactiveFuture<Key> deleteRecord(@Nonnull Model model) {
    return deleteRecord(model, DeleteOptions.DEFAULTS);
  }

  /**
   * Delete and fully erase the supplied {@code model} from underlying storage, permanently. The resulting future
   * resolves to the provided record's key value once the operation completes. If any issue occurs (besides encountering
   * an already-deleted entity, which is not an error), an exception is raised.
   *
   * @param model Model instance to delete from underlying storage.
   * @param options Options to apply to this specific delete operation.
   * @return Future, which resolves to the provided key when the operation is complete.
   * @throws InvalidModelType If the specified key type is not compatible with model-layer operations.
   * @throws PersistenceException If an unexpected failure occurs, of any kind, while deleting the requested resource.
   * @throws MissingAnnotatedField If the specified key record has no resolvable ID field.
   * @throws IllegalStateException If a required annotated field value cannot be resolved (i.e. an empty key or ID).
   */
  default @Nonnull ReactiveFuture<Key> deleteRecord(@Nonnull Model model, @Nonnull DeleteOptions options) {
    //noinspection unchecked
    return delete((Key)key(model)
        .orElseThrow(() -> new IllegalStateException("Cannot delete record with empty key/ID.")),
      options);
  }

  /**
   * Low-level record delete method. Effectively called by all other delete variants. Asynchronously and permanently
   * erase an existing data model instance from storage, addressed by its key unique key or ID.
   *
   * <p>If no key or ID field, or value, may be located, an error is raised (see below for details). This operation is
   * expected to operate in an <i>idempotent</i> manner (i.e. repeated calls with identical parameters do not yield
   * different side effects). Calls referring to an already-deleted entity should silently succeed.</p>
   *
   * <p>All futures emitted via the persistence framework (and Gust writ-large) are {@link ListenableFuture}-compliant
   * implementations under the hood, but {@link ReactiveFuture} allows a model-layer result to be used as a
   * {@link Future}, or a one-item reactive {@link Publisher}.</p>
   *
   * <p>This method additionally enables specification of custom {@link DeleteOptions}, which are applied on a per-
   * operation basis to override global defaults.</p>
   *
   * <p><b>Exceptions:</b> Instead of throwing a {@link PersistenceException} as other methods do, this operation will
   * <i>emit</i> the exception over the {@link Future} channel instead, or raise the exception in the event
   * {@link Future#get()} is called to surface it in the invoking (or dependent) code.</p>
   *
   * @param key Unique key referring to the record in storage that should be deleted.
   * @param options Options to apply to this specific delete operation.
   * @return Future value, which resolves to the deleted record's key when the operation completes.
   * @throws InvalidModelType If the specified key type is not compatible with model-layer operations.
   * @throws PersistenceException If an unexpected failure occurs, of any kind, while deleting the requested resource.
   * @throws MissingAnnotatedField If the specified key record has no resolvable ID field.
   * @throws IllegalStateException If a required annotated field value cannot be resolved (i.e. an empty key or ID).
   */
  @Nonnull ReactiveFuture<Key> delete(@Nonnull Key key, @Nonnull DeleteOptions options);
}