monarch-initiative/N2V

View on GitHub
embiggen/embedders/pecanpy_embedders/node2vec.py

Summary

Maintainability
A
1 hr
Test Coverage
"""Node2Vec wrapper for PecanPy numba-based node embedding library."""
from typing import Dict, Union, Any
from multiprocessing import cpu_count
from time import time
from pecanpy.pecanpy import SparseOTF

import numpy as np
import pandas as pd
from ensmallen import Graph

from embiggen.utils.abstract_models import AbstractEmbeddingModel, EmbeddingResult


class Node2VecPecanPy(AbstractEmbeddingModel):
    """Node2Vec wrapper for PecanPy numba-based node embedding library."""

    def __init__(
        self,
        embedding_size: int = 100,
        epochs: int = 30,
        walk_length: int = 128,
        iterations: int = 10,
        window_size: int = 5,
        batch_size: int = 128,
        learning_rate: float = 0.01,
        return_weight: float = 0.25,
        explore_weight: float = 4.0,
        number_of_workers: Union[int, str] = "auto",
        verbose: bool = False,
        random_state: int = 42,
        ring_bell: bool = False,
        enable_cache: bool = False,
    ):
        """Create new wrapper for Node2Vec model from PecanPy library.

        Parameters
        -------------------------
        embedding_size: int = 100
            The dimension of the embedding to compute.
        epochs: int = 100
            The number of epochs to use to train the model for.
        batch_size: int = 2**10
            Size of the training batch.
        learning_rate: float = 0.01
            Learning rate of the model.
        verbose: bool = False
            Whether to show the loading bar.
        random_state: int = 42
            Random seed to use while training the model
        ring_bell: bool = False,
            Whether to play a sound when embedding completes.
        enable_cache: bool = False
            Whether to enable the cache, that is to
            store the computed embedding.
        """
        self._walk_length = walk_length
        self._iterations = iterations
        self._window_size = window_size
        self._return_weight = return_weight
        self._explore_weight = explore_weight
        self._epochs = epochs
        self._verbose = verbose
        self._batch_size = batch_size
        self._learning_rate = learning_rate
        self._time_required_by_last_embedding = None
        if number_of_workers == "auto":
            number_of_workers = cpu_count()
        self._number_of_workers = number_of_workers

        super().__init__(
            embedding_size=embedding_size,
            enable_cache=enable_cache,
            ring_bell=ring_bell,
            random_state=random_state,
        )

    @classmethod
    def smoke_test_parameters(cls) -> Dict[str, Any]:
        """Returns parameters for smoke test."""
        return dict(
            embedding_size=5, epochs=1, window_size=1, walk_length=2, iterations=1
        )

    def parameters(self) -> Dict[str, Any]:
        return dict(
            **super().parameters(),
            **dict(
                epochs=self._epochs,
                batch_size=self._batch_size,
                walk_length=self._walk_length,
                window_size=self._window_size,
                iterations=self._iterations,
                return_weight=self._return_weight,
                explore_weight=self._explore_weight,
            )
        )

    @classmethod
    def model_name(cls) -> str:
        """Returns name of the model"""
        return "Node2Vec"

    @classmethod
    def library_name(cls) -> str:
        return "PecanPy"

    @classmethod
    def task_name(cls) -> str:
        return "Node Embedding"

    def get_time_required_by_last_embedding(self) -> float:
        """Returns the time required by last embedding."""
        if self._time_required_by_last_embedding is None:
            raise ValueError("You have not yet run an embedding.")
        return self._time_required_by_last_embedding

    def _fit_transform(
        self,
        graph: Graph,
        return_dataframe: bool = True,
    ) -> Union[
        np.ndarray, pd.DataFrame, Dict[str, np.ndarray], Dict[str, pd.DataFrame]
    ]:
        """Return node embedding"""

        model: SparseOTF = SparseOTF(
            p=1.0 / self._return_weight,
            q=1.0 / self._explore_weight,
            workers=self._number_of_workers,
            verbose=self._verbose,
        )

        # Instead of using `SparseOTF` methods,
        # we manipolate directly the object attributes
        # in order to avoid a memory peak and making
        # the conversion of the Ensmallen graph object
        # into the `SparseOTF` more seamless.

        # The `indptr` attribute contains the indices
        # of the rows in the CSR representation, which
        # are the comulative node degrees.
        # This is shifted by a value to the right, and the
        # first value is set to zero.
        model.indptr = np.zeros(graph.get_number_of_nodes() + 1, dtype=np.int64)
        model.indptr[1:] = graph.get_cumulative_node_degrees().astype(np.int64)

        # In the `indices` attribute we need to store the destinations.
        model.indices = graph.get_directed_destination_node_ids()

        model.set_node_ids(
            node_ids=None,
            implicit_ids=True,
            num_nodes=graph.get_number_of_nodes(),
        )

        # In model data we need to store the edge weights
        # if are present. If the graph is weighted, we use
        # the graph edge weights. Otherwise we set all weights
        # to one, as the library PecanPy does.
        if graph.has_edge_weights():
            model.data = graph.get_directed_edge_weights().astype(np.float64)
        else:
            model.data = np.ones_like(model.indices, dtype=np.float64)

        start = time()

        node_embedding = model.embed(
            dim=self._embedding_size,
            num_walks=self._iterations,
            walk_length=self._walk_length,
            window_size=self._window_size,
            epochs=self._epochs,
            verbose=self._verbose,
        )

        self._time_required_by_last_embedding = time() - start

        if return_dataframe:
            node_embedding = pd.DataFrame(node_embedding, index=graph.get_node_names())

        return EmbeddingResult(
            embedding_method_name=self.model_name(), node_embeddings=node_embedding
        )

    @classmethod
    def requires_nodes_sorted_by_decreasing_node_degree(cls) -> bool:
        return False

    @classmethod
    def is_topological(cls) -> bool:
        return True

    @classmethod
    def can_use_edge_weights(cls) -> bool:
        return True

    @classmethod
    def requires_edge_weights(cls) -> bool:
        return False

    @classmethod
    def can_use_node_types(cls) -> bool:
        """Returns whether the model can use node types."""
        return False

    @classmethod
    def can_use_edge_types(cls) -> bool:
        """Returns whether the model can use edge types."""
        return False

    @classmethod
    def is_stocastic(cls) -> bool:
        """Returns whether the model is stocastic and has therefore a random state."""
        return True