sgammon/GUST

View on GitHub
java/gust/backend/transport/GoogleTransportManager.java

Summary

Maintainability
A
0 mins
Test Coverage
/*
 * Copyright © 2020, The Gust Framework Authors. All rights reserved.
 *
 * The Gust/Elide framework and tools, and all associated source or object computer code, except where otherwise noted,
 * are licensed under the Zero Prosperity license, which is enclosed in this repository, in the file LICENSE.txt. Use of
 * this code in object or source form requires and implies consent and agreement to that license in principle and
 * practice. Source or object code not listing this header, or unless specified otherwise, remain the property of
 * Elide LLC and its suppliers, if any. The intellectual and technical concepts contained herein are proprietary to
 * Elide LLC and its suppliers and may be covered by U.S. and Foreign Patents, or patents in process, and are protected
 * by trade secret and copyright law. Dissemination of this information, or reproduction of this material, in any form,
 * is strictly forbidden except in adherence with assigned license requirements.
 */
package gust.backend.transport;

import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.grpc.*;
import gust.Core;
import gust.backend.runtime.Logging;
import io.grpc.Channel;
import io.grpc.ClientInterceptor;
import io.grpc.ManagedChannel;
import io.micronaut.aop.MethodInvocationContext;
import io.micronaut.context.annotation.ConfigurationProperties;
import io.micronaut.context.annotation.Context;
import io.micronaut.core.annotation.AnnotationValue;
import io.micronaut.runtime.context.scope.Refreshable;
import org.slf4j.Logger;
import org.threeten.bp.Duration;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;
import javax.annotation.concurrent.ThreadSafe;
import javax.inject.Inject;
import javax.inject.Singleton;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ScheduledExecutorService;


/**
 * Supplies a {@link TransportManager} implementation for dealing with Google Cloud APIs via gRPC and Protobuf. In this
 * object, we make heavy use of the Google API Extensions for Java ("GAX"), in order to centrally manage channels, on-
 * demand, for downstream service use.
 *
 * <p>Connection instances may be requested from this object like any other transport manager, but if there is an
 * existing managed channel for the provided service ID, it will provide the existing instance (or one from a pool of
 * existing instances) rather than creating a new one.</p>
 *
 * <p>To request a connection instance from this transport manager, annotate an injectable method parameter (or
 * constructor parameter) of type {@link Channel} with {@link GoogleAPIChannel}. For example:
 * <code>
 *   public void doSomething(@GoogleAPIChannel(Service.PUBSUB) Channel pubsubChannel) {
 *     // ...
 *   }
 * </code></p>
 *
 * <p><b>Configuration:</b> Each Google service has a specified <i>name</i>, included on the docs in
 * {@link GoogleService}, at which that service may be configured in <pre>application.yml</pre>. For example, the
 * following code configures the Cloud Pubsub client's keepalive and pool size settings:
 * <code>
 *   transport:
 *     google:
 *       pubsub:
 *         poolSize: 3
 *         keepAliveTime: 15s
 *         keepAliveTimeout: 30s
 *         keepAliveWithoutCalls: true
 * </code></p>
 */
@Context
@Singleton
@Immutable
@ThreadSafe
@Refreshable
@SuppressWarnings("unused")
@ConfigurationProperties(GoogleTransportManager.CONFIG_PREFIX)  // `transport.google`
public final class GoogleTransportManager implements TransportManager<GoogleAPIChannel, GoogleService, Channel> {
  /** Prefix under which Google services may be configured. */
  public final static String CONFIG_PREFIX = ROOT_CONFIG_PREFIX + ".google";

  /** Logging pipe. */
  private final static Logger logging = Logging.logger(GoogleTransportManager.class);

  /** Maximum number of concurrent connections per service. */
  private final static int DEFAULT_MAX_POOL_SIZE = 3;

  /** Tag to append to the user-agent on outgoing calls. */
  private final static String GUST_TAG = "gust/" + Core.getGustVersion();

