migraf/fhir-kindling

View on GitHub
fhir_kindling/fhir_server/transfer.py

Summary

Maintainability
B
6 hrs
Test Coverage
from __future__ import annotations

from typing import TYPE_CHECKING, List, OrderedDict, Tuple, Union

import networkx as nx
import orjson
from fhir.resources import FHIRAbstractModel, construct_fhir_element
from fhir.resources.resource import Resource
from pydantic import ValidationError
from tqdm.autonotebook import tqdm

from fhir_kindling.fhir_query import FhirQuerySync
from fhir_kindling.fhir_server.server_responses import (
    ResourceCreateResponse,
    TransferResponse,
)
from fhir_kindling.util.references import check_missing_references, extract_references

if TYPE_CHECKING:
    from fhir_kindling.fhir_server import FhirServer


def transfer(
    source: "FhirServer",
    target: "FhirServer",
    resources: List[Union[Resource, FHIRAbstractModel]] = None,
    query: FhirQuerySync = None,
    get_missing: bool = True,
    record_linkage: bool = True,
    display: bool = True,
) -> TransferResponse:
    """
    Transfer a list of resources from one server to another.

    Args:
        source: The server to transfer the resources from.
        target: The server to transfer the resources to.
        resources: A list of resources to transfer.
        query: A FhirQuerySync object to use to query the source server.
        get_missing: Whether to get missing resources from the source server.
        record_linkage: Whether to record the linkage between the source and target resources.
        display: Whether to display a progress bar.
    """

    # get the resources to transfer, including missing references
    transfer_resources = _get_transfer_resources(source, resources, query, get_missing)
    transfer_graph = reference_graph(transfer_resources)

    # process the graph to create the resources on the target server
    create_responses, linkage = resolve_reference_graph(
        transfer_graph, target, record_linkage, display
    )

    return TransferResponse(
        origin_server=source.api_address,
        destination_server=target.api_address,
        create_responses=create_responses,
        linkage=linkage,
    )


def reference_graph(resources: List[Union[Resource, FHIRAbstractModel]]) -> nx.DiGraph:
    """
    Creates a graph of the references in a list of resources.

    Args:
        resources: List of resource to create the graph from.

    Returns:
        A directed graph depicting the references in the resources.
    """
    dg = nx.DiGraph()
    for resource in resources:
        path = resource.relative_path()
        if path in dg:
            dg.nodes[path]["resource"] = resource
        else:
            dg.add_node(path, resource=resource)
        for reference in extract_references(resource):
            reference_path = f"{reference[1]}/{reference[2]}"
            if reference_path not in dg:
                dg.add_node(reference_path, resource=None)
            dg.add_edge(
                reference_path, path, field=reference[0], list_field=reference[3]
            )
    return dg


def resolve_reference_graph(
    graph: nx.DiGraph,
    target: "FhirServer",
    record_linkage: bool,
    display: bool,
) -> Tuple[List[ResourceCreateResponse], dict]:
    nodes = list(graph.nodes)

    linkage = {}
    create_responses = []
    # iterate over the graph and update the references off all successors node to match the newly created resources
    # on the target server

    with tqdm(total=len(nodes), disable=not display) as pbar:
        while len(nodes) > 0:
            top_nodes = [
                node for node in nodes if len(list(graph.predecessors(node))) == 0
            ]

            # get the resources from the graph

            resources = []

            for node in top_nodes:
                try:
                    resources.append(_resource_from_graph_node(graph, node))
                except Exception as e:
                    print(e)
                    print(node)
                    raise e
            # insert the resources into the target server
            create_response = target.add_all(resources=resources)

            # iterate through the top nodes and update the successors with the references from the target server
            for node, reference in zip(top_nodes, create_response.references):
                if record_linkage:
                    hash_origin = hash(node)
                    linkage[hash_origin] = reference.reference
                _update_successors(graph, node, reference.reference)

            # add the create response to the list of responses
            create_responses.extend(create_response.create_responses)

            # remove the processed top nodes from the graph
            graph.remove_nodes_from(top_nodes)
            nodes = list(graph.nodes)
            pbar.update(len(top_nodes))

    return create_responses, linkage


def _update_successors(graph: nx.DiGraph, node: str, reference: str):
    """
    Update the successors of a node in a graph with the updated reference from the new server.

    Args:
        graph: The graph to update.
        node: The node to update.
        reference: The reference to update the node with.
    """
    for successor in graph.successors(node):
        field = graph[node][successor]["field"]
        list_field = graph[node][successor]["list_field"]
        resource = graph.nodes[successor]["resource"]

        if list_field:
            # Find the item that references the node
            reference_list = graph.nodes[successor]["resource"].dict()[field]
            reference_item = next(
                (item for item in reference_list if item.get("reference") == str(node)),
                None,
            )
            if reference_item:
                # update the resource with the new reference
                index = reference_list.index(reference_item)
                resource = orjson.loads(resource.json())
                resource[field][index] = {"reference": reference}
                graph.nodes[successor]["resource"] = resource
            else:
                print("Reference item not found")
        else:
            if not isinstance(graph.nodes[successor]["resource"], dict):
                resource = orjson.loads(resource.json())
                resource[field] = {"reference": reference}
                graph.nodes[successor]["resource"] = resource
            else:
                graph.nodes[successor]["resource"][field] = reference


def _resource_from_graph_node(graph: nx.DiGraph, node: str) -> FHIRAbstractModel:
    """
    Get a resource from a graph node.

    Args:
        graph: The graph to get the resource from.
        node: The node to get the resource from.

    Returns:
        The resource at the node.
    """
    resource = graph.nodes[node]["resource"]

    if resource:
        if isinstance(resource, OrderedDict) or isinstance(resource, dict):
            resource_dict = dict(resource)
            resource_type = resource_dict.get(
                "resourceType", resource_dict.get("resource_type")
            )
            resource_json = orjson.dumps(resource_dict)
            try:
                resource = construct_fhir_element(resource_type, resource_json)
            except ValidationError as e:
                print(f"Error creating resource: {e}")
                print(f"Resource: {resource_dict}")
                raise e
            except Exception as e:
                print(f"Error creating resource: {e}")
                print(f"Resource: {resource_dict}")
                raise e
        elif isinstance(resource, FHIRAbstractModel):
            pass
        else:
            raise ValueError(f"Unknown resource type: {type(resource)}")
        return resource
    else:
        raise ValueError(f"Resource not found for node {node}")


def _get_transfer_resources(
    source: "FhirServer",
    resources: List[Union[Resource, FHIRAbstractModel]] = None,
    query: FhirQuerySync = None,
    get_missing: bool = True,
) -> List[FHIRAbstractModel]:
    if query and resources:
        raise ValueError("Cannot specify both query and resources")
    if not query and not resources:
        raise ValueError(
            f"Must specify either query or resources. Query: {query}, Resources: {resources}"
        )
    # if query parameters are given execute the query against the server
    if query:
        resources = query.all().resource_list
    # find missing references
    missing_references = check_missing_references(resources)

    # get missing references
    if missing_references:
        if get_missing:
            missing = source.get_many(missing_references)
            resources.extend(missing)
        else:
            raise ValueError(
                f"Related resources of the resources to be transferred are missing:\n{missing_references} \n\n"
                f"To get these resources, set get_missing=True."
            )

    return resources