monarch-initiative/N2V

View on GitHub
embiggen/utils/abstract_gcn.py

Summary

Maintainability
F
4 days
Test Coverage
"""Kipf GCN model for node-label prediction."""
import copy
import warnings
from typing import Any, Dict, List, Optional, Set, Tuple, Type, Union

import compress_pickle
import numpy as np
import pandas as pd
import tensorflow as tf
from ensmallen import Graph
from keras_mixed_sequence import Sequence
from tensorflow.keras.callbacks import (  # pylint: disable=import-error,no-name-in-module
    EarlyStopping, ReduceLROnPlateau,
    TerminateOnNaN
)
from tensorflow.keras.layers import Input
from tensorflow.keras.models import \
    Model  # pylint: disable=import-error,no-name-in-module
from tensorflow.keras.optimizers import \
    Optimizer  # pylint: disable=import-error,no-name-in-module
from userinput.utils import must_be_in_set

from embiggen.layers.tensorflow import (EmbeddingLookup, FlatEmbedding,
                                        GraphConvolution, L2Norm)
from embiggen.utils import AbstractEdgeFeature
from embiggen.utils.abstract_models import (AbstractClassifierModel,
                                            abstract_class)
from embiggen.utils.normalize_model_structural_parameters import \
    normalize_model_list_parameter
from embiggen.utils.number_to_ordinal import number_to_ordinal


def graph_to_sparse_tensor(
    graph: Graph,
    kernel: str,
    handling_multi_graph: str = "warn",
) -> tf.SparseTensor:
    """Returns provided graph as sparse Tensor.

    Parameters
    -------------------
    graph: Graph,
        The graph to convert.
    kernel: str
        The type of normalization to use. It can either be:
        * "Weights", to just use the graph weights themselves.
        * "Left Normalized Laplacian", for the left normalized Laplacian.
        * "Right Normalized Laplacian", for the right normalized Laplacian.
        * "Symmetric Normalized Laplacian", for the symmetric normalized Laplacian.
        * "Transposed Left Normalized Laplacian", for the transposed left normalized Laplacian.
        * "Transposed Right Normalized Laplacian", for the transposed right normalized Laplacian.
        * "Transposed Symmetric Normalized Laplacian", for the transposed symmetric normalized Laplacian.
        * "Weighted Left Normalized Laplacian", for the weighted left normalized Laplacian.
        * "Weighted Right Normalized Laplacian", for the weighted right normalized Laplacian.
        * "Weighted Symmetric Normalized Laplacian", for the weighted symmetric normalized Laplacian.
        * "Transposed Weighted Left Normalized Laplacian", for the transposed weighted left normalized Laplacian.
        * "Transposed Weighted Right Normalized Laplacian", for the transposed weighted right normalized Laplacian.
        * "Transposed Weighted Symmetric Normalized Laplacian", for the transposed weighted symmetric normalized Laplacian.
    handling_multi_graph: str = "warn"
        How to behave when dealing with multigraphs.
        Possible behaviours are:
        - "warn", which warns the user and drops the multi-edges.
        - "raise"
        - "drop"

    Raises
    -------------------
    ValueError,
        If the weights are requested but the graph does not contain any.
    ValueError,
        If the graph contains singletons.
    ValueError,
        If the graph is a multigraph.

    Returns
    -------------------
    SparseTensor with (weighted) adjacency matrix.
    """
    if "Weighted" in kernel:
        use_weights = True
        kernel = kernel.replace("Weighted ", "")
    else:
        use_weights = False

    if "Transposed" in kernel:
        transpose = True
        kernel = kernel.replace("Transposed ", "")
    else:
        transpose = False

    if use_weights and not graph.has_edge_weights():
        raise ValueError(
            "Edge weights were requested but the provided graph "
            "does not contain any edge weight."
        )

    if graph.has_singleton_nodes():
        raise ValueError(
            f"In the provided {graph.get_name()} graph there are "
            f"{graph.get_number_of_singleton_nodes()} singleton nodes."
            "The GCN model does not support operations on graph containing "
            "singletons. You can either choose to drop singletons from "
            "the graph by using the `graph.remove_singleton_nodes()` "
            "method or alternatively you can add selfloops to them by "
            "using the `graph.add_selfloops()` method."
        )

    if graph.is_multigraph():
        message = (
            "The GCN model does not currently support convolutions on a multigraph. "
            "We are dropping the parallel edges before computing the adjacency matrix."
        )
        if handling_multi_graph == "warn":
            warnings.warn(message)
        elif handling_multi_graph == "raise":
            raise ValueError(message)

        graph = graph.remove_parallel_edges()

    # We transpose the graph if requested, though the operation is skipped
    # if we are computing the transposed of an undirected graph. A warning
    # is raised in this case.
    if transpose:
        if graph.is_directed():
            graph = graph.to_transposed()
        else:
            warnings.warn(
                "You are trying to compute the transposed of an undirected graph. "
                "The transposed of an undirected graph is the same graph. "
                "This operation is skipped."
            )

    if kernel == "Weights":
        edge_node_ids = graph.get_directed_edge_node_ids()
        kernel_weights = graph.get_directed_edge_weights()
    elif kernel == "Left Normalized Laplacian":
        edge_node_ids, kernel_weights = graph.get_left_normalized_laplacian_coo_matrix()
    elif kernel == "Right Normalized Laplacian":
        (
            edge_node_ids,
            kernel_weights,
        ) = graph.get_right_normalized_laplacian_coo_matrix()
    elif kernel == "Symmetric Normalized Laplacian":
        (
            edge_node_ids,
            kernel_weights,
        ) = graph.get_symmetric_normalized_laplacian_coo_matrix()
    else:
        raise ValueError(
            f"Kernel {kernel} is not supported. "
            "Supported kernels are: "
            f"{', '.join(AbstractGCN.supported_kernels)}."
        )
    kernel_weights = np.abs(kernel_weights)
    if use_weights and kernel != "Weights":
        kernel_weights = kernel_weights * graph.get_directed_edge_weights()

    # We check that no NaNs are present in the kernel weights.
    number_of_nans = np.isnan(kernel_weights).sum()
    if number_of_nans > 0:
        raise ValueError(
            f"The provided graph contains {number_of_nans} NaNs in the kernel weights."
        )
    
    # We check that no value is set to zero.
    number_of_zeros = (kernel_weights == 0).sum()
    if number_of_zeros > 0:
        raise ValueError(
            f"The provided graph contains {number_of_zeros} zeros in the kernel weights."
        )
    
    return tf.sparse.reorder(
        tf.SparseTensor(
            edge_node_ids,
            kernel_weights,
            (graph.get_number_of_nodes(), graph.get_number_of_nodes()),
        )
    )