  /** Map of pooled connections, grouped per-service. */
  private final ConcurrentMap<GoogleService, ManagedChannelPool> poolMap;

  /** Maximum size of any one service connection pool. */
  private volatile int maxPoolSize = DEFAULT_MAX_POOL_SIZE;

  /** Executor service to use for RPC traffic. */
  private volatile @Nonnull ScheduledExecutorService executorService;

  /** Thrown when required transport configuration is missing. */
  private static final class TransportConfigMissing extends TransportException {
    /**
     * Instantiate a new `TransportConfigMissing` exception.
     *
     * @param svc Service we are missing config for.
     */
    private TransportConfigMissing(@Nonnull GoogleService svc) {
      super(String.format(
        "Google transport configuration could not be resolved for service %s.", svc.getToken()));
    }
  }

  /** Thrown when credentials are required, but missing. */
  private static final class TransportCredentialsMissing extends TransportException {
    /**
     * Instantiate a new `TransportConfigMissing` exception.
     *
     * @param svc Service we are missing config for.
     */
    private TransportCredentialsMissing(@Nonnull GoogleService svc, @Nullable IOException ioe) {
      super(String.format(
        "Google transport credentials could not be resolved for service %s.", svc.getToken()), ioe);
    }
  }

  /** Exception thrown when a channel could not be established. */
  private static final class ChannelEstablishFailed extends TransportException {
    /**
     * Instantiate a new `ChannelEstablishException` exception.
     *
     * @param svc Service that failed to establish a connection.
     * @param ioe Wrapped IO exception.
     */
    private ChannelEstablishFailed(@Nonnull GoogleService svc, @Nonnull IOException ioe) {
      super(String.format(
        "Failed to establish a connection to Google service '%s'.", svc.getToken()), ioe);
    }
  }

  /** Client header interceptor for injecting the framework `User-Agent` header. */
  @Immutable
  private static final class UAInterceptor extends GrpcHeaderInterceptor {
    /** Singleton interceptor instance. */
    private static final UAInterceptor INSTANCE = new UAInterceptor();

    private UAInterceptor() {
      super(Collections.singletonMap("user-agent", GUST_TAG));
    }
  }

  /** Wrapper object that manages a set of pooled connections. */
  @Immutable
  private static final class ManagedChannelPool {
    /** Known service we will be managing connections for. */
    private final GoogleService service;

    /** Holds the set of channels managed by this pool. */
    private final InstantiatingGrpcChannelProvider provider;

    /**
     * Initialize a new managed connection pool for the provided service configuration.
     *
     * @param maxPoolSize Maximum per-service pool size.
     * @param executorService Executor service to use for spawning work.
     * @param svc Service which we will be managing connections for.
     * @param config Configuration by which to initialize gRPC channels.
     */
    private ManagedChannelPool(int maxPoolSize,
                               @Nonnull ScheduledExecutorService executorService,
                               @Nonnull GoogleService svc,
                               @Nonnull GoogleTransportConfig config) {
      this.service = svc;
      this.provider = buildProvider(maxPoolSize, svc, executorService, config);
    }

    /**
     * Acquire a managed channel from this pool, potentially blocking until one is ready or otherwise free. If all
     * channels are busy, and the maximum number of channels has not yet been reached, one may be established fresh for
     * the invoking caller, which may also incur delays.
     *
     * @return Managed channel acquired for the service configured with this pool.
     * @throws ChannelEstablishFailed If an I/O error occurs establishing or resolving the requested channel.
     */
    @Nonnull GrpcTransportChannel acquire() throws ChannelEstablishFailed {
      try {
        if (logging.isDebugEnabled())
          logging.debug(String.format("Acquiring connection for service '%s'...", this.service.getToken()));

        // acquire the channel
        GrpcTransportChannel channel = (GrpcTransportChannel)this.provider.getTransportChannel();
        if (channel == null || channel.isShutdown())
          throw new IllegalStateException("Failed to acquire gRPC channel (or was initially shut down).");
        else if (logging.isDebugEnabled())
          logging.debug(String.format("gRPC channel acquired ('%s').", channel.getChannel().authority()));
        return channel;

      } catch (IOException ioe) {
        logging.error(String.format("Failed to establish managed channel (error: '%s').", ioe.getMessage()));
        throw new ChannelEstablishFailed(this.service, ioe);
      }
    }
  }

