CloudSlang/cs-actions

View on GitHub
cs-azure/src/main/java/io/cloudslang/content/azure/services/StreamingInputJobImpl.java

Summary

Maintainability
B
5 hrs
Test Coverage
/*
 * Copyright 2024 Open Text
 * This program and the accompanying materials
 * are made available under the terms of the Apache License v2.0 which accompany this distribution.
 *
 * The Apache License is available at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */



package io.cloudslang.content.azure.services;

import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.cloudslang.content.azure.entities.CreateStreamingInputJobInputs;
import io.cloudslang.content.azure.entities.models.streamanalytics.CreateStreamingInputJobRequestBody;
import io.cloudslang.content.azure.utils.HttpUtils;
import io.cloudslang.content.httpclient.entities.HttpClientInputs;
import io.cloudslang.content.httpclient.services.HttpClientService;
import org.apache.http.client.utils.URIBuilder;
import org.jetbrains.annotations.NotNull;
import org.json.simple.JSONArray;

import java.util.Map;

import static io.cloudslang.content.azure.utils.Constants.Common.ANONYMOUS;
import static io.cloudslang.content.azure.utils.Constants.Common.CONTENT_TYPE;
import static io.cloudslang.content.azure.utils.Constants.Common.DEFAULT_DATE_FORMAT;
import static io.cloudslang.content.azure.utils.Constants.Common.DEFAULT_TIME_FORMAT;
import static io.cloudslang.content.azure.utils.Constants.Common.ENCODING;
import static io.cloudslang.content.azure.utils.Constants.Common.FIELD_DELIMETER;
import static io.cloudslang.content.azure.utils.Constants.Common.PATH_PATTERN;
import static io.cloudslang.content.azure.utils.Constants.Common.PUT;
import static io.cloudslang.content.azure.utils.Constants.Common.SET_TYPE;
import static io.cloudslang.content.azure.utils.Constants.Common.TYPE;
import static io.cloudslang.content.azure.utils.Constants.CreateStreamingInputJobConstants.DEFAULT_SOURCE_TYPE;
import static io.cloudslang.content.azure.utils.Constants.DEFAULT_RESOURCE;
import static io.cloudslang.content.azure.utils.Constants.INPUTS_JOBS_PATH;
import static io.cloudslang.content.azure.utils.Constants.RESOURCE_GROUPS_PATH;
import static io.cloudslang.content.azure.utils.Constants.STREAMING_JOBS_PATH;
import static io.cloudslang.content.azure.utils.Constants.STREAM_ANALYTICS_PATH;
import static io.cloudslang.content.azure.utils.Constants.SUBSCRIPTION_PATH;
import static io.cloudslang.content.azure.utils.HttpUtils.getAuthHeaders;
import static io.cloudslang.content.azure.utils.HttpUtils.setAPIVersion;

public class StreamingInputJobImpl {

    @NotNull
    public static Map<String, String> CreateInputJob(@NotNull final CreateStreamingInputJobInputs createStreamingInputJobInputs)
            throws Exception {
        final HttpClientInputs httpClientInputs = new HttpClientInputs();
        httpClientInputs.setUrl(getCreateInputStreamingJobUrl(createStreamingInputJobInputs.getAzureCommonInputs().getSubscriptionId(), createStreamingInputJobInputs.getAzureCommonInputs().getResourceGroupName(),
                createStreamingInputJobInputs.getJobName(), createStreamingInputJobInputs.getInputName(), createStreamingInputJobInputs.getAzureCommonInputs().getApiVersion()));
        httpClientInputs.setAuthType(ANONYMOUS);
        httpClientInputs.setMethod(PUT);
        httpClientInputs.setContentType(CONTENT_TYPE);
        httpClientInputs.setHeaders(getAuthHeaders(createStreamingInputJobInputs.getAzureCommonInputs().getAuthToken()));
        HttpUtils.setCommonHttpInputs(httpClientInputs, createStreamingInputJobInputs.getAzureCommonInputs());
        httpClientInputs.setQueryParams(setAPIVersion(createStreamingInputJobInputs.getAzureCommonInputs().getApiVersion()));
        httpClientInputs.setBody(createStreamingInputJobRequestBody(createStreamingInputJobInputs));
        return new HttpClientService().execute(httpClientInputs);

    }

