java/gust/backend/model/ModelAdapter.java
/*
* Copyright © 2020, The Gust Framework Authors. All rights reserved.
*
* The Gust/Elide framework and tools, and all associated source or object computer code, except where otherwise noted,
* are licensed under the Zero Prosperity license, which is enclosed in this repository, in the file LICENSE.txt. Use of
* this code in object or source form requires and implies consent and agreement to that license in principle and
* practice. Source or object code not listing this header, or unless specified otherwise, remain the property of
* Elide LLC and its suppliers, if any. The intellectual and technical concepts contained herein are proprietary to
* Elide LLC and its suppliers and may be covered by U.S. and Foreign Patents, or patents in process, and are protected
* by trade secret and copyright law. Dissemination of this information, or reproduction of this material, in any form,
* is strictly forbidden except in adherence with assigned license requirements.
*/
package gust.backend.model;
import com.google.common.base.Function;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.protobuf.Message;
import gust.backend.runtime.ReactiveFuture;
import tools.elide.core.DatapointType;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Objects;
import java.util.Optional;
import static java.lang.String.format;
import static gust.backend.model.ModelMetadata.*;
/**
* Specifies an adapter for data models. "Adapters" are responsible for handling data storage and recall, and generic
* model serialization and deserialization activities. Adapters are composed of a handful of components, which together
* define the functionality that composes the adapter writ-large.
*
* <p>Major components of functionality are described below:
* <ul>
* <li><b>Codec:</b> The {@link ModelCodec} is responsible for serialization and deserialization. In some cases,
* codecs can be mixed with other objects to customize how data is stored. For example, the Redis cache layer supports
* using ProtoJSON, Protobuf binary, or JVM serialization, optionally with compression. On the other hand, the
* Firestore adapter specifies its own codecs which serialize into Firestore models.</li>
* <li><b>Driver:</b> The {@link PersistenceDriver} is responsible for persisting serialized/collapsed models into
* underlying storage, deleting data recalling data via key fetches, and querying indexes to produce result-sets.</li>
* </ul></p>
*
* @see PersistenceDriver Interface which defines basic driver functionality.
* @see CacheDriver Cache-specific persistence driver support, included in this object.
* @see DatabaseAdapter Extends this interface with richer data engine features.
* @param <Key> Key type, instances of which uniquely address instances of {@code Model}.
* @param <Model> Model type which this adapter is responsible for adapting.
* @param <ReadIntermediate> Intermediate record format used by the implementation when de-serializing model instances.
* @param <WriteIntermediate> Intermediate record format used when serializing model instances for write.
*/
@SuppressWarnings("UnstableApiUsage")
public interface ModelAdapter<Key extends Message, Model extends Message, ReadIntermediate, WriteIntermediate>
extends PersistenceDriver<Key, Model, ReadIntermediate, WriteIntermediate> {
// -- Interface: Drivers -- //
/**
* Return the cache driver in use for this particular model adapter. If a cache driver is present, and active/enabled
* according to database driver settings, it will be used on read-paths (such as fetching objects by ID).
*
* @return Cache driver currently in use by this model adapter.
*/
@Nonnull Optional<CacheDriver<Key, Model>> cache();
/**
* Return the lower-level {@link PersistenceDriver} powering this adapter. The driver is responsible for communicating
* with the actual backing storage service, either via local stubs/emulators or a production API.
*
* @return Persistence driver instance currently in use by this model adapter.
*/
@Nonnull PersistenceDriver<Key, Model, ReadIntermediate, WriteIntermediate> engine();
// -- Interface: Execution -- //
/** {@inheritDoc} */
@Override
default @Nonnull ListeningScheduledExecutorService executorService() {
return engine().executorService();
}
// -- Interface: Key Generation -- //
/** {@inheritDoc} */
@Override
default @Nonnull Key generateKey(@Nonnull Message instance) {
return engine().generateKey(instance);
}
// -- Interface: Fetch -- //
/** {@inheritDoc} */
@Override
default @Nonnull ReactiveFuture<Optional<Model>> retrieve(@Nonnull Key key, @Nonnull FetchOptions options) {
enforceRole(key, DatapointType.OBJECT_KEY);
final ListeningScheduledExecutorService exec = options.executorService().orElseGet(this::executorService);
if (Internals.logging.isTraceEnabled())
Internals.logging.trace(format("Retrieving record '%s' from storage (executor: '%s')...", id(key), exec));
final Optional<CacheDriver<Key, Model>> cache = this.cache();
if (options.enableCache() && cache.isPresent()) {
if (Internals.logging.isDebugEnabled())
Internals.logging.debug(
format("Caching enabled with object of type '%s'.", cache.get().getClass().getSimpleName()));
// cache result future
final ReactiveFuture<Optional<Model>> cacheFetchFuture = Objects.requireNonNull(
cache.get().fetch(key, options, exec), "Cache cannot return `null` for `retrieve`.");
// wrap in a future, with a non-propagating cancelling timeout, which handles any nulls from the cache.
final ListenableFuture<Optional<Model>> cacheFuture = (Futures.nonCancellationPropagating(
Futures.transform(cacheFetchFuture, new Function<>() {
@Override
public @Nonnull Optional<Model> apply(@Nullable Optional<Model> cacheResult) {
if (Internals.logging.isDebugEnabled()) {
//noinspection OptionalAssignedToNull
Internals.logging.debug(
format("Received response from cache (value present: '%s').",
cacheResult == null ? "null" : cacheResult.isPresent()));
}
if (cacheResult != null && cacheResult.isPresent()) {
return cacheResult;
}
return Optional.empty(); // not found
}
}, exec)));
// wrap the cache future in a timeout function, which enforces the configured (or default) cache timeout
final ListenableFuture<Optional<Model>> limitedCacheFuture = Futures.withTimeout(
cacheFuture,
options.cacheTimeout().orElse(PersistenceDriver.DEFAULT_CACHE_TIMEOUT),
options.cacheTimeoutUnit(),
exec);
// finally, respond to a cache miss by deferring to the driver directly. this must be separate from `cacheFuture`
// to allow separate cancellation of the cache future and the future which backstops it.
return ReactiveFuture.wrap(Futures.transformAsync(limitedCacheFuture, new AsyncFunction<>() {
@Override
public @Nonnull ListenableFuture<Optional<Model>> apply(@Nullable Optional<Model> cacheResult) {
if (Internals.logging.isTraceEnabled()) {
//noinspection OptionalAssignedToNull
Internals.logging.debug(
format("Returning response from cache (value present: '%s')",
cacheResult == null ? "null" : cacheResult.isPresent()));
}
if (cacheResult != null && cacheResult.isPresent()) {
return Futures.immediateFuture(cacheResult);
} else {
var record = engine().retrieve(key, options);
record.addListener(() -> {
if (Internals.logging.isDebugEnabled()) {
Internals.logging.debug("Response was NOT cached. Storing in cache...");
}
Internals.swallowExceptions(() -> {
Optional<Model> fetchResult = record.get();
fetchResult.ifPresent(model -> cache.get().put(
key,
model,
options.executorService().orElseGet(ModelAdapter.this::executorService)));
});
}, options.executorService().orElseGet(ModelAdapter.this::executorService));
return record;
}
}
}, exec), exec);
} else {
if (Internals.logging.isDebugEnabled()) {
Internals.logging.debug("Caching is disabled. Deferring to driver.");
}
return engine().retrieve(key, options);
}
}
// -- Interface: Persist -- //
/** {@inheritDoc} */
@Override
default @Nonnull ReactiveFuture<Model> persist(@Nullable Key key,
@Nonnull Model model,
@Nonnull WriteOptions options) {
return engine().persist(key, model, options);
}
/** {@inheritDoc} */
@Override
default @Nonnull ReactiveFuture<Key> delete(@Nonnull Key key,
@Nonnull DeleteOptions options) {
ReactiveFuture<Key> op = engine().delete(key, options);
if (options.enableCache()) {
// if caching is enabled and a cache driver is present, make sure to evict any cached record behind this key.
Optional<CacheDriver<Key, Model>> cacheDriver = this.cache();
if (cacheDriver.isPresent()) {
ListeningScheduledExecutorService exec = options.executorService().orElseGet(this::executorService);
ReactiveFuture<Key> storageDelete = engine().delete(key, options);
ReactiveFuture<Key> cacheEvict = cacheDriver.get().evict(key, exec);
return ReactiveFuture.wrap(Futures.whenAllComplete(storageDelete, cacheEvict).call(() -> key, exec));
}
}
return op;
}
}