  /**
   * Build the provided {@link GoogleTransportConfig} into a gRPC channel provider, which applies the configuration when
   * instantiating channels according to needs invoked through the active {@link TransportManager}.
   *
   * @param maxPoolSize Maximum size of the channel pool.
   * @param service Service for which we are building a channel provider.
   * @param executorService Executor service to make use of when executing RPCs.
   * @param config gRPC configuration to apply when instantiating new channels for this service.
   * @return Pre-fabricated provider instance to use when instantiating new channels.
   */
  private static InstantiatingGrpcChannelProvider buildProvider(int maxPoolSize,
                                                                @Nonnull GoogleService service,
                                                                @Nonnull ScheduledExecutorService executorService,
                                                                @Nonnull GoogleTransportConfig config) {
    if (logging.isTraceEnabled())
      logging.trace("Setting up new gRPC channel provider...");
    // begin setting up provider
    InstantiatingGrpcChannelProvider.Builder builder = InstantiatingGrpcChannelProvider.newBuilder()
      // target
      .setEndpoint(Objects.requireNonNull(config.endpoint(),
        "Managed channel endpoint cannot be null."))

      // message sizes
      .setMaxInboundMessageSize(Objects.requireNonNull(config.maxInboundMessageSize(),
        "Maximum inbound message size cannot be null."))
      .setMaxInboundMetadataSize(Objects.requireNonNull(config.maxInboundMetadataSize(),
        "Maximum inbound metadata size cannot be null."))

      // pooling & execution
      .setPoolSize(Math.min(
        Objects.requireNonNull(config.getPoolSize(),
          "Cannot set `null` for max pool size."),
        maxPoolSize))

      .setExecutorProvider(new ExecutorProvider() {
        @Override
        public boolean shouldAutoClose() {
          return false; /* this executor is shared, so don't auto-close per-service. */
        }

        @Override
        public ScheduledExecutorService getExecutor() {
          return executorService;
        }
      });

    // interceptors
    if (logging.isTraceEnabled()) logging.trace("Applying extra interceptors...");
    Optional<List<ClientInterceptor>> extraInterceptors = config.getExtraInterceptors();
    if (extraInterceptors.isPresent()) {
      List<ClientInterceptor> extraInterceptorsList = extraInterceptors.get();
      final ArrayList<ClientInterceptor> composedInterceptors = new ArrayList<>(
        extraInterceptorsList.size() + 1);
      composedInterceptors.add(UAInterceptor.INSTANCE);
      composedInterceptors.addAll(extraInterceptorsList);
      builder.setInterceptorProvider(() -> Collections.unmodifiableList(composedInterceptors));
      if (logging.isDebugEnabled())
        logging.debug(String.format(
          "Custom interceptors added (%s), along with `UAInterceptor`.", extraInterceptorsList.size()));
    } else {
      builder.setInterceptorProvider(() -> Collections.singletonList(UAInterceptor.INSTANCE));
      if (logging.isDebugEnabled())
        logging.debug("No custom interceptors detected. Added `UAInterceptor`.");
    }

    // keepalive
    if (logging.isTraceEnabled()) logging.trace("Applying keepalive settings...");
    if (Objects.requireNonNull(config.getKeepaliveEnabled())) {
      builder
        .setKeepAliveTime(Duration.ofSeconds(config.getKeepaliveTime().getSeconds()))
        .setKeepAliveTimeout(Duration.ofSeconds(config.getKeepaliveTimeout().getSeconds()))
        .setKeepAliveWithoutCalls(config.getKeepAliveNoActivity());
      if (logging.isDebugEnabled())
        logging.debug(String.format(
          "Managed channel keepalive is ENABLED with (%s) time, (%s) timeout.",
          config.getKeepaliveTime().toString(),
          config.getKeepaliveTimeout().toString()));
    } else {
      if (logging.isDebugEnabled())
        logging.debug("Managed channel keepalive is disabled.");
    }

    // credentials
    if (logging.isTraceEnabled()) logging.trace("Applying credential settings...");
    Optional<CredentialsProvider> maybeProvider = config.credentialsProvider();
    if (Objects.requireNonNull(config.requiresCredentials()) && !maybeProvider.isPresent()) {
      logging.error(String.format(
        "Failed to initialize gRPC service '%s': credentials were required, but could not be obtained.",
        service.getToken()));
      throw new TransportCredentialsMissing(service, null);
    }
    if (maybeProvider.isPresent()) {
      if (logging.isTraceEnabled()) logging.trace("Found credential provider. Resolving...");
      try {
        builder.setCredentials(maybeProvider.get().getCredentials());
        if (logging.isDebugEnabled()) logging.debug("Credentials were resolved and attached.");
      } catch (IOException ioe) {
        logging.error(String.format(
          "Failed to initialize gRPC service '%s': credentials were specified, but failed to load.",
          service.getToken()));
        throw new TransportCredentialsMissing(service, ioe);
      }
    }

    // connection priming
    if (Objects.requireNonNull(config.enablePrimer())) {
      if (logging.isDebugEnabled()) logging.debug("Connection priming ENABLED.");
      builder.setChannelPrimer(managedChannel -> primeManagedChannel(service, config, managedChannel));
    } else if (logging.isDebugEnabled()) {
      logging.debug("Connection priming DISABLED.");
    }
    return builder.build();
  }

  /**
   * Prime a managed gRPC channel, once it has been established by the underlying GAX tooling.
   *
   * @param service Service which we are instantiating a channel for.
   * @param config Configuration for our gRPC service channel.
   * @param channel Instantiated/established connection and higher-order RPC channel.
   */
  private static void primeManagedChannel(@Nonnull GoogleService service,
                                          @Nonnull GoogleTransportConfig config,
                                          @Nonnull ManagedChannel channel) {
    throw new IllegalStateException("channel priming is not yet supported");
  }

  /**
   * Initialize a new Google Transport Manager.
   *
   * @param executorService Scheduled executor against which to execute RPC traffic.
   */
  @Inject
  GoogleTransportManager(@Nonnull ScheduledExecutorService executorService) {
    if (logging.isTraceEnabled())
      logging.trace(String.format("Initializing `GoogleTransportManager` (version tag: '%s').", GUST_TAG));
    if (logging.isDebugEnabled())
      logging.debug(String.format("`GoogleTransportManager` executor: '%s'.", executorService.getClass().getName()));
    this.poolMap = new ConcurrentSkipListMap<>();  // initialize pool map
    this.executorService = executorService;
  }

  /**
   * Generate or otherwise resolve a transport-layer configuration for the provided <pre>service</pre>. This includes
   * stuff like the actual endpoint to connect to, keepalive configuration, retries, pooling, and so on.
   *
   * @param service Service for which we should generate or acquire a transport config.
   * @return Transport configuration for the specified service.
   * @throws TransportConfigMissing If configuration cannot be resolved.
   */
  private static @Nonnull GoogleTransportConfig configForService(@Nonnull GoogleService service)
      throws TransportConfigMissing {
    if (logging.isTraceEnabled())
      logging.trace(String.format("Resolving configuration type for Google service '%s'...", service.getToken()));
    Optional<Class<GoogleTransportConfig>> cfgType = service.getConfigType();
    if (cfgType.isPresent()) {
      Class<GoogleTransportConfig> cfgTypeClass = cfgType.get();
      if (logging.isDebugEnabled())
        logging.debug(String.format("Configuration type resolved as '%s'.", cfgType.get().getName()));
      // create config instance
      try {
        return cfgTypeClass.newInstance();
      } catch (InstantiationException | IllegalAccessException err) {
        logging.error(
          String.format("Failed to resolve configuration for Google service '%s'.", service.getToken()));
        throw new TransportConfigMissing(service);
      }
    }
    throw new IllegalStateException(
      "Failed to resolve configuration type for service '%s'. It may not be implemented.");
  }

