CMSgov/dpc-app

dpc-queue/src/main/java/gov/cms/dpc/queue/service/DataService.java

Summary: Maintainability A (2 hrs) · Test Coverage B (81%)
package gov.cms.dpc.queue.service;

import ca.uhn.fhir.context.FhirContext;
import gov.cms.dpc.common.annotations.ExportPath;
import gov.cms.dpc.common.annotations.JobTimeout;
import gov.cms.dpc.common.logging.SplunkTimestamp;
import gov.cms.dpc.fhir.DPCResourceType;
import gov.cms.dpc.queue.IJobQueue;
import gov.cms.dpc.queue.JobStatus;
import gov.cms.dpc.queue.exceptions.DataRetrievalException;
import gov.cms.dpc.queue.exceptions.DataRetrievalRetryException;
import gov.cms.dpc.queue.models.JobQueueBatch;
import gov.cms.dpc.queue.models.JobQueueBatchFile;
import org.hl7.fhir.dstu3.model.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.stream.Collectors;

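/**
 * Service that submits an export job to the {@link IJobQueue}, polls until the job's batches
 * complete (or the configured {@link JobTimeout} elapses), and assembles the resulting NDJSON
 * output files into a single FHIR resource: a searchset {@link Bundle} on success, or an
 * {@link OperationOutcome} when the job produced an error file. A {@link DataRetrievalException}
 * is thrown if the job fails outright or times out.
 * <p>
 * A minimal usage sketch (the variables below are illustrative placeholders, not real values):
 * <pre>{@code
 * Resource result = dataService.retrieveData(
 *         orgId, orgNpi, providerNpi, List.of(patientMbi),
 *         DPCResourceType.Patient, DPCResourceType.Coverage, DPCResourceType.ExplanationOfBenefit);
 * if (result instanceof OperationOutcome) {
 *     // the export reported an error
 * }
 * }</pre>
 */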
public class DataService {

    private static final Logger LOGGER = LoggerFactory.getLogger(DataService.class);

    private final IJobQueue queue;
    private final String exportPath;
    private final FhirContext fhirContext;
    private final int jobTimeoutInSeconds;

    @Inject
    public DataService(IJobQueue queue, FhirContext fhirContext, @ExportPath String exportPath, @JobTimeout int jobTimeoutInSeconds) {
        this.queue = queue;
        this.fhirContext = fhirContext;
        this.exportPath = exportPath;
        this.jobTimeoutInSeconds = jobTimeoutInSeconds;
    }

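    /**
     * Convenience overload that delegates to the full {@code retrieveData} method with no _since
     * filter, the current UTC time as the transaction time, and no requesting IP or request URL.
     *
     * @param organizationId UUID of organization
     * @param orgNPI         NPI of organization
     * @param providerNPI    NPI of provider
     * @param patientIds     List of patient MBIs
     * @param resourceTypes  Resource types to retrieve
     * @return a {@link Bundle} of the requested resources, or an {@link OperationOutcome} if the export failed
     */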
    public Resource retrieveData(UUID organizationId, String orgNPI, String providerNPI, List<String> patientIds, DPCResourceType... resourceTypes) {
        return retrieveData(organizationId, orgNPI, providerNPI, patientIds, null, OffsetDateTime.now(ZoneOffset.UTC), null, null, resourceTypes);
    }

    /**
     * Retrieves data from BFD
     *
     * @param organizationID  UUID of organization
     * @param orgNPI          NPI of organization
     * @param providerNPI     NPI of provider
     * @param patientMBIs     List of patient MBIs
     * @param since           Retrieve data since this date
     * @param transactionTime BFD Transaction Time
     * @param requestingIP    IP Address of request
     * @param requestUrl      URL of original request
     * @param resourceTypes   Resource types to retrieve
     * @return a {@link Bundle} of the requested resources, or an {@link OperationOutcome} if the export failed
     */
    public Resource retrieveData(UUID organizationID,
                                 String orgNPI,
                                 String providerNPI,
                                 List<String> patientMBIs,
                                 OffsetDateTime since,
                                 OffsetDateTime transactionTime,
                                 String requestingIP, String requestUrl, DPCResourceType... resourceTypes) {
        UUID jobID = this.queue.createJob(organizationID, orgNPI, providerNPI, patientMBIs, List.of(resourceTypes), since, transactionTime, requestingIP, requestUrl, false, false);
        LOGGER.info("Patient everything export job created with job_id={} _since={} from requestUrl={}", jobID.toString(), since, requestUrl);
        final String eventTime = SplunkTimestamp.getSplunkTimestamp();
        LOGGER.info("dpcMetric=queueSubmitted,requestUrl={},jobID={},queueSubmitTime={}", "/Patient/$everything", jobID ,eventTime);

        Optional<List<JobQueueBatch>> optionalBatches = waitForJobToComplete(jobID, organizationID, this.queue);

        if (optionalBatches.isPresent()) {
            List<JobQueueBatch> batches = optionalBatches.get();
            List<JobQueueBatchFile> files = batches.stream().map(JobQueueBatch::getJobQueueBatchFiles).flatMap(List::stream).collect(Collectors.toList());
            if (files.size() == 1 && files.get(0).getResourceType() == DPCResourceType.OperationOutcome) {
                // An OperationOutcome (ERROR) was returned
                final String jobTime = SplunkTimestamp.getSplunkTimestamp();
                LOGGER.info("dpcMetric=jobFail,completionResult={},jobID={},jobCompleteTime={}", "FAILED", jobID, jobTime);
                return assembleOperationOutcome(batches);
            } else {
                final String jobTime = SplunkTimestamp.getSplunkTimestamp();
                LOGGER.info("dpcMetric=jobComplete,completionResult={},jobID={},jobCompleteTime={}", "COMPLETE", jobID, jobTime);
                // A normal Bundle of data was returned
                return assembleBundleFromBatches(batches, List.of(resourceTypes));
            }
        }

        final String jobTime = SplunkTimestamp.getSplunkTimestamp();
        LOGGER.info("dpcMetric=jobFail,completionResult={},jobID={},jobCompleteTime={}", "APPLICATIONERROR", jobID, jobTime);
        // No batch data was returned and no OperationOutcome was created
        LOGGER.error("No data returned from queue for job, jobID: {}; jobTimeout: {}", jobID, jobTimeoutInSeconds);
        // These exceptions are not only thrown within the application; their messages are also returned in the response payload
        throw new DataRetrievalException("Failed to retrieve data");
    }

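    /**
     * Polls the queue every 250ms until all batches for the job have completed, returning empty
     * if the job does not finish within {@code jobTimeoutInSeconds} or the wait is interrupted.
     */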
    private Optional<List<JobQueueBatch>> waitForJobToComplete(UUID jobID, UUID organizationID, IJobQueue queue) {
        CompletableFuture<Optional<List<JobQueueBatch>>> dataFuture = new CompletableFuture<>();
        final ScheduledExecutorService poller = Executors.newSingleThreadScheduledExecutor();
        final ScheduledFuture<?> task = poller.scheduleAtFixedRate(() -> {
            try {
                List<JobQueueBatch> batches = getJobBatch(jobID, organizationID, queue);
                dataFuture.complete(Optional.of(batches));
            } catch (DataRetrievalRetryException e) {
                // Batches are not ready yet; the scheduled task retries on the next tick
            }
        }, 0, 250, TimeUnit.MILLISECONDS);

        // This timeout value should probably be adjusted according to the number of resource types being requested.
        // The Patient $everything operation returns all types, which may take longer than other requests.
        // Experimenting with increasing this threshold from 30 to 60 seconds.
        dataFuture.completeOnTimeout(Optional.empty(), jobTimeoutInSeconds, TimeUnit.SECONDS);

        try {
            return dataFuture.get();
        } catch (InterruptedException | ExecutionException e) {
            return Optional.empty();
        } finally {
            task.cancel(true);
            poller.shutdown();
        }
    }

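    /**
     * Fetches the job's batches, returning them once every valid batch for the organization is
     * COMPLETED. Throws {@link DataRetrievalRetryException} while the job is still in progress and
     * {@link DataRetrievalException} if any batch has FAILED.
     */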
    private List<JobQueueBatch> getJobBatch(UUID jobID, UUID organizationId, IJobQueue queue) throws DataRetrievalRetryException {
        final List<JobQueueBatch> batches = queue.getJobBatches(jobID);
        if (batches.isEmpty()) {
            throw new DataRetrievalRetryException();
        }

        Set<JobStatus> jobStatusSet = batches
                .stream()
                .filter(b -> b.getOrgID().equals(organizationId))
                .filter(JobQueueBatch::isValid)
                .map(JobQueueBatch::getStatus).collect(Collectors.toSet());

        if (jobStatusSet.size() == 1 && jobStatusSet.contains(JobStatus.COMPLETED)) {
            return batches;
        } else if (jobStatusSet.contains(JobStatus.FAILED)) {
            LOGGER.error("Job failed; jobID: {}, orgID: {}", jobID, organizationId);
            throw new DataRetrievalException("Failed to retrieve batches");
        } else {
            throw new DataRetrievalRetryException();
        }
    }

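    /**
     * Reads the NDJSON output files of the requested resource types from each batch and collects
     * the parsed resources into a single searchset {@link Bundle}.
     */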
    private Bundle assembleBundleFromBatches(List<JobQueueBatch> batches, List<DPCResourceType> resourceTypes) {
        if (resourceTypes == null || resourceTypes.isEmpty()) {
            throw new DataRetrievalException("Need to pass in resource types");
        }

        final Bundle bundle = new Bundle().setType(Bundle.BundleType.SEARCHSET);

        batches.stream()
                .map(JobQueueBatch::getJobQueueBatchFiles)
                .flatMap(List::stream)
                .filter(bf -> resourceTypes.contains(bf.getResourceType()))
                .forEach(batchFile -> {
                    Path path = Paths.get(String.format("%s/%s.ndjson", exportPath, batchFile.getFileName()));
                    Class<? extends Resource> typeClass = getClassForResourceType(batchFile.getResourceType());
                    addResourceEntries(typeClass, path, bundle);
                });

        // Give the assembled bundle a random ID; total reflects the number of entries
        bundle.setId(UUID.randomUUID().toString());
        return bundle.setTotal(bundle.getEntry().size());
    }

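    /**
     * Maps a {@link DPCResourceType} onto the DSTU3 model class used to parse its NDJSON output.
     */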
    private Class<? extends Resource> getClassForResourceType(DPCResourceType resourceType) {
        switch (resourceType) {
            case Coverage:
                return Coverage.class;
            case ExplanationOfBenefit:
                return ExplanationOfBenefit.class;
            case Patient:
                return Patient.class;
            default:
                throw new DataRetrievalException("Unexpected resource type: " + resourceType);
        }
    }

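    /**
     * Parses each line of the NDJSON file at {@code path} as a resource of the given type and adds
     * it as an entry to the bundle.
     */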
    private void addResourceEntries(Class<? extends Resource> clazz, Path path, Bundle bundle) {
        try (BufferedReader br = Files.newBufferedReader(path)) {
            br.lines().forEach(line -> {
                Resource r = fhirContext.newJsonParser().parseResource(clazz, line);
                bundle.addEntry().setResource(r);
            });
        } catch (IOException e) {
            LOGGER.error("Unable to read resource", e);
            throw new DataRetrievalException(String.format("Unable to read resource because %s", e.getMessage()));
        }
    }

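    /**
     * Merges the issues from each batch's latest OperationOutcome file into a single
     * {@link OperationOutcome}.
     */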
    private OperationOutcome assembleOperationOutcome(List<JobQueueBatch> batches) {
        // There is only ever 1 OperationOutcome file
        final Optional<JobQueueBatchFile> batchFile = batches.stream()
                .map(b -> b.getJobQueueFileLatest(DPCResourceType.OperationOutcome))
                .filter(Optional::isPresent)
                .map(Optional::get)
                .findFirst();

        if (batchFile.isPresent()) {
            OperationOutcome outcome = new OperationOutcome();
            Path path = Paths.get(String.format("%s/%s.ndjson", exportPath, batchFile.get().getFileName()));
            try (BufferedReader br = Files.newBufferedReader(path)) {
                br.lines()
                        .map(line -> fhirContext.newJsonParser().parseResource(OperationOutcome.class, line))
                        .map(OperationOutcome::getIssue)
                        .flatMap(List::stream)
                        .forEach(outcome::addIssue);
            } catch (IOException e) {
                LOGGER.error("Unable to read OperationOutcome", e);
                throw new DataRetrievalException(String.format("Unable to read OperationOutcome because %s", e.getMessage()));
            }

            return outcome;
        }

        LOGGER.error("No batch files found");
        throw new DataRetrievalException("Failed to retrieve operationOutcome");
    }
}