# Source repository: monarch-initiative/N2V (GitHub)
# File path: embiggen/embedders/fastnode2vec_embedders/node2vec.py
# NOTE: the lines that previously appeared here ("View on GitHub",
# "Summary", "Maintainability A", "1 hr", "Test Coverage") were residue
# scraped from a code-quality report page, not part of the source file.
"""Node2Vec wrapper for FastNode2Vec numba-based node embedding library."""
from typing import Dict, Union, Any

import numpy as np
import pandas as pd
from ensmallen import Graph

from embiggen.utils.abstract_models import AbstractEmbeddingModel, EmbeddingResult
from fastnode2vec import Graph as FNGraph
from fastnode2vec import Node2Vec
from multiprocessing import cpu_count
from time import time


class Node2VecFastNode2Vec(AbstractEmbeddingModel):
    """Node2Vec wrapper for FastNode2Vec numba-based node embedding library."""

    def __init__(
        self,
        embedding_size: int = 100,
        epochs: int = 30,
        walk_length: int = 128,
        iterations: int = 10,
        window_size: int = 5,
        learning_rate: float = 0.01,
        return_weight: float = 0.25,
        explore_weight: float = 4.0,
        number_of_workers: Union[int, str] = "auto",
        verbose: bool = False,
        random_state: int = 42,
        ring_bell: bool = False,
        enable_cache: bool = False
    ):
        """Create new wrapper for Node2Vec model from FastNode2Vec library.

        Parameters
        -------------------------
        embedding_size: int = 100
            The dimension of the embedding to compute.
        epochs: int = 30
            The number of epochs to use to train the model for.
        walk_length: int = 128
            Length of the random walks to sample.
        iterations: int = 10
            Number of walk iterations; forwarded as the ``batch_walks``
            parameter of the FastNode2Vec model and used as a multiplier
            of the training epochs.
        window_size: int = 5
            Size of the context window of the underlying Word2Vec model.
        learning_rate: float = 0.01
            Learning rate of the model.
            NOTE(review): this value is stored but is currently NOT
            forwarded to the FastNode2Vec trainer — confirm whether the
            library exposes a way to customize it.
        return_weight: float = 0.25
            Weight on the probability of returning to the previously
            visited node. Must be strictly positive, as its inverse is
            provided to FastNode2Vec as the ``p`` parameter.
        explore_weight: float = 4.0
            Weight on the probability of exploring farther from the
            previously visited node. Must be strictly positive, as its
            inverse is provided to FastNode2Vec as the ``q`` parameter.
        number_of_workers: Union[int, str] = "auto"
            Number of parallel workers to use during training.
            With "auto", all available CPU cores are used.
        verbose: bool = False
            Whether to show the loading bar.
        random_state: int = 42
            Random seed to use while training the model
        ring_bell: bool = False,
            Whether to play a sound when embedding completes.
        enable_cache: bool = False
            Whether to enable the cache, that is to
            store the computed embedding.

        Raises
        -------------------------
        ValueError
            If either ``return_weight`` or ``explore_weight`` is not
            strictly positive, since their inverses are computed at
            fit time.
        """
        # Validate eagerly: these values are inverted in `_fit_transform`
        # (p = 1 / return_weight, q = 1 / explore_weight), so a zero here
        # would otherwise only surface later as an obscure ZeroDivisionError.
        if return_weight <= 0:
            raise ValueError(
                "The return weight must be strictly positive, "
                f"but {return_weight} was provided."
            )
        if explore_weight <= 0:
            raise ValueError(
                "The explore weight must be strictly positive, "
                f"but {explore_weight} was provided."
            )

        self._walk_length = walk_length
        self._iterations = iterations
        self._window_size = window_size
        self._return_weight = return_weight
        self._explore_weight = explore_weight
        self._epochs = epochs
        self._verbose = verbose
        # NOTE(review): stored for reporting purposes only; see the
        # docstring note about it not reaching the trainer.
        self._learning_rate = learning_rate
        # Populated by `_fit_transform`; queried via
        # `get_time_required_by_last_embedding`.
        self._time_required_by_last_embedding = None
        if number_of_workers == "auto":
            number_of_workers = cpu_count()
        self._number_of_workers = number_of_workers

        super().__init__(
            embedding_size=embedding_size,
            enable_cache=enable_cache,
            ring_bell=ring_bell,
            random_state=random_state
        )

    @classmethod
    def smoke_test_parameters(cls) -> Dict[str, Any]:
        """Returns minimal parameters for a fast smoke test run."""
        return dict(
            embedding_size=5,
            epochs=1,
            window_size=1,
            walk_length=2,
            iterations=1
        )

    def parameters(self) -> Dict[str, Any]:
        """Returns the parameters of the model."""
        return dict(
            **super().parameters(),
            **dict(
                epochs=self._epochs,
                walk_length=self._walk_length,
                window_size=self._window_size,
                iterations=self._iterations,
                return_weight=self._return_weight,
                explore_weight=self._explore_weight,
            )
        )

    @classmethod
    def model_name(cls) -> str:
        """Returns name of the model"""
        return "Node2Vec"

    @classmethod
    def library_name(cls) -> str:
        """Returns name of the library implementing the model."""
        return "FastNode2Vec"

    @classmethod
    def task_name(cls) -> str:
        """Returns name of the task the model is meant for."""
        return "Node Embedding"

    def get_time_required_by_last_embedding(self) -> float:
        """Returns the time required by last embedding.

        Raises
        -------------------------
        ValueError
            If no embedding has been computed yet.
        """
        if self._time_required_by_last_embedding is None:
            raise ValueError("You have not yet run an embedding.")
        return self._time_required_by_last_embedding

    def _fit_transform(
        self,
        graph: Graph,
        return_dataframe: bool = True,
    ) -> Union[np.ndarray, pd.DataFrame, Dict[str, np.ndarray], Dict[str, pd.DataFrame]]:
        """Return node embedding computed with the FastNode2Vec model.

        Parameters
        -------------------------
        graph: Graph
            The Ensmallen graph whose nodes are to be embedded.
        return_dataframe: bool = True
            Whether to wrap the embedding in a DataFrame indexed
            by the graph node names.
        """
        # FastNode2Vec builds its own graph object from an edge iterator;
        # include the weight as a third tuple element only when available.
        if graph.has_edge_weights():
            edges_iterator = (
                (
                    *graph.get_node_names_from_edge_id(edge_id),
                    graph.get_edge_weight_from_edge_id(edge_id)
                )
                for edge_id in range(graph.get_number_of_directed_edges())
            )
        else:
            edges_iterator = (
                graph.get_node_names_from_edge_id(edge_id)
                for edge_id in range(graph.get_number_of_directed_edges())
            )

        fn_graph: FNGraph = FNGraph(
            edges_iterator,
            directed=True,
            weighted=graph.has_edge_weights(),
            number_of_edges=graph.get_number_of_directed_edges()
        )

        # Time only model construction and training, not the graph
        # conversion above nor the patching below.
        start = time()

        # Note that FastNode2Vec uses the inverse convention for the
        # in-out parameters with respect to this wrapper: p and q are
        # the reciprocals of the return and explore weights.
        model: Node2Vec = Node2Vec(
            graph=fn_graph,
            dim=self._embedding_size,
            walk_length=self._walk_length,
            window=self._window_size,
            p=1.0/self._return_weight,
            q=1.0/self._explore_weight,
            workers=self._number_of_workers,
            batch_walks=self._iterations,
            seed=self._random_state,
        )

        model.train(
            epochs=self._epochs * self._iterations,
            verbose=self._verbose
        )

        self._time_required_by_last_embedding = time() - start

        # This library does not provide node embedding
        # for disconnected nodes, so we need to patch it up
        # if the graph contains disconnected nodes: missing
        # nodes receive an all-zeros embedding.
        if graph.has_singleton_nodes():
            node_embedding = np.zeros(
                shape=(graph.get_number_of_nodes(), self._embedding_size),
            )
            for node_id in range(graph.get_number_of_nodes()):
                node_name = graph.get_node_name_from_node_id(node_id)
                if node_name in model.wv:
                    node_embedding[node_id] = model.wv[node_name]
        else:
            # All nodes are connected: retrieve the full matrix in the
            # graph's node ordering in a single lookup.
            node_embedding = model.wv[graph.get_node_names()]

        if return_dataframe:
            node_embedding = pd.DataFrame(
                node_embedding,
                index=graph.get_node_names()
            )

        return EmbeddingResult(
            embedding_method_name=self.model_name(),
            node_embeddings=node_embedding
        )

    @classmethod
    def requires_nodes_sorted_by_decreasing_node_degree(cls) -> bool:
        """Returns whether the model requires degree-sorted nodes."""
        return False

    @classmethod
    def is_topological(cls) -> bool:
        """Returns whether the model is based on graph topology."""
        return True

    @classmethod
    def can_use_edge_weights(cls) -> bool:
        """Returns whether the model can use edge weights."""
        return True

    @classmethod
    def requires_edge_weights(cls) -> bool:
        """Returns whether the model requires edge weights."""
        return False

    @classmethod
    def can_use_node_types(cls) -> bool:
        """Returns whether the model can use node types."""
        return False

    @classmethod
    def can_use_edge_types(cls) -> bool:
        """Returns whether the model can use edge types."""
        return False

    @classmethod
    def is_stocastic(cls) -> bool:
        """Returns whether the model is stocastic and has therefore a random state."""
        return True