  /**
   * Resolve a managed channel for the provided Google API service, according to the provided transport configuration,
   * which contains transport-layer settings to apply when establishing new connections for this service.
   *
   * @param service Service to establish or otherwise resolve a connection for.
   * @param config Configuration to apply when establishing connections for this service.
   * @return Managed channel, established for the provided service.
   * @throws ChannelEstablishFailed If a channel cannot be established.
   */
  private @Nonnull GrpcTransportChannel resolveChannel(@Nonnull GoogleService service,
                                                       @Nonnull GoogleTransportConfig config) throws TransportException {
    if (logging.isTraceEnabled())
      logging.trace(String.format("Resolving managed gRPC channel for Google service '%s'.", service.getToken()));

    final @Nonnull ManagedChannelPool pool;
    if (!poolMap.containsKey(service)) {
      if (logging.isDebugEnabled()) logging.debug(String.format(
          "Connection pool not found for service '%s'. Establishing...", service.getToken()));

      // establish initial pool of connections
      pool = new ManagedChannelPool(maxPoolSize, executorService, service, config);
      poolMap.put(service, pool);
    } else {
      if (logging.isTraceEnabled()) logging.trace(String.format(
        "Connection pool found for service '%s'. Using existing.", service.getToken()));
      pool = Objects.requireNonNull(poolMap.get(service));
    }
    return pool.acquire();
  }

  // -- Getters & Setters -- //

  /** @return Maximum size of any one service connection pool. */
  public int getMaxPoolSize() {
    return maxPoolSize;
  }

  // -- Public API -- //
  /**
   * Acquire a connection from this transport manager. The connection provided may or may not be freshly-created,
   * depending on the underlying implementation, but it should never be <pre>null</pre> (exceptions are raised instead).
   *
   * @param type Type of connection to acquire. Defined by the implementation.
   * @return Connection instance for the desired service, potentially fresh, potentially re-used.
   * @throws TransportException If the connection could not be acquired.
   */
  @Override
  public @Nonnull Channel acquire(@Nonnull GoogleService type) throws TransportException {
    //noinspection ConstantConditions
    if (type == null) throw new IllegalArgumentException("Cannot resolve connection for `null` service.");
    return resolveChannel(type, configForService(type)).getChannel();
  }

  /**
   * Provide acquired connections via injection-annotated {@link Channel} method or constructor parameters. This
   * essentially proxies to {@link #acquire(GoogleService)}, passing in the service specified in the context of the
   * {@link GoogleAPIChannel} annotation.
   *
   * @param context Intercepted method execution context.
   * @return Resulting object.
   * @throws TransportException If the connection could not be acquired.
   */
  @Override
  public Channel intercept(MethodInvocationContext<Object, Channel> context) {
    if (!context.hasAnnotation(GoogleAPIChannel.class)) {
      throw new IllegalArgumentException(
        "Must annotate method with @GoogleAPIChannel to inject from GoogleTransportManager.");
    }
    // fetch annotation and resolve desired service
    @Nonnull AnnotationValue<GoogleAPIChannel> anno = Objects.requireNonNull(
      context.getAnnotation(GoogleAPIChannel.class));
    Optional<GoogleService> desiredService = anno.enumValue("service", GoogleService.class);
    if (!desiredService.isPresent())
      throw new IllegalArgumentException("Must provide desired service to GoogleTransportManager.");
    return acquire(desiredService.get());
  }
}