@abstract_class
class AbstractGCN(AbstractClassifierModel):
    """Abstract base GCN."""

    supported_kernels = [
        "Weights",
        "Left Normalized Laplacian",
        "Right Normalized Laplacian",
        "Symmetric Normalized Laplacian",
        "Transposed Left Normalized Laplacian",
        "Transposed Right Normalized Laplacian",
        "Transposed Symmetric Normalized Laplacian",
        "Weighted Left Normalized Laplacian",
        "Weighted Right Normalized Laplacian",
        "Weighted Symmetric Normalized Laplacian",
        "Trasposed Weighted Left Normalized Laplacian",
        "Trasposed Weighted Right Normalized Laplacian",
        "Trasposed Weighted Symmetric Normalized Laplacian",
    ]

    def __init__(
        self,
        kernels: Optional[Union[str, List[str]]],
        epochs: int = 1000,
        number_of_graph_convolution_layers: int = 2,
        number_of_units_per_graph_convolution_layers: Union[int, List[int]] = 128,
        dropout_rate: float = 0.5,
        batch_size: Optional[int] = None,
        apply_norm: bool = False,
        combiner: str = "sum",
        optimizer: Union[str, Optimizer] = "adam",
        early_stopping_min_delta: float = 0.001,
        early_stopping_patience: int = 10,
        reduce_lr_min_delta: float = 0.001,
        reduce_lr_patience: int = 5,
        early_stopping_monitor: str = "loss",
        early_stopping_mode: str = "min",
        reduce_lr_monitor: str = "loss",
        reduce_lr_mode: str = "min",
        reduce_lr_factor: float = 0.9,
        use_class_weights: bool = True,
        random_state: int = 42,
        use_node_embedding: bool = False,
        node_embedding_size: int = 50,
        use_node_type_embedding: bool = False,
        node_type_embedding_size: int = 50,
        residual_convolutional_layers: bool = False,
        handling_multi_graph: str = "warn",
        node_feature_names: Optional[Union[str, List[str]]] = None,
        node_type_feature_names: Optional[Union[str, List[str]]] = None,
        verbose: bool = True,
    ):
        """Create new Kipf GCN object.

        Parameters
        -------------------------------
        kernels: Optional[Union[str, List[str]]]
            The type of normalization to use. It can either be:
            * "Weights", to just use the graph weights themselves.
            * "Left Normalized Laplacian", for the left normalized Laplacian.
            * "Right Normalized Laplacian", for the right normalized Laplacian.
            * "Symmetric Normalized Laplacian", for the symmetric normalized Laplacian.
            * "Transposed Left Normalized Laplacian", for the transposed left normalized Laplacian.
            * "Transposed Right Normalized Laplacian", for the transposed right normalized Laplacian.
            * "Transposed Symmetric Normalized Laplacian", for the transposed symmetric normalized Laplacian.
            * "Weighted Left Normalized Laplacian", for the weighted left normalized Laplacian.
            * "Weighted Right Normalized Laplacian", for the weighted right normalized Laplacian.
            * "Weighted Symmetric Normalized Laplacian", for the weighted symmetric normalized Laplacian.
            * "Transposed Weighted Left Normalized Laplacian", for the transposed weighted left normalized Laplacian.
            * "Transposed Weighted Right Normalized Laplacian", for the transposed weighted right normalized Laplacian.
            * "Transposed Weighted Symmetric Normalized Laplacian", for the transposed weighted symmetric normalized Laplacian.
        epochs: int = 1000
            Epochs to train the model for.
        number_of_graph_convolution_layers: int = 3
            Number of graph convolution layer.
        number_of_units_per_graph_convolution_layers: Union[int, List[int]] = 128
            Number of units per hidden layer.
        dropout_rate: float = 0.3
            Float between 0 and 1.
            Fraction of the input units to dropout.
        batch_size: Optional[int] = None
            Batch size to use while training the model.
            If None, the batch size will be the number of nodes.
            In all model parametrization that involve a number of graph
            convolution layers, the batch size will be the number of nodes.
        apply_norm: bool = False
            Whether to normalize the output of the convolution operations,
            after applying the level activations.
        combiner: str = "mean"
            A string specifying the reduction op.
            Currently "mean", "sqrtn" and "sum" are supported.
            "sum" computes the weighted sum of the embedding results for each row.
            "mean" is the weighted sum divided by the total weight.
            "sqrtn" is the weighted sum divided by the square root of the sum of the squares of the weights.
            Defaults to mean.
        optimizer: str = "Adam"
            The optimizer to use while training the model.
        early_stopping_min_delta: float
            Minimum delta of metric to stop the training.
        early_stopping_patience: int
            Number of epochs to wait for when the given minimum delta is not
            achieved after which trigger early stopping.
        reduce_lr_min_delta: float
            Minimum delta of metric to reduce learning rate.
        reduce_lr_patience: int
            Number of epochs to wait for when the given minimum delta is not
            achieved after which reducing learning rate.
        early_stopping_monitor: str = "loss",
            Metric to monitor for early stopping.
        early_stopping_mode: str = "min",
            Direction of the variation of the monitored metric for early stopping.
        reduce_lr_monitor: str = "loss",
            Metric to monitor for reducing learning rate.
        reduce_lr_mode: str = "min",
            Direction of the variation of the monitored metric for learning rate.
        reduce_lr_factor: float = 0.9,
            Factor for reduction of learning rate.
        use_class_weights: bool = True
            Whether to use class weights to rebalance the loss relative to unbalanced classes.
            Learn more about class weights here: https://www.tensorflow.org/tutorials/structured_data/imbalanced_data
        random_state: int = 42
            Random state to reproduce the training samples.
        use_node_embedding: bool = False
            Whether to use a node embedding layer that is automatically learned
            by the model while it trains. Please do be advised that by using
            a node embedding layer you are making a closed-world assumption,
            and this model will not work on graphs with a different node vocabulary.
        node_embedding_size: int = 50
            Dimension of the node embedding.
        use_node_type_embedding: bool = False
            Whether to use a node type embedding layer that is automatically learned
            by the model while it trains. Please do be advised that by using
            a node type embedding layer you are making a closed-world assumption,
            and this model will not work on graphs with a different node vocabulary.
        node_type_embedding_size: int = 50
            Dimension of the node type embedding.
        residual_convolutional_layers: bool = False
            Whether to use residual connections and concatenate all the convolutional
            layers together before the first dense layer.
        handling_multi_graph: str = "warn"
            How to behave when dealing with multigraphs.
            Possible behaviours are:
            - "warn"
            - "raise"
            - "drop"
        node_feature_names: Optional[Union[str, List[str]]] = None
            Names of the node features.
            This is used as the layer names.
        node_type_feature_names: Optional[Union[str, List[str]]] = None
            Names of the node type features.
            This is used as the layer names.
        verbose: bool = True
            Whether to show loading bars.
        """
        self._number_of_graph_convolution_layers = number_of_graph_convolution_layers
        self._number_of_units_per_graph_convolution_layers = (
            normalize_model_list_parameter(
                number_of_units_per_graph_convolution_layers,
                number_of_graph_convolution_layers,
                object_type=int,
                can_be_empty=True,
            )
        )

        if kernels is None:
            kernels = []

        if isinstance(kernels, str):
            kernels = [kernels]

        if not isinstance(kernels, list):
            raise TypeError(
                f"Provided kernels should be either a string or a list, "
                f"but found {type(kernels)}."
            )

        self._kernels = [
            must_be_in_set(kernel, self.supported_kernels, "kernel")
            for kernel in kernels
        ]

        if self.has_convolutional_layers() and not self.has_kernels():
            raise ValueError(
                "You are trying to create a GCN model with convolutional layers "
                "but you have not provided any kernel to use."
            )
        if not self.has_convolutional_layers() and self.has_kernels():
            raise ValueError(
                "You are trying to create a GCN model without convolutional layers "
                "but you have provided kernels to use."
            )

        self._combiner = combiner
        self._epochs = epochs
        self._use_class_weights = use_class_weights
        self._dropout_rate = dropout_rate
        self._optimizer = optimizer
        self._apply_norm = apply_norm
        self._handling_multi_graph = handling_multi_graph
        self._use_node_embedding = use_node_embedding
        self._node_embedding_size = node_embedding_size
        self._use_node_type_embedding = use_node_type_embedding
        self._node_type_embedding_size = node_type_embedding_size

        if isinstance(node_feature_names, str):
            node_feature_names = [node_feature_names]

        if isinstance(node_type_feature_names, str):
            node_type_feature_names = [node_type_feature_names]

        self._node_feature_names = node_feature_names
        self._node_type_feature_names = node_type_feature_names

        if residual_convolutional_layers and not self.has_kernels():
            raise ValueError(
                "You are trying to create a GCN model with residual convolutional layers "
                "but you have not provided any kernel to use."
            )

        self._residual_convolutional_layers = residual_convolutional_layers

        self._early_stopping_min_delta = early_stopping_min_delta
        self._early_stopping_patience = early_stopping_patience
        self._reduce_lr_min_delta = reduce_lr_min_delta
        self._reduce_lr_patience = reduce_lr_patience
        self._early_stopping_monitor = early_stopping_monitor
        self._early_stopping_mode = early_stopping_mode
        self._reduce_lr_monitor = reduce_lr_monitor
        self._reduce_lr_mode = reduce_lr_mode
        self._reduce_lr_factor = reduce_lr_factor

        self._verbose = verbose
        self._model = None
        self._layer_names: Set[str] = set()
        self.history = None

        super().__init__(random_state=random_state)

        self._batch_size = batch_size

    @classmethod
    def smoke_test_parameters(cls) -> Dict[str, Any]:
        """Returns parameters for smoke test."""
        return dict(
            epochs=1,
            number_of_units_per_graph_convolution_layers=2,
            handling_multi_graph="drop",
        )

    def _add_layer_name(self, layer_name: str):
        """Add layer name to the set of layer names."""
        if layer_name in self._layer_names:
            raise ValueError(
                f"You are trying to add a layer with name {layer_name} "
                f"but a layer with the same name has already been added."
            )

        self._layer_names.add(layer_name)

    def clone(self) -> Type["AbstractGCN"]:
        """Return copy of self."""
        with tf.keras.utils.custom_object_scope(
            {
                "GraphConvolution": GraphConvolution,
                "EmbeddingLookup": EmbeddingLookup,
                "FlatEmbedding": FlatEmbedding,
                "L2Norm": L2Norm,
            }
        ):
            return copy.deepcopy(self)

    def get_batch_size_from_graph(self, graph: Graph) -> int:
        """Returns batch size to use for the given graph."""
        if self.has_convolutional_layers() or self._batch_size is None:
            return graph.get_number_of_nodes()
        return self._batch_size

    def parameters(self) -> Dict[str, Any]:
        """Returns parameters used for this model."""
        return dict(
            number_of_units_per_graph_convolution_layers=self._number_of_units_per_graph_convolution_layers,
            epochs=self._epochs,
            apply_norm=self._apply_norm,
            combiner=self._combiner,
            use_class_weights=self._use_class_weights,
            dropout_rate=self._dropout_rate,
            optimizer=self._optimizer,
            early_stopping_min_delta=self._early_stopping_min_delta,
            early_stopping_patience=self._early_stopping_patience,
            reduce_lr_min_delta=self._reduce_lr_min_delta,
            reduce_lr_patience=self._reduce_lr_patience,
            early_stopping_monitor=self._early_stopping_monitor,
            early_stopping_mode=self._early_stopping_mode,
            reduce_lr_monitor=self._reduce_lr_monitor,
            reduce_lr_mode=self._reduce_lr_mode,
            reduce_lr_factor=self._reduce_lr_factor,
            use_node_embedding=self._use_node_embedding,
            node_embedding_size=self._node_embedding_size,
            use_node_type_embedding=self._use_node_type_embedding,
            node_type_embedding_size=self._node_type_embedding_size,
            residual_convolutional_layers=self._residual_convolutional_layers,
            handling_multi_graph=self._handling_multi_graph,
            node_feature_names=self._node_feature_names,
            node_type_feature_names=self._node_type_feature_names,
            verbose=self._verbose,
        )

    def plot(self, show_shapes: bool = True, **kwargs: Dict):
        """Plot model using dot.
        
        Parameters
        -----------------------
        show_shapes: bool = True
            Whether to show shapes of the layers.
        kwargs: Dict
            Additional arguments to pass to the plot function.
        """
        if self._model is None:
            raise ValueError(
                "You are trying to plot a model that has not been compiled yet. "
                "You should call the `compile` or `fit` methods before calling `plot`."
            )
        return tf.keras.utils.plot_model(
            self._model,
            show_layer_names=True,
            expand_nested=True,
            show_layer_activations=True,
            show_shapes=show_shapes,
            **kwargs,
        )
    
    def summary(self, **kwargs: Dict):
        """Print summary of the model.
        
        Parameters
        -----------------------
        kwargs: Dict
            Additional arguments to pass to the summary function.
        """
        if self._model is None:
            raise ValueError(
                "You are trying to print the summary of a model that has not been compiled yet. "
                "You should call the `compile` or `fit` methods before calling `summary`."
            )
        return self._model.summary(**kwargs)

    def _get_class_weights(self, graph: Graph) -> Dict[int, float]:
        """Returns dictionary with class weights."""
        raise NotImplementedError(
            "The method `get_class_weights` should be implemented "
            "in the child classes of `AbstractGCN`, but is missing "
            f"in the class {self.__class__.__name__}."
        )

    def _get_model_training_input(
        self,
        graph: Graph,
        support: Graph,
        node_features: Optional[List[np.ndarray]],
        node_type_features: Optional[List[np.ndarray]],
        edge_type_features: Optional[List[np.ndarray]],
        edge_features: Optional[Union[Type[AbstractEdgeFeature], List[np.ndarray]]],
    ) -> Tuple[Union[np.ndarray, Type[Sequence]]]:
        """Returns training input tuple."""
        raise NotImplementedError(
            "The method `get_model_training_input` should be implemented "
            "in the child classes of `AbstractGCN`, but is missing "
            f"in the class {self.__class__.__name__}."
        )

    def _get_model_training_output(
        self,
        graph: Graph,
    ) -> Optional[np.ndarray]:
        """Returns training output tuple."""
        raise NotImplementedError(
            "The method `get_model_training_output` should be implemented "
            "in the child classes of `AbstractGCN`, but is missing "
            f"in the class {self.__class__.__name__}."
        )

    def _get_model_training_sample_weights(
        self,
        graph: Graph,
    ) -> Optional[np.ndarray]:
        """Returns training output tuple."""
        raise NotImplementedError(
            "The method `_get_model_training_sample_weights` should be implemented "
            "in the child classes of `AbstractGCN`, but is missing "
            f"in the class {self.__class__.__name__}."
        )

    def _get_model_prediction_input(
        self,
        graph: Graph,
        support: Graph,
        node_features: Optional[List[np.ndarray]] = None,
        node_type_features: Optional[List[np.ndarray]] = None,
        edge_type_features: Optional[List[np.ndarray]] = None,
        edge_features: Optional[
            Union[
                Type[AbstractEdgeFeature],
                List[Union[np.ndarray, Type[AbstractEdgeFeature]]],
            ]
        ] = None,
    ) -> Tuple[Union[np.ndarray, Type[Sequence]]]:
        """Returns dictionary with class weights."""
        raise NotImplementedError(
            "The method `get_model_prediction_input` should be implemented "
            "in the child classes of `AbstractGCN`, but is missing "
            f"in the class {self.__class__.__name__}."
        )

    def _build_model(
        self,
        graph: Graph,
        graph_convolution_model: Model,
        edge_type_features: List[np.ndarray],
        edge_features: List[Union[np.ndarray, Type[AbstractEdgeFeature]]],
    ) -> Type[Model]:
        """Returns GCN model."""
        raise NotImplementedError(
            "The method `_build_model` should be implemented "
            "in the child classes of `AbstractGCN`, but is missing "
            f"in the class {self.__class__.__name__}."
        )

    def _build_graph_convolution_model(
        self,
        graph: Graph,
        node_features: List[np.ndarray],
        node_type_features: List[np.ndarray],
    ) -> Model:
        """Create new GCN model."""
        # We create the list we will use to collect the input features.
        input_features = []
        hidden = []

        if self.has_kernels() and len(node_features) == 0 and len(node_type_features) == 0 and not self._use_node_embedding and not self._use_node_type_embedding:
            raise ValueError(
                "You are trying to create a GCN model with convolutional layers "
                "but you have not provided any node or node type feature to use, "
                "and you are not using node or node type embeddings."
            )

        kernels = []
        for kernel in self._kernels:
            self._add_layer_name(kernel)
            kernels.append(Input(
                shape=(None,),
                # shape=(graph.get_number_of_nodes(),),
                # batch_size=graph.get_number_of_nodes(),
                sparse=True,
                name=kernel,
            ))

        # When we are not creating a node-level model but an edge-label or
        # edge-prediction model, and the model is not using convolutional layers,
        # we need to differentiate th source node and destination node features,
        # both for the node themselves and for the associated node types.

        # This is necessary specifically when the batch size is not the number of nodes,
        # as we no longer can assume a dense range of features for each of the nodes
        # in the source nodes and destination nodes feature set.

        # One important observation, is that we cannot any longer create in this portion
        # of the model the node embedding layer, as we do not have the source and destination
        # node ids at this level - we will need to add this node embedding layer within the
        # abstract edge GCN model.

        # What we can do here, is handle in a centralized manner the source and destination
        # node and node type features, and then pass them to the abstract edge GCN model
        # as a list of features, where the first half of the features are the source node
        # features and the second half of the features are the destination node features.

        if not self.has_kernels() and self.is_edge_level_task():
            prefixes = ("Source ", "Destination ")
        else:
            prefixes = ("",)

        for prefix in prefixes:
            # We create the input layers for all of the node and node type features.
            for features, feature_names, feature_category in (
                (node_features, self._node_feature_names, f"{prefix}node feature"),
                (
                    node_type_features,
                    self._node_type_feature_names,
                    f"{prefix}node type feature",
                ),
            ):
                if feature_names is not None and len(features) == 0:
                    raise ValueError(
                        f"You have provided {len(feature_names)} {feature_category} names, "
                        f"but then you have not provided any {feature_category}. Either "
                        f"provide the {feature_category} to the compile or fit method, or "
                        "remove the feature names from the constructor call."
                    )
                if len(features) > 0:
                    if feature_names is None:
                        if len(features) > 1:
                            feature_names = [
                                f"{number_to_ordinal(i+1)} {feature_category}"
                                for i in range(len(features))
                            ]
                        else:
                            feature_names = [feature_category.capitalize()]
                    else:
                        feature_names = [
                            f"{prefix}{feature_name}"
                            for feature_name in feature_names
                        ]
                    if len(feature_names) != len(features):
                        raise ValueError(
                            f"You have provided {len(feature_names)} "
                            f"{feature_category} names but you have provided {len(features)} "
                            f"{feature_category}s to the model. Specifically, the provided "
                            f"feature names are {feature_names}."
                        )
                    new_input_features = []
                    for node_feature, node_feature_name in zip(
                        features, feature_names
                    ):
                        self._add_layer_name(node_feature_name)
                        new_input_features.append(Input(
                            shape=node_feature.shape[1:],
                            name=node_feature_name,
                        ))
                    input_features.extend(new_input_features)
                    hidden.extend(new_input_features)

            # We create the node embedding layer if we are executing convolutional layers
            # or, if we are not executing convolutional layers, if we are not executing
            # an edge-level task. In fact, when we are executing an edge-level task and
            # we are not executing convolutional layers, we will need to create the node
            # embedding layer within the abstract edge GCN model.
            if self._use_node_embedding and (
                self.has_kernels() or not self.is_edge_level_task()
            ):
                node_ids = Input(shape=(1,), name="Nodes", dtype=tf.int32)
                input_features.append(node_ids)

                node_embedding = FlatEmbedding(
                    vocabulary_size=graph.get_number_of_nodes(),
                    dimension=self._node_embedding_size,
                    input_length=1,
                    name="NodesEmbedding",
                )(node_ids)
                hidden.append(node_embedding)

            if self._use_node_type_embedding:
                node_type_ids = Input(
                    shape=(graph.get_maximum_multilabel_count(),),
                    name=f"{prefix}Node Types",
                    dtype=tf.int32,
                )
                input_features.append(node_type_ids)

                space_adjusted_input_layer_name = node_type_ids.name.replace(" ", "")
                use_masking = graph.has_multilabel_node_types() or graph.has_unknown_node_types()

                node_type_embedding = FlatEmbedding(
                    vocabulary_size=graph.get_number_of_node_types() + (1 if use_masking else 0),
                    dimension=self._node_type_embedding_size,
                    input_length=graph.get_maximum_multilabel_count(),
                    mask_zero=use_masking,
                    name=f"{space_adjusted_input_layer_name}Embedding",
                )(node_type_ids)
                hidden.append(node_type_embedding)

        starting_hidden = hidden

        output_hiddens = []

        for kernel, kernel_name in zip(kernels, self._kernels):
            hidden = starting_hidden
            # Building the body of the model.
            for i, units in enumerate(
                self._number_of_units_per_graph_convolution_layers
            ):
                if len(self._number_of_units_per_graph_convolution_layers) > 1:
                    ordinal = number_to_ordinal(i + 1)
                else:
                    ordinal = ""
                sanitized_kernel_name = kernel_name.replace(" ", "")
                assert len(hidden) > 0
                hidden = GraphConvolution(
                    units=units,
                    combiner=self._combiner,
                    dropout_rate=self._dropout_rate,
                    apply_norm=self._apply_norm,
                    name=f"{ordinal}{sanitized_kernel_name}GraphConvolution",
                )((kernel, *hidden))
                if self._residual_convolutional_layers:
                    output_hiddens.extend(hidden)
            if not self._residual_convolutional_layers:
                output_hiddens.extend(hidden)

        if len(output_hiddens) == 0:
            output_hiddens = starting_hidden

        # Returning the convolutional portion of the model.
        return Model(
            inputs=[
                input_layer
                for input_layer in (*kernels, *input_features)
                if input_layer is not None
            ],
            outputs=output_hiddens,
        )

    def get_model_expected_input_shapes(self, graph: Graph) -> Dict[str, Tuple[int]]:
        """Return dictionary with expected input shapes."""
        if self._model is None:
            raise RuntimeError(
                "You need to fit the model before you can "
                "retrieve the expected input shapes."
            )

        return {
            input_layer.name: tuple(
                [
                    self.get_batch_size_from_graph(graph)
                    if dimension is None
                    else dimension
                    for dimension in tuple(input_layer.shape)
                ]
            )
            for input_layer in self._model.inputs
        }

    def compile(
        self,
        graph: Graph,
        support: Optional[Graph] = None,
        node_features: Optional[List[np.ndarray]] = None,
        node_type_features: Optional[List[np.ndarray]] = None,
        edge_type_features: Optional[List[np.ndarray]] = None,
        edge_features: Optional[
            Union[
                Type[AbstractEdgeFeature],
                List[Union[np.ndarray, Type[AbstractEdgeFeature]]],
            ]
        ] = None,
    ) -> pd.DataFrame:
        """Return pandas dataframe with training history.

        Parameters
        -----------------------
        graph: Graph,
            The graph whose edges are to be embedded and edge types extracted.
        node_features: Optional[List[np.ndarray]]
            The node features to be used in the training of the model.
        node_type_features: Optional[List[np.ndarray]]
            The node type features to be used in the training of the model.
        edge_type_features: Optional[List[np.ndarray]]
            The edge type features to be used in the training of the model.
        edge_features: Optional[Union[Type[AbstractEdgeFeature], List[Union[np.ndarray, Type[AbstractEdgeFeature]]]]] = None
            The edge features to be used in the training of the model.

        Returns
        -----------------------
        Dataframe with training history.
        """
        if (
            self.has_kernels()
            and node_features is None
            and not self._use_node_embedding
            and node_type_features is None
            and not self._use_node_type_embedding
        ):
            raise ValueError(
                "Neither node features were provided nor the node "
                "embedding was enabled through the `use_node_embedding` "
                "parameter. If you do not provide node features or use a node embedding layer "
                "nor use a node type embedding layer and neiher use node type features, "
                "it does not make sense to use a GCN model."
            )

        node_features=self.normalize_node_features(
            graph=graph,
            support=support,
            random_state=self._random_state,
            node_features=node_features,
            allow_automatic_feature=True,
        )
        node_type_features=self.normalize_node_type_features(
            graph=graph,
            support=support,
            random_state=self._random_state,
            node_type_features=node_type_features,
            allow_automatic_feature=True,
        )
        edge_type_features=self.normalize_edge_type_features(
            graph=graph,
            support=support,
            random_state=self._random_state,
            edge_type_features=edge_type_features,
            allow_automatic_feature=True,
        )
        edge_features=self.normalize_edge_features(
            graph=graph,
            support=support,
            random_state=self._random_state,
            edge_features=edge_features,
            allow_automatic_feature=True,
        )

        self._model: Type[Model] = self._build_model(
            support,
            graph_convolution_model=self._build_graph_convolution_model(
                graph,
                node_features=node_features,
                node_type_features=node_type_features,
            ),
            edge_type_features=edge_type_features,
            edge_features=edge_features,
        )

    def _fit(
        self,
        graph: Graph,
        support: Optional[Graph] = None,
        node_features: Optional[List[np.ndarray]] = None,
        node_type_features: Optional[List[np.ndarray]] = None,
        edge_type_features: Optional[List[np.ndarray]] = None,
        edge_features: Optional[
            Union[
                Type[AbstractEdgeFeature],
                List[Union[np.ndarray, Type[AbstractEdgeFeature]]],
            ]
        ] = None,
    ) -> pd.DataFrame:
        """Return pandas dataframe with training history.

        Parameters
        -----------------------
        graph: Graph,
            The graph whose edges are to be embedded and edge types extracted.
            It can either be an Graph or a list of lists of edges.
        support: Optional[Graph] = None
            The graph describiding the topological structure that
            includes also the above graph.
        node_features: Optional[List[np.ndarray]]
            The node features to be used in the training of the model.
        node_type_features: Optional[List[np.ndarray]]
            The node type features to be used in the training of the model.
        edge_type_features: Optional[List[np.ndarray]]
            The edge type features to be used in the training of the model.
        edge_features: Optional[Union[Type[AbstractEdgeFeature], List[Union[np.ndarray, Type[AbstractEdgeFeature]]]]] = None
            The edge features to be used in the training of the model.

        Returns
        -----------------------
        Dataframe with training history.
        """
        try:
            from tqdm.keras import TqdmCallback

            traditional_verbose = False
        except AttributeError:
            traditional_verbose = True

        if support is None:
            support = graph

        class_weight = (
            self._get_class_weights(graph) if self._use_class_weights else None
        )

        if self._model is None:
            self.compile(
                graph,
                support=support,
                node_features=node_features,
                node_type_features=node_type_features,
                edge_type_features=edge_type_features,
                edge_features=edge_features,
            )

        # Within the expected input shapes, we do not have the batch size itself.
        # The batch size is left implicit as a None value, but we need to add it
        # to the expected input shapes to check that the input shapes are correct.

        expected_input_shapes = self.get_model_expected_input_shapes(graph)

        model_input = self._get_model_training_input(
            graph,
            support=support,
            edge_features=edge_features,
            node_type_features=node_type_features,
            edge_type_features=edge_type_features,
            node_features=node_features,
        )

        if not isinstance(model_input, tuple) and not issubclass(
            type(model_input), Sequence
        ):
            raise RuntimeError(
                "The model input should be a subclass of `Sequence` or a tuple "
                f"but it is `{type(model_input)}`. "
                "This is an internal error, please open an issue at "
                "GRAPE's GitHub page."
            )

        if issubclass(type(model_input), Sequence):
            sequence_input_shapes = [
                tuple(feature.shape) for feature in model_input[0][0]
            ]
        elif isinstance(model_input, tuple):
            sequence_input_shapes = [tuple(feature.shape) for feature in model_input]
        else:
            raise RuntimeError(
                "The model input should be a subclass of `Sequence` or a tuple "
                f"but it is `{type(model_input)}`. "
                "This is an internal error, please open an issue at "
                "GRAPE's GitHub page."
            )

        if len(expected_input_shapes) != len(sequence_input_shapes):
            raise RuntimeError(
                f"We expected {len(expected_input_shapes)} inputs "
                f"and we received {len(sequence_input_shapes)} inputs. "
                "Specifically, the input shapes we expected were "
                f"{expected_input_shapes}, but the training sequence "
                f"provided the input shapes {sequence_input_shapes}. "
                "This is an internal error, please open an issue at "
                "GRAPE's GitHub page."
            )

        for (layer_name, layer_input_shape), input_shape in zip(
            expected_input_shapes.items(), sequence_input_shapes
        ):
            if (
                (layer_input_shape[1:] != input_shape[1:])
                and not self.has_convolutional_layers()
                or layer_input_shape != input_shape
                and self.has_convolutional_layers()
            ):
                raise RuntimeError(
                    f"We expected {len(expected_input_shapes)} inputs "
                    f"and we received {len(sequence_input_shapes)} inputs. "
                    f"The input shape of the layer `{layer_name}` "
                    f"should be `{layer_input_shape}` but it is `{input_shape}`. "
                    "Specifically, the input shapes we expected were "
                    f"{expected_input_shapes}, but the training sequence "
                    f"provided the input shapes {sequence_input_shapes}. "
                    "This is an internal error, please open an issue at "
                    "GRAPE's GitHub page."
                )

        self.history = self._model.fit(
            x=model_input,
            y=self._get_model_training_output(graph),
            sample_weight=self._get_model_training_sample_weights(graph),
            epochs=self._epochs,
            verbose=traditional_verbose and self._verbose > 0,
            batch_size=self.get_batch_size_from_graph(graph),
            shuffle=False,
            class_weight=class_weight,
            callbacks=[
                EarlyStopping(
                    monitor=self._early_stopping_monitor,
                    min_delta=self._early_stopping_min_delta,
                    patience=self._early_stopping_patience,
                    mode=self._early_stopping_mode,
                ),
                ReduceLROnPlateau(
                    monitor=self._reduce_lr_monitor,
                    min_delta=self._reduce_lr_min_delta,
                    patience=self._reduce_lr_patience,
                    factor=self._reduce_lr_factor,
                    mode=self._reduce_lr_mode,
                ),
                TerminateOnNaN(),
                *(
                    (
                        TqdmCallback(
                            verbose=1 if "edge" in self.task_name().lower() else 0,
                            leave=False,
                        ),
                    )
                    if not traditional_verbose and self._verbose
                    else ()
                ),
            ],
        )

    def _predict_proba(
        self,
        graph: Graph,
        support: Optional[Graph] = None,
        node_features: Optional[List[np.ndarray]] = None,
        node_type_features: Optional[List[np.ndarray]] = None,
        edge_type_features: Optional[List[np.ndarray]] = None,
        edge_features: Optional[
            Union[
                Type[AbstractEdgeFeature],
                List[Union[np.ndarray, Type[AbstractEdgeFeature]]],
            ]
        ] = None,
    ) -> pd.DataFrame:
        """Run predictions on the provided graph."""
        if not graph.has_edges():
            return np.array([])

        if support is None:
            support = graph

        model_input = self._get_model_prediction_input(
            graph,
            support,
            node_features,
            node_type_features,
            edge_type_features,
            edge_features,
        )

        if issubclass(type(model_input), Sequence):
            sequence_input_shapes = [
                tuple(feature.shape) for feature in model_input[0][0]
            ]
        elif isinstance(model_input, tuple):
            sequence_input_shapes = [tuple(feature.shape) for feature in model_input]
        else:
            raise RuntimeError(
                "The model input should be a subclass of `Sequence` or a tuple "
                f"but it is `{type(model_input)}`. "
                "This is an internal error, please open an issue at "
                "GRAPE's GitHub page."
            )

        expected_input_shapes = self.get_model_expected_input_shapes(graph)

        if len(expected_input_shapes) != len(sequence_input_shapes):
            raise RuntimeError(
                f"We expected {len(expected_input_shapes)} inputs "
                f"and we received {len(sequence_input_shapes)} inputs. "
                "Specifically, the input shapes we expected were "
                f"{expected_input_shapes}, but the prediction sequence "
                f"provided the input shapes {sequence_input_shapes}. "
                "This is an internal error, please open an issue at "
                "GRAPE's GitHub page."
            )

        for (layer_name, layer_input_shape), input_shape in zip(
            expected_input_shapes.items(), sequence_input_shapes
        ):
            if layer_input_shape[1:] != input_shape[1:]:
                raise RuntimeError(
                    f"We expected {len(expected_input_shapes)} inputs "
                    f"and we received {len(sequence_input_shapes)} inputs. "
                    f"The input shape of the layer `{layer_name}` "
                    f"should be `{layer_input_shape}` but it is `{input_shape}`. "
                    "Specifically, the input shapes we expected were "
                    f"{expected_input_shapes}, but the prediction sequence "
                    f"provided the input shapes {sequence_input_shapes}. "
                    "This is an internal error, please open an issue at "
                    "GRAPE's GitHub page."
                )

        return self._model.predict(
            model_input, batch_size=self.get_batch_size_from_graph(graph), verbose=False
        )

    def _predict(
        self,
        graph: Graph,
        support: Graph,
        node_features: List[np.ndarray],
        node_type_features: List[np.ndarray],
        edge_type_features: List[np.ndarray],
        edge_features: List[Union[np.ndarray, Type[AbstractEdgeFeature]]],
    ) -> pd.DataFrame:
        """Run predictions on the provided graph."""
        predictions = self._predict_proba(
            graph,
            support,
            node_features=node_features,
            node_type_features=node_type_features,
            edge_type_features=edge_type_features,
            edge_features=edge_features,
        )

        if "Beheaded" in self._model.name:
            return predictions

        if self.is_binary_prediction_task() or self.is_multilabel_prediction_task():
            return predictions > 0.5

        return predictions.argmax(axis=-1)

    def get_output_activation_name(self) -> str:
        """Return activation of the output."""
        # Adding the last layer of the model.
        if self.is_binary_prediction_task() or self.is_multilabel_prediction_task():
            return "sigmoid"
        return "softmax"

    def get_loss_name(self) -> str:
        """Return model loss."""
        # Adding the last layer of the model.
        if self.is_binary_prediction_task() or self.is_multilabel_prediction_task():
            return "binary_crossentropy"
        return "sparse_categorical_crossentropy"

    def get_output_classes(self, graph: Graph) -> int:
        """Returns number of output classes."""
        raise NotImplementedError(
            "The method `get_output_classes` should be implemented "
            "in the child classes of `AbstractGCN`, but is missing "
            f"in the class {self.__class__.__name__}."
        )

    def has_convolutional_layers(self) -> bool:
        """Returns whether the present model has convolutional layers."""
        return self._number_of_graph_convolution_layers

    def has_kernels(self) -> bool:
        """Returns whether the present model has kernels."""
        return len(self._kernels) > 0

    def convert_graph_to_kernels(self, graph: Graph) -> Optional[tf.SparseTensor]:
        """Returns provided graph converted to a sparse Tensor.

        Implementation details
        ---------------------------
        Do note that when the model does not have convolutional layers
        the model will return None, as to avoid allocating like object for
        apparently no reason.
        """
        if not self.has_kernels():
            return None
        return [
            graph_to_sparse_tensor(
                graph,
                kernel=kernel,
                handling_multi_graph=self._handling_multi_graph,
            )
            for kernel in self._kernels
        ]

    @classmethod
    def requires_edge_weights(cls) -> bool:
        return False

    @classmethod
    def requires_positive_edge_weights(cls) -> bool:
        return False

    @classmethod
    def library_name(cls) -> str:
        """Return name of the model."""
        return "TensorFlow"

    @classmethod
    def can_use_edge_weights(cls) -> bool:
        """Returns whether the model can optionally use edge weights."""
        return True

    def is_using_edge_weights(self) -> bool:
        """Returns whether the model is parametrized to use edge weights."""
        return any(["Weighted" in kernel for kernel in self._kernels])

    def is_edge_level_task(self) -> bool:
        """Returns whether the task is edge level."""
        return False

    @classmethod
    def load(cls, path: str) -> "Self":
        """Load a saved version of the model from the provided path.

        Parameters
        -------------------
        path: str
            Path from where to load the model.
        """
        with tf.keras.utils.custom_object_scope(
            {
                "GraphConvolution": GraphConvolution,
                "EmbeddingLookup": EmbeddingLookup,
                "FlatEmbedding": FlatEmbedding,
                "L2Norm": L2Norm,
            }
        ):
            return compress_pickle.load(path)

    def dump(self, path: str):
        """Dump the current model at the provided path.

        Parameters
        -------------------
        path: str
            Path from where to dump the model.
        """
        compress_pickle.dump(self, path)