atlassian/localstack

View on GitHub
localstack/utils/kinesis/java/com/atlassian/KinesisStarter.java

Summary

Maintainability
B
4 hrs
Test Coverage
package com.atlassian;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.multilang.MultiLangDaemon;
import com.amazonaws.services.kinesis.multilang.MultiLangDaemonConfig;

/**
 * Custom extensions to <code>MultiLangDaemon</code> class from amazon-kinesis-client
 * project, introducing the following additional configuration properties:
 *
 * - dynamodbEndpoint: endpoint host (hostname:port) for DynamoDB API
 * - dynamodbProtocol: protocol for DynamoDB API (http or https); defaults to http
 * - kinesisEndpoint: endpoint host (hostname:port) for Kinesis API
 * - kinesisProtocol: protocol for Kinesis API (http or https); defaults to http
 * - metricsLevel: level of CloudWatch metrics to report (e.g., SUMMARY or NONE)
 * - disableCertChecking: if present (and not explicitly "false"), sets the
 *   <code>com.amazonaws.sdk.disableCertChecking</code> system property to "true"
 *
 * @author Waldemar Hummer
 */
public class KinesisStarter {

    private static final String PROP_DISABLE_CERT_CHECKING = "disableCertChecking";
    private static final String PROP_DYNAMODB_ENDPOINT = "dynamodbEndpoint";
    private static final String PROP_DYNAMODB_PROTOCOL = "dynamodbProtocol";
    private static final String PROP_KINESIS_ENDPOINT = "kinesisEndpoint";
    private static final String PROP_KINESIS_PROTOCOL = "kinesisProtocol";
    private static final String PROP_METRICS_LEVEL = "metricsLevel";

    /** Protocol used for an endpoint when no explicit protocol property is given. */
    private static final String DEFAULT_PROTOCOL = "http";

    /**
     * Starts a <code>MultiLangDaemon</code> configured from a properties file and
     * terminates the JVM with the exit code returned by the daemon.
     *
     * @param args command line arguments; <code>args[0]</code> is the name of the
     *        properties file, which is read both by this class (as a classpath
     *        resource) and by <code>MultiLangDaemonConfig</code>
     * @throws IllegalArgumentException if no properties file is given
     * @throws Exception if the properties cannot be loaded or the daemon fails
     */
    public static void main(String[] args) throws Exception {

        if(args == null || args.length < 1) {
            throw new IllegalArgumentException(
                "Usage: " + KinesisStarter.class.getName() + " <properties file>");
        }
        String propertiesFile = args[0];

        Properties props = loadProps(propertiesFile);

        // Must be set before the daemon config below creates any AWS SDK objects.
        if(isFlagEnabled(props, PROP_DISABLE_CERT_CHECKING)) {
            System.setProperty("com.amazonaws.sdk.disableCertChecking", "true");
        }

        MultiLangDaemonConfig config = new MultiLangDaemonConfig(propertiesFile);

        ExecutorService executorService = config.getExecutorService();
        KinesisClientLibConfiguration kinesisConfig = config.getKinesisClientLibConfiguration();

        // Always keep the instance returned by the with*() calls, so that all
        // three overrides are applied in the same way.
        if(props.containsKey(PROP_METRICS_LEVEL)) {
            kinesisConfig = kinesisConfig.withMetricsLevel(props.getProperty(PROP_METRICS_LEVEL));
        }
        if(props.containsKey(PROP_DYNAMODB_ENDPOINT)) {
            kinesisConfig = kinesisConfig.withDynamoDBEndpoint(
                buildEndpoint(props, PROP_DYNAMODB_ENDPOINT, PROP_DYNAMODB_PROTOCOL));
        }
        if(props.containsKey(PROP_KINESIS_ENDPOINT)) {
            kinesisConfig = kinesisConfig.withKinesisEndpoint(
                buildEndpoint(props, PROP_KINESIS_ENDPOINT, PROP_KINESIS_PROTOCOL));
        }

        MultiLangDaemon daemon = new MultiLangDaemon(
            kinesisConfig,
            config.getRecordProcessorFactory(),
            executorService);

        // Block until the daemon finishes and propagate its result as exit code.
        Future<Integer> future = executorService.submit(daemon);
        System.exit(future.get());
    }

    /**
     * Builds an endpoint URL of the form <code>protocol://host</code>.
     *
     * @param props the loaded configuration properties
     * @param endpointKey name of the property holding the endpoint host (hostname:port)
     * @param protocolKey name of the property holding the protocol; if the property
     *        is absent, {@link #DEFAULT_PROTOCOL} is used
     * @return the endpoint URL
     */
    private static String buildEndpoint(Properties props, String endpointKey, String protocolKey) {
        String protocol = props.getProperty(protocolKey, DEFAULT_PROTOCOL);
        return protocol + "://" + props.getProperty(endpointKey);
    }

    /**
     * Checks whether a flag-style property is switched on. A flag counts as
     * enabled when its key is present, unless its value is explicitly "false"
     * (case-insensitive, surrounding whitespace ignored).
     *
     * @param props the loaded configuration properties
     * @param key name of the flag property
     * @return true if the flag is enabled
     */
    private static boolean isFlagEnabled(Properties props, String key) {
        if(!props.containsKey(key)) {
            return false;
        }
        String value = props.getProperty(key);
        return value == null || !"false".equalsIgnoreCase(value.trim());
    }

    /**
     * Loads a properties file from the classpath, using the context class loader
     * of the current thread.
     *
     * @param file resource name of the properties file
     * @return the loaded properties
     * @throws FileNotFoundException if the resource cannot be found
     * @throws IOException if reading the resource fails
     */
    private static Properties loadProps(String file) throws IOException {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        // getResourceAsStream() returns null for a missing resource; report that
        // explicitly instead of failing with a NullPointerException in load().
        try (InputStream in = loader.getResourceAsStream(file)) {
            if(in == null) {
                throw new FileNotFoundException(
                    "Properties file not found on classpath: " + file);
            }
            Properties props = new Properties();
            props.load(in);
            return props;
        }
    }
}