monarch-initiative/N2V

View on GitHub
embiggen/embedders/pykeen_embedders/pykeen_embedder.py

Summary

Maintainability
A
1 hr
Test Coverage
"""Abstract Torch/PyKEEN Model wrapper for embedding models."""
from typing import Dict, Union, Tuple, Any, Type

import numpy as np
import pandas as pd
from ensmallen import Graph
import inspect
from inspect import getfullargspec

from embiggen.utils.pytorch_utils import validate_torch_device
from embiggen.utils.abstract_models import AbstractEmbeddingModel, abstract_class, EmbeddingResult
from embiggen.utils.abstract_models import format_list
import torch
from pykeen.models import Model
from pykeen.triples import CoreTriplesFactory
from pykeen.training import SLCWATrainingLoop, LCWATrainingLoop, TrainingLoop


@abstract_class
class PyKEENEmbedder(AbstractEmbeddingModel):
    """Abstract Torch/PyKEEN Model wrapper for embedding models.

    Child classes are expected to implement `_build_model` and
    `_extract_embeddings`; this class takes care of validating the
    training loop, building the triples factory and running the training.
    """

    # Human readable names of the training loops that can be requested by name.
    SUPPORTED_TRAINING_LOOPS = {
        "Stochastic Local Closed World Assumption": SLCWATrainingLoop,
        "Local Closed World Assumption": LCWATrainingLoop,
    }

    def __init__(
        self,
        embedding_size: int = 100,
        epochs: int = 100,
        batch_size: int = 2**10,
        device: str = "auto",
        training_loop: Union[str, Type[TrainingLoop]
                             ] = "Stochastic Local Closed World Assumption",
        verbose: bool = False,
        random_state: int = 42,
        ring_bell: bool = False,
        enable_cache: bool = False
    ):
        """Create new PyKEEN Abstract Embedder model.

        Parameters
        -------------------------
        embedding_size: int = 100
            The dimension of the embedding to compute.
        epochs: int = 100
            The number of epochs to use to train the model for.
        batch_size: int = 2**10
            Size of the training batch.
        device: str = "auto"
            The device to use to train the model.
            Can either be cpu or cuda.
        training_loop: Union[str, Type[TrainingLoop]
                             ] = "Stochastic Local Closed World Assumption"
            The training loop to use to train the model.
            Can either be a `TrainingLoop` subclass or one of the names:
            - Stochastic Local Closed World Assumption
            - Local Closed World Assumption
        verbose: bool = False
            Whether to show the loading bar.
        random_state: int = 42
            Random seed to use while training the model
        ring_bell: bool = False,
            Whether to play a sound when embedding completes.
        enable_cache: bool = False
            Whether to enable the cache, that is to
            store the computed embedding.

        Raises
        -------------------------
        ValueError
            If the training loop name is not supported, if the provided
            training loop is not a class, or if it is not a subclass
            of `TrainingLoop`.
        """
        # Resolve a training loop name into the corresponding class.
        if isinstance(training_loop, str):
            if training_loop in PyKEENEmbedder.SUPPORTED_TRAINING_LOOPS:
                training_loop = PyKEENEmbedder.SUPPORTED_TRAINING_LOOPS[training_loop]
            else:
                raise ValueError(
                    f"The provided training loop name {training_loop} is not "
                    "a supported training loop name. "
                    f"The supported names are {format_list(PyKEENEmbedder.SUPPORTED_TRAINING_LOOPS)}."
                )

        if not inspect.isclass(training_loop):
            raise ValueError(
                "The provided training loop should be a class object.")

        if not issubclass(training_loop, TrainingLoop):
            # Report the class itself: `type(training_loop)` would only
            # show its metaclass, which tells the user nothing.
            raise ValueError(
                "The provided training loop class is not a subclass of `TrainingLoop` "
                f"and is {training_loop}."
            )

        self._training_loop = training_loop
        self._epochs = epochs
        self._verbose = verbose
        self._batch_size = batch_size
        self._device = validate_torch_device(device)

        super().__init__(
            embedding_size=embedding_size,
            enable_cache=enable_cache,
            ring_bell=ring_bell,
            random_state=random_state
        )

    @classmethod
    def smoke_test_parameters(cls) -> Dict[str, Any]:
        """Returns parameters for smoke test."""
        return dict(
            embedding_size=10,
            epochs=1
        )

    def parameters(self) -> Dict[str, Any]:
        """Returns the parent parameters extended with epochs and batch size."""
        return dict(
            **super().parameters(),
            **dict(
                epochs=self._epochs,
                batch_size=self._batch_size,
            )
        )

    @classmethod
    def library_name(cls) -> str:
        """Returns the name of the library backing the model."""
        return "PyKEEN"

    @classmethod
    def task_name(cls) -> str:
        """Returns the name of the task the model solves."""
        return "Node Embedding"

    def _build_model(self, triples_factory: CoreTriplesFactory) -> Model:
        """Build new model for embedding.

        Parameters
        ------------------
        triples_factory: CoreTriplesFactory
            The PyKEEN triples factory to use to create the model.

        Raises
        ------------------
        NotImplementedError
            Always, as child classes must override this method.
        """
        # `super().__class__` is the `super` type itself, so the parent
        # class is named explicitly to get a meaningful message.
        raise NotImplementedError(
            f"In the child class {self.__class__.__name__} of {PyKEENEmbedder.__name__} "
            f"implementing the model {self.model_name()} we could not find the method "
            "called `_build_model`. Please do implement it."
        )

    def _get_steps_per_epoch(self, graph: Graph) -> Tuple[Any]:
        """Returns number of steps per epoch.

        The base implementation returns None.

        Parameters
        ------------------
        graph: Graph
            The graph to compute the number of steps.
        """
        return None

    def _extract_embeddings(
        self,
        graph: Graph,
        model: Model,
        return_dataframe: bool
    ) -> EmbeddingResult:
        """Returns embedding from the model.

        Parameters
        ------------------
        graph: Graph
            The graph that was embedded.
        model: Model
            The PyKEEN model used to embed the graph.
        return_dataframe: bool
            Whether to return a dataframe or a numpy array.

        Raises
        ------------------
        NotImplementedError
            Always, as child classes must override this method.
        """
        # `super().__class__` is the `super` type itself, so the parent
        # class is named explicitly to get a meaningful message.
        raise NotImplementedError(
            f"In the child class {self.__class__.__name__} of {PyKEENEmbedder.__name__} "
            f"implementing the model {self.model_name()} we could not find the method "
            "called `_extract_embeddings`. Please do implement it."
        )

    @classmethod
    def _create_inverse_triples(cls) -> bool:
        """Returns whether the class is expected to create inverse triples."""
        return False

    def _fit_transform(
        self,
        graph: Graph,
        return_dataframe: bool = True,
    ) -> Union[np.ndarray, pd.DataFrame, Dict[str, np.ndarray], Dict[str, pd.DataFrame]]:
        """Return node embedding.

        Parameters
        ------------------
        graph: Graph
            The graph to embed.
        return_dataframe: bool = True
            Whether to return a dataframe or a numpy array.

        Raises
        ------------------
        NotImplementedError
            If `_build_model` does not return a PyKEEN model.
        """
        torch_device = torch.device(self._device)

        factory_kwargs = dict(
            num_entities=graph.get_number_of_nodes(),
            num_relations=graph.get_number_of_edge_types(),
            create_inverse_triples=self._create_inverse_triples(),
        )

        # Only some versions of PyKEEN accept the explicit ids in the
        # triples factory constructor, so they are passed only when the
        # installed constructor exposes them.
        if "entity_ids" in getfullargspec(CoreTriplesFactory).args:
            factory_kwargs.update(
                entity_ids=graph.get_node_ids().astype(np.int64),
                relation_ids=graph.get_unique_edge_type_ids().astype(np.int64),
            )

        # NOTE(review): the triples are cast to int64 and then wrapped in
        # an IntTensor; confirm this is the dtype PyKEEN expects.
        triples_factory = CoreTriplesFactory(
            torch.IntTensor(
                graph.get_directed_edge_triples_ids().astype(np.int64)
            ),
            **factory_kwargs
        )

        # A batch cannot be larger than the number of available edges.
        batch_size = min(
            self._batch_size,
            graph.get_number_of_directed_edges()
        )

        model = self._build_model(triples_factory)

        if not isinstance(model, Model):
            raise NotImplementedError(
                "The model created with the `_build_model` in the child "
                f"class {self.__class__.__name__} for the model {self.model_name()} "
                f"in the library {self.library_name()} did not return a "
                f"PyKEEN model but an object of type {type(model)}."
            )

        # Move the model to gpu if we need to
        model.to(torch_device)

        # Use the training loop class validated in the constructor,
        # instead of always falling back to the SLCWA training loop.
        training_loop = self._training_loop(
            model=model,
            triples_factory=triples_factory,
        )

        training_loop.train(
            triples_factory=triples_factory,
            num_epochs=self._epochs,
            batch_size=batch_size,
            use_tqdm=self._verbose,
            use_tqdm_batch=self._verbose,
            tqdm_kwargs=dict(
                disable=not self._verbose,
                dynamic_ncols=True,
                leave=False
            )
        )

        # Extract and return the embedding
        return self._extract_embeddings(
            graph,
            model,
            return_dataframe=return_dataframe
        )

    @classmethod
    def requires_nodes_sorted_by_decreasing_node_degree(cls) -> bool:
        """Returns whether the nodes must be sorted by decreasing degree."""
        return False

    @classmethod
    def is_topological(cls) -> bool:
        """Returns whether the model is topological."""
        return True

    @classmethod
    def requires_edge_types(cls) -> bool:
        """Returns whether the model requires edge types."""
        return True

    @classmethod
    def can_use_edge_weights(cls) -> bool:
        """Returns whether the model can optionally use edge weights."""
        return False

    @classmethod
    def can_use_node_types(cls) -> bool:
        """Returns whether the model can optionally use node types."""
        return False

    @classmethod
    def task_involves_edge_types(cls) -> bool:
        """Returns whether the model task involves edge types."""
        return True

    @classmethod
    def is_stocastic(cls) -> bool:
        """Returns whether the model is stocastic and has therefore a random state."""
        return True