sgammon/GUST

View on GitHub
java/gust/backend/driver/spanner/SpannerDriver.java

Summary

Maintainability
A
0 mins
Test Coverage
/*
 * Copyright © 2020, The Gust Framework Authors. All rights reserved.
 *
 * The Gust/Elide framework and tools, and all associated source or object computer code, except where otherwise noted,
 * are licensed under the Zero Prosperity license, which is enclosed in this repository, in the file LICENSE.txt. Use of
 * this code in object or source form requires and implies consent and agreement to that license in principle and
 * practice. Source or object code not listing this header, or unless specified otherwise, remain the property of
 * Elide LLC and its suppliers, if any. The intellectual and technical concepts contained herein are proprietary to
 * Elide LLC and its suppliers and may be covered by U.S. and Foreign Patents, or patents in process, and are protected
 * by trade secret and copyright law. Dissemination of this information, or reproduction of this material, in any form,
 * is strictly forbidden except in adherence with assigned license requirements.
 */
package gust.backend.driver.spanner;

import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.grpc.GrpcTransportOptions;
import com.google.cloud.spanner.*;
import com.google.cloud.spanner.SpannerOptions;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.protobuf.FieldMask;
import com.google.protobuf.Message;
import com.google.protobuf.util.FieldMaskUtil;
import gust.backend.model.*;
import gust.backend.runtime.Logging;
import gust.backend.runtime.ReactiveFuture;
import gust.backend.transport.GoogleAPIChannel;
import gust.backend.transport.GoogleService;
import io.micronaut.context.annotation.Context;
import io.micronaut.context.annotation.Factory;
import io.micronaut.runtime.context.scope.Refreshable;
import org.slf4j.Logger;
import tools.elide.core.*;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;
import javax.annotation.concurrent.ThreadSafe;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.TimeUnit;

import static gust.backend.driver.spanner.SpannerUtil.*;
import static com.google.common.util.concurrent.Futures.transformAsync;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static com.google.common.util.concurrent.Futures.withTimeout;
import static gust.backend.runtime.ReactiveFuture.wrap;
import static gust.backend.model.ModelMetadata.*;
import static java.lang.String.format;


/**
 * Provides a {@link DatabaseDriver} implementation which enables seamless Protocol Buffer persistence with Google Cloud
 * Spanner, for any {@link Message}-derived (schema-driven) business model in a given Gust app's ecosystem.
 *
 * <p>Model storage can be deeply customized on a per-model basis, thanks to the built-in proto annotations available
 * in <code>gust.core</code>. The Spanner adapter supports basic persistence (i.e. as a regular
 * <pre>PersistenceDriver</pre>), but also supports generic, object index-style queries.</p>
 *
 * <p>See the main {@link SpannerAdapter} for a full description of supported application-level functionality.</p>
 *
 * @param <Key> Typed {@link Message} which implements a concrete model key structure, as defined and annotated by the
 *              core Gust annotations.
 * @param <Model> Typed {@link Message} which implements a concrete model object structure, as defined and annotated by
 *              the core Gust annotations.
 * @see SpannerAdapter Main typed adapter interface for Spanner.
 * @see SpannerManager Adapter instance manager and factory.
 * @see SpannerDriverSettings Driver-level settings specific to Spanner.
 * @see gust.backend.driver.firestore.FirestoreDriver Similar driver implementation, built on top of Cloud Firestore,
 *      which itself is implemented on top of Cloud Spanner.
 */
@Immutable @ThreadSafe
@SuppressWarnings({"UnstableApiUsage", "OptionalUsedAsFieldOrParameterType"})
public final class SpannerDriver<Key extends Message, Model extends Message>
        implements DatabaseDriver<Key, Model, Struct, Mutation> {
    /** Private log pipe. */
    private static final Logger logging = Logging.logger(SpannerDriver.class);

    /** Executor service to use for async calls. */
    private final @Nonnull ListeningScheduledExecutorService executorService;

    /** Codec to use for serializing/de-serializing models. */
    private final @Nonnull ModelCodec<Model, Mutation, Struct> codec;

    /** Default database ID to interact with. */
    private final @Nonnull DatabaseId defaultDatabase;

    /** Settings for the Spanner driver. */
    private final @Nonnull SpannerDriverSettings driverSettings;

    /** Cloud Spanner client engine. */
    final @Nonnull Spanner engine;

    /** Defines generic Spanner operation-specific options. */
    interface SpannerOperationOptions extends OperationOptions {
        /** @return Database to use when connecting to Spanner. */
        default @Nonnull Optional<DatabaseId> databaseId() {
            return Optional.empty();
        }
    }

    /** Defines Spanner-specific options for all read and query operations. */
    interface SpannerReadOptions extends SpannerOperationOptions, FetchOptions {
        /** @return Manual field projection, as applicable. Defaults to any present field mask. */
        default @Nonnull Optional<FieldMask> projection() {
            return this.fieldMask();
        }
    }

    /** Defines Spanner-specific options for all write and mutation operations. */
    interface SpannerWriteOptions extends SpannerOperationOptions, WriteOptions {
        // Nothing yet.
    }

    /** Defines Spanner-specific fetch options. */
    interface SpannerFetchOptions extends SpannerReadOptions {
        /** Default set of fetch options. */
        SpannerFetchOptions DEFAULTS = new SpannerFetchOptions() {};

        /** @return Timestamp boundary for single-use reads. */
        default @Nonnull Optional<TimestampBound> timestampBound() {
            return Optional.empty();
        }
    }

    /** Defines Spanner-specific mutative write options. */
    interface SpannerMutationOptions extends SpannerWriteOptions {
        /** Default set of mutation options. */
        SpannerMutationOptions DEFAULTS = new SpannerMutationOptions() {};

        /**
         * Provides a transaction manger for a delete transaction. To activate transactions, one must also set the
         * `transactional` configuration setting to `true`.
         *
         * @return Optional containing the transaction manager to use for this operation.
         */
        default @Nonnull Optional<TransactionContext> transactionContext() {
            return Optional.empty();
        }
    }

    /** Defines Spanner-specific mutative delete options. */
    interface SpannerDeleteOptions extends SpannerWriteOptions {
        /** Default set of delete options. */
        SpannerDeleteOptions DEFAULTS = new SpannerDeleteOptions() {};
    }

    /**
     * Construct a new Spanner driver from scratch.
     *
     * @param baseOptions Base options to apply to the Spanner driver.
     * @param channelProvider Managed gRPC channel to use for Spanner RPCAPI interactions.
     * @param credentialsProvider Transport credentials provider.
     * @param defaultDatabase Default Spanner database to use and interact with.
     * @param callCredentialProvider RPC call credential provider.
     * @param transportOptions Options to apply to the transport layer.
     * @param executorService Executor service to use when executing calls.
     * @param codec Model codec to use with this driver.
     * @param driverSettings Settings for the Spanner driver itself.
     */
    private SpannerDriver(@Nonnull SpannerOptions.Builder baseOptions,
                          @Nonnull TransportChannelProvider channelProvider,
                          @Nonnull DatabaseId defaultDatabase,
                          @Nonnull Optional<CredentialsProvider> credentialsProvider,
                          @Nonnull Optional<SpannerOptions.CallCredentialsProvider> callCredentialProvider,
                          @Nonnull GrpcTransportOptions transportOptions,
                          @Nonnull ListeningScheduledExecutorService executorService,
                          @Nonnull ModelCodec<Model, Mutation, Struct> codec,
                          @Nonnull SpannerDriverSettings driverSettings) {
        this.codec = codec;
        this.defaultDatabase = defaultDatabase;
        this.executorService = executorService;
        this.driverSettings = driverSettings;
        SpannerOptions.Builder options = baseOptions
                .setChannelProvider(channelProvider)
                .setTransportOptions(transportOptions);

        callCredentialProvider.ifPresent(options::setCallCredentialsProvider);
        credentialsProvider.ifPresent((credentialProvider) -> options
                .getSpannerStubSettingsBuilder()
                .setCredentialsProvider(credentialProvider));

        if (logging.isDebugEnabled())
            logging.debug(String.format("Initializing Spanner driver with options:\n%s",
                    options.build().getService().getOptions().toString()));
        this.engine = options.build().getService();
    }

    /** Factory responsible for creating {@link SpannerDriver} instances from injected dependencies. */
    @Factory static final class SpannerDriverFactory {
        private SpannerDriverFactory() { /* Disallow construction. */ }

        /**
         * Acquire a new instance of the Spanner driver, using the specified configuration settings, and the specified
         * injected channel.
         *
         * @param baseOptions Base options to apply to the Spanner driver.
         * @param spannerChannel Managed gRPC channel provider.
         * @param credentialsProvider Transport credentials provider.
         * @param callCredentialProvider RPC call credential provider.
         * @param transportOptions Options to apply to the Spanner channel.
         * @param executorService Executor service to use when executing calls.
         * @param keyInstance Key model class we are binding this driver to.
         * @param modelInstance Default instance of the model we wish to make a driver for.
         * @param driverSettings Settings for the Spanner driver.
         * @return Spanner driver instance.
         */
        @Context
        @Refreshable
        public static @Nonnull <K extends Message, M extends Message> SpannerDriver<K, M> acquireDriver(
                @Nonnull SpannerOptions.Builder baseOptions,
                @Nonnull DatabaseId defaultDatabase,
                @Nonnull @GoogleAPIChannel(service = GoogleService.SPANNER) TransportChannelProvider spannerChannel,
                @Nonnull Optional<CredentialsProvider> credentialsProvider,
                @Nonnull Optional<SpannerOptions.CallCredentialsProvider> callCredentialProvider,
                @Nonnull GrpcTransportOptions transportOptions,
                @Nonnull ListeningScheduledExecutorService executorService,
                @SuppressWarnings("unused") @Nonnull K keyInstance,
                @Nonnull M modelInstance,
                @Nonnull SpannerDriverSettings driverSettings) {
            return new SpannerDriver<>(
                baseOptions,
                spannerChannel,
                defaultDatabase,
                credentialsProvider,
                callCredentialProvider,
                transportOptions,
                executorService,
                SpannerCodec.forModel(
                    modelInstance,
                    SpannerMutationSerializer.forModel(
                        modelInstance,
                        driverSettings
                    ),
                    SpannerStructDeserializer.forModel(
                        modelInstance,
                        driverSettings
                    )
                ),
                driverSettings
            );
        }
    }

    /** @inheritDoc */
    @Override
    public @Nonnull ListeningScheduledExecutorService executorService() {
        return executorService;
    }

    /** @inheritDoc */
    @Override
    public @Nonnull ModelCodec<Model, Mutation, Struct> codec() {
        return codec;
    }

    /** @inheritDoc */
    @Override
    public @Nonnull ReactiveFuture<Optional<Model>> retrieve(@Nonnull Key key,
                                                             @Nonnull FetchOptions options) {
        // null check all inputs
        Objects.requireNonNull(key, "Cannot fetch model with `null` for key.");
        Objects.requireNonNull(options, "Cannot fetch model without `options`.");
        enforceRole(key, DatapointType.OBJECT_KEY);
        var keyId = id(key).orElseThrow(() ->
            new IllegalArgumentException("Cannot fetch model with empty key."));

        // resolve the table where we should look for this entity
        var table = resolveTableName(key);

        // next, resolve the executor, database we should operate on, and corresponding client
        ListeningScheduledExecutorService exec = options.executorService().orElseGet(this::executorService);
        SpannerFetchOptions spannerOpts;

        if (options.getClass().isAssignableFrom(SpannerFetchOptions.class)) {
            spannerOpts = ((SpannerFetchOptions) options);
        } else {
            spannerOpts = SpannerFetchOptions.DEFAULTS;
        }

        DatabaseId db = spannerOpts.databaseId().orElse(defaultDatabase);
        var client = engine.getDatabaseClient(db);
        boolean transactional = spannerOpts.transactional().isPresent() && spannerOpts.transactional().get();

        // with the DB client in hand, resolve the raw Spanner result
        ReadContext context;
        if (spannerOpts.timestampBound().isPresent()) {
            if (transactional) {
                context = client.readOnlyTransaction(spannerOpts.timestampBound().get());
            } else {
                context = client.singleUse(spannerOpts.timestampBound().get());
            }
        } else {
            if (transactional) {
                context = client.readOnlyTransaction();
            } else {
                context = client.singleUse();
            }
        }

        // calculate the fields we should read from Spanner. because Spanner is a SQL-style DB with tables, we must
        // enumerate each field we wish to load into the result set. this set of fields can either be specified via a
        // field mask attached to the call options, or by generating a set of fields from the top-level model.
        List<String> fieldsToRead;
        if (spannerOpts.projection().isPresent()) {
            fieldsToRead = FieldMaskUtil.normalize(spannerOpts.projection().get())
                    .getPathsList();
        } else {
            fieldsToRead = calculateDefaultFields(
                this.codec().instance().getDescriptorForType(),
                driverSettings
            );
        }

        var op = wrap(context.readRowAsync(
            table,
            com.google.cloud.spanner.Key.of(id(key).orElseThrow()),
            fieldsToRead
        ));

        return wrap(transformAsync(withTimeout(op, 120, TimeUnit.SECONDS, exec), (result) -> {
            if (result == null) {
                if (logging.isDebugEnabled())
                    logging.debug("Query option result was `null`. Returning empty result.");
                return immediateFuture(Optional.empty());

            } else {
                if (logging.isDebugEnabled())
                    logging.debug("Received non-null `Struct` result from Spanner. Deserializing...");

                // deserialize the model
                var deserialized = codec.deserialize(result);
                if (logging.isDebugEnabled())
                    logging.debug(format(
                        "Found and deserialized model at ID '%s' from Spanner. Record follows:\n%s",
                        keyId,
                        deserialized));

                return immediateFuture(Optional.of(
                    spliceKey(applyMask(deserialized, options), Optional.of(key))
                ));
            }
        }, exec));
    }

    /** @inheritDoc */
    @Override
    public @Nonnull ReactiveFuture<Model> persist(@Nullable Key key,
                                                  @Nonnull Model model,
                                                  @Nonnull WriteOptions options) {
        // enforce model constraints
        Objects.requireNonNull(key, "Cannot write model with `null` for key.");
        Objects.requireNonNull(model, "Cannot write model which is, itself, `null`.");
        Objects.requireNonNull(options, "Cannot write model without `options`.");
        enforceRole(key, DatapointType.OBJECT_KEY);

        // resolve executor
        ListeningScheduledExecutorService exec = options.executorService().orElseGet(this::executorService);

        // resolve existing ID, if any. if none can be resolved, generate one, and splice it into the key, and then
        // likewise splice the key into the model. if an explicit ID is present, it is assumed that it is mounted
        // correctly on the model.
        Optional<Object> existingId = id(key);
        Object modelId = existingId.orElseGet(() -> this.generateId(model));

        if (existingId.isEmpty()) {
            spliceKey(
                model,
                Optional.of(spliceId(
                    key,
                    Optional.of(modelId)
                ))
            );
        }

        try {
            // resolve extended spanner mutation options
            SpannerMutationOptions spannerOpts;
            if (options.getClass().isAssignableFrom(SpannerMutationOptions.class)) {
                spannerOpts = ((SpannerMutationOptions) options);
            } else {
                spannerOpts = SpannerMutationOptions.DEFAULTS;
            }

            // resolve the table where we should look for this entity
            var table = resolveTableName(key);
            DatabaseId db = spannerOpts.databaseId().orElse(defaultDatabase);
            var client = engine.getDatabaseClient(db);
            boolean transactional = spannerOpts.transactional().isPresent() ?
                    spannerOpts.transactional().get() :
                    options.transactional().orElse(false);

            var writeMode = spannerOpts.writeMode().isPresent() ?
                    spannerOpts.writeMode().get() :
                    options.writeMode().orElse(WriteOptions.WriteDisposition.BLIND);

            if (logging.isDebugEnabled())
                logging.debug("Mode '{}' determined for Spanner write.", writeMode.name());

            Mutation.WriteBuilder mutation;
            switch (writeMode) {
                case BLIND: mutation = Mutation.newInsertOrUpdateBuilder(table); break;
                case MUST_EXIST: mutation = Mutation.newUpdateBuilder(table); break;
                case MUST_NOT_EXIST: mutation = Mutation.newInsertBuilder(table); break;
                default: mutation = Mutation.newReplaceBuilder(table); break;
            }

            if (codec instanceof SpannerCodec) {
                if (logging.isTraceEnabled())
                    logging.trace("Serializing model to Spanner `Mutation`: {}", model.toString());

                // fill in the mutation
                var serialized = ((SpannerCodec<Model>) codec).serialize(mutation, model);

                return wrap(exec.submit(() -> {
                    if (transactional && spannerOpts.transactionContext().isPresent()) {
                        spannerOpts.transactionContext().get()
                                .buffer(serialized);
                    } else {
                        // it's time to actually write the model
                        var write = client.writeAtLeastOnce(Collections.singleton(serialized));
                        Objects.requireNonNull(write, "write result from Spanner should never be null");
                    }
                    return model;
                }));

            } else {
                throw new IllegalStateException("Cannot serialize Spanner model without `SpannerCodec`.");
            }

        } catch (IOException ioe) {
            throw new IllegalStateException(ioe);
        }
    }

    /** @inheritDoc */
    @Override
    public @Nonnull ReactiveFuture<Key> delete(@Nonnull Key key,
                                               @Nonnull DeleteOptions baseOptions) {
        Objects.requireNonNull(key, "cannot delete null key from Spanner");
        Objects.requireNonNull(baseOptions, "cannot delete without valid Spanner options");
        enforceRole(key, DatapointType.OBJECT_KEY);
        Object keyId = id(key).orElseThrow(() ->
                new IllegalArgumentException("Cannot delete key with empty or missing ID."));

        SpannerMutationOptions options;
        if (baseOptions instanceof SpannerMutationOptions) {
            options = (SpannerMutationOptions) baseOptions;
        } else {
            options = SpannerMutationOptions.DEFAULTS;
        }

        // prep for an async delete action
        ListeningScheduledExecutorService exec = options.executorService().orElseGet(this::executorService);

        // resolve extended spanner mutation options
        SpannerDeleteOptions spannerOpts;
        if (options.getClass().isAssignableFrom(SpannerDeleteOptions.class)) {
            spannerOpts = ((SpannerDeleteOptions) options);
        } else {
            spannerOpts = SpannerDeleteOptions.DEFAULTS;
        }

        // next, resolve the table we should work with, and any override DB
        var table = resolveTableName(key);
        DatabaseId db = spannerOpts.databaseId().orElse(defaultDatabase);
        var client = engine.getDatabaseClient(db);
        boolean transactional = spannerOpts.transactional().isPresent() ?
                spannerOpts.transactional().get() :
                options.transactional().orElse(false);

        // prep the delete operation and fire it off
        var deleteOperation = Mutation.delete(table, com.google.cloud.spanner.Key.of(keyId));
        if (logging.isDebugEnabled())
            logging.debug("Deleting model at ID `{}` in table `{}`.", keyId, table);
        return wrap(exec.submit(() -> {
            if (transactional && options.transactionContext().isPresent()) {
                options.transactionContext().get().buffer(deleteOperation);
            } else {
                var result = client.write(Collections.singletonList(deleteOperation));
                Objects.requireNonNull(result, "delete result from Spanner should never be null");
            }
            return key;
        }));
    }
}