    @NotNull
    private static String getCreateInputStreamingJobUrl(String subscriptionId, String resourceGroupName, String jobName, String inputName, String apiVersion) throws Exception {
        final URIBuilder uriBuilder = new URIBuilder(createStreamingInputJobPath(subscriptionId, resourceGroupName, jobName, inputName, apiVersion));
        //from httpclient 4.5.13 the setPath method is adding one extra / at the start of the URI instead it can be given directly to the constructor
        //uriBuilder.setPath(createStreamingInputJobPath(subscriptionId, resourceGroupName, jobName, inputName, apiVersion));
        return uriBuilder.build().toURL().toString();
    }

    @NotNull
    private static String createStreamingInputJobPath(String subscriptionId, String resourceGroupName, String jobName, String inputName, String apiVersion) {
        StringBuilder pathString = new StringBuilder()
                .append(DEFAULT_RESOURCE)
                .append(SUBSCRIPTION_PATH)
                .append(subscriptionId)
                .append(RESOURCE_GROUPS_PATH)
                .append(resourceGroupName)
                .append(STREAM_ANALYTICS_PATH)
                .append(STREAMING_JOBS_PATH)
                .append(jobName)
                .append(INPUTS_JOBS_PATH)
                .append(inputName);
        return pathString.toString();

    }

    private static String createStreamingInputJobRequestBody(@NotNull final CreateStreamingInputJobInputs inputs) throws Exception {
        CreateStreamingInputJobRequestBody createStreamingOutputJobRequestBody = new CreateStreamingInputJobRequestBody();
        CreateStreamingInputJobRequestBody.Properties properties = new CreateStreamingInputJobRequestBody.Properties();
        CreateStreamingInputJobRequestBody.Datasource datasource = new CreateStreamingInputJobRequestBody.Datasource();
        CreateStreamingInputJobRequestBody.Serialization serialization = new CreateStreamingInputJobRequestBody.Serialization();
        CreateStreamingInputJobRequestBody.SerializationProperties serializationprop = new CreateStreamingInputJobRequestBody.SerializationProperties();
        CreateStreamingInputJobRequestBody.Datasource.SubProperties.StorageAccounts storage = new CreateStreamingInputJobRequestBody.Datasource.SubProperties.StorageAccounts();
        CreateStreamingInputJobRequestBody.Datasource.SubProperties subproperties = new CreateStreamingInputJobRequestBody.Datasource.SubProperties();
        properties.setSourceType(DEFAULT_SOURCE_TYPE);
        datasource.setType(SET_TYPE);
        properties.setDatasource(datasource);
        storage.setAccountName(inputs.getAccountName());
        storage.setAccountKey(inputs.getAccountKey());
        JSONArray json = new JSONArray();
        json.add(storage);
        subproperties.setStrogeaccounts(json);
        datasource.setProperties(subproperties);
        subproperties.setContainer(inputs.getContainerName());
        subproperties.setPathPattern(PATH_PATTERN);
        subproperties.setDateFormat(DEFAULT_DATE_FORMAT);
        subproperties.setTimeFormat(DEFAULT_TIME_FORMAT);
        serializationprop.setFieldDelimiter(FIELD_DELIMETER);
        serializationprop.setEncoding(ENCODING);
        serialization.setType(TYPE);
        serialization.setProperties(serializationprop);
        properties.setSerialization(serialization);
        createStreamingOutputJobRequestBody.setProperties(properties);

        ObjectMapper createInstanceMapper = new ObjectMapper();
        createInstanceMapper.disable(MapperFeature.CAN_OVERRIDE_ACCESS_MODIFIERS);

        return createInstanceMapper.writeValueAsString(createStreamingOutputJobRequestBody);

    }
}