Skip to content

tgb.linkproppred

LinkPropPredDataset

Bases: object

Source code in tgb/linkproppred/dataset.py
class LinkPropPredDataset(object):
    def __init__(
        self,
        name: str,
        root: Optional[str] = "datasets",
        meta_dict: Optional[dict] = None,
        preprocess: Optional[bool] = True,
    ):
        r"""Dataset class for link prediction dataset. Stores meta information about each dataset such as evaluation metrics etc.
        also automatically pre-processes the dataset.
        Args:
            name: name of the dataset
            root: root directory to store the dataset folder, appended to PROJ_DIR
            meta_dict: dictionary containing meta information about the dataset, should contain key 'dir_name' which is the name of the dataset folder
            preprocess: whether to pre-process the dataset
        Raises:
            FileNotFoundError: if the dataset directory does not exist after the download step
        """
        self.name = name  ## original name
        # check if dataset url exist
        if self.name in DATA_URL_DICT:
            self.url = DATA_URL_DICT[self.name]
        else:
            self.url = None
            print(f"Dataset {self.name} url not found, download not supported yet.")

        # check if the evaluation metric is specified
        if self.name in DATA_EVAL_METRIC_DICT:
            self.metric = DATA_EVAL_METRIC_DICT[self.name]
        else:
            self.metric = None
            print(
                f"Dataset {self.name} default evaluation metric not found, it is not supported yet."
            )

        root = PROJ_DIR + root

        if meta_dict is None:
            self.dir_name = "_".join(name.split("-"))  ## replace hyphen with underline
            meta_dict = {"dir_name": self.dir_name}
        else:
            self.dir_name = meta_dict["dir_name"]
        self.root = osp.join(root, self.dir_name)
        self.meta_dict = meta_dict
        if "fname" not in self.meta_dict:
            self.meta_dict["fname"] = self.root + "/" + self.name + "_edgelist.csv"
            self.meta_dict["nodefile"] = None
        # a caller-supplied meta_dict may carry 'fname' without 'nodefile';
        # later code indexes 'nodefile' unconditionally, so make sure it exists
        self.meta_dict.setdefault("nodefile", None)

        if name == "tgbl-flight":
            self.meta_dict["nodefile"] = self.root + "/" + "airport_node_feat.csv"

        self.meta_dict["val_ns"] = self.root + "/" + self.name + "_val_ns.pkl"
        self.meta_dict["test_ns"] = self.root + "/" + self.name + "_test_ns.pkl"

        #! version check
        self.version_passed = True
        self._version_check()

        # initialize
        self._node_feat = None
        self._edge_feat = None
        self._full_data = None
        self._train_data = None
        self._val_data = None
        self._test_data = None
        # the mask properties test these attributes against None, so they must
        # exist even when preprocess=False
        self._train_mask = None
        self._val_mask = None
        self._test_mask = None

        self.download()
        # the dataset directory must exist at this point (download creates it)
        if osp.isdir(self.root):
            print("Dataset directory is ", self.root)
        else:
            # os.makedirs(self.root)
            raise FileNotFoundError(f"Directory not found at {self.root}")

        if preprocess:
            self.pre_process()

        self.ns_sampler = NegativeEdgeSampler(
            dataset_name=self.name, strategy="hist_rnd"
        )

    def _version_check(self) -> None:
        r"""Implement Version checks for dataset files
        updates the file names based on the current version number
        prompt the user to download the new version via self.version_passed variable
        """
        if self.name not in DATA_VERSION_DICT:
            print(f"Dataset {self.name} version number not found.")
            self.version_passed = False
            return None

        version = DATA_VERSION_DICT[self.name]
        if version > 1:
            # * versions above 1 use file names carrying a "_v<version>" suffix
            suffix = "_v" + str(int(version))
            self.meta_dict["fname"] = self.root + "/" + self.name + "_edgelist" + suffix + ".csv"
            self.meta_dict["nodefile"] = None
            if self.name == "tgbl-flight":
                self.meta_dict["nodefile"] = self.root + "/" + "airport_node_feat" + suffix + ".csv"
            self.meta_dict["val_ns"] = self.root + "/" + self.name + "_val_ns" + suffix + ".pkl"
            self.meta_dict["test_ns"] = self.root + "/" + self.name + "_test_ns" + suffix + ".pkl"

            # * check if current version is outdated
            if not osp.exists(self.meta_dict["fname"]):
                print(f"Dataset {self.name} version {int(version)} not found.")
                print("Please download the latest version of the dataset.")
                self.version_passed = False
        return None

    def download(self):
        """
        downloads this dataset from url
        check if files are already downloaded
        Raises:
            Exception: if the user declines the download or no url is known for the dataset
        """
        # check if the file already exists
        if osp.exists(self.meta_dict["fname"]):
            print("raw file found, skipping download")
            return

        inp = input(
            "Will you download the dataset(s) now? (y/N)\n"
        ).lower()  # ask if the user wants to download the dataset

        if inp != "y":
            raise Exception(
                BColors.FAIL + "Data not found error, download " + self.name + " failed"
            )

        print(
            f"{BColors.WARNING}Download started, this might take a while . . . {BColors.ENDC}"
        )
        print(f"Dataset title: {self.name}")

        if self.url is None:
            raise Exception("Dataset url not found, download not supported yet.")

        r = requests.get(self.url, stream=True)
        # fail here on an HTTP error instead of saving an error page as the archive
        r.raise_for_status()
        if osp.isdir(self.root):
            print("Dataset directory is ", self.root)
        else:
            os.makedirs(self.root)

        path_download = self.root + "/" + self.name + ".zip"
        chunks = r.iter_content(chunk_size=1024)
        content_length = r.headers.get("content-length")
        if content_length is not None:
            # the progress bar is only used when the server announces the size;
            # without the header int(None) would raise
            chunks = progress.bar(
                chunks,
                expected_size=(int(content_length) / 1024) + 1,
            )
        with open(path_download, "wb") as f:
            for chunk in chunks:
                if chunk:
                    f.write(chunk)
        # for unzipping the file
        with zipfile.ZipFile(path_download, "r") as zip_ref:
            zip_ref.extractall(self.root)
        print(f"{BColors.OKGREEN}Download completed {BColors.ENDC}")
        self.version_passed = True

    def generate_processed_files(
        self,
    ) -> Tuple[pd.DataFrame, np.ndarray, Optional[np.ndarray]]:
        r"""
        turns raw data .csv file into a pandas data frame, stored on disc if not already
        Returns:
            df: pandas data frame
            edge_feat: edge features
            node_feat: node features, None if the dataset has no node file
        Raises:
            FileNotFoundError: if the raw edge list or the node file is missing
            ValueError: if no edge list loader exists for this dataset name
        """
        node_feat = None
        if not osp.exists(self.meta_dict["fname"]):
            raise FileNotFoundError(f"File not found at {self.meta_dict['fname']}")

        if self.meta_dict["nodefile"] is not None:
            if not osp.exists(self.meta_dict["nodefile"]):
                raise FileNotFoundError(
                    f"File not found at {self.meta_dict['nodefile']}"
                )
        OUT_DF = self.root + "/" + "ml_{}.pkl".format(self.name)
        OUT_EDGE_FEAT = self.root + "/" + "ml_{}.pkl".format(self.name + "_edge")
        if self.meta_dict["nodefile"] is not None:
            OUT_NODE_FEAT = self.root + "/" + "ml_{}.pkl".format(self.name + "_node")

        if (osp.exists(OUT_DF)) and (self.version_passed is True):
            # NOTE(review): the cache is read with pickle; only load files this
            # code wrote itself
            print("loading processed file")
            df = pd.read_pickle(OUT_DF)
            edge_feat = load_pkl(OUT_EDGE_FEAT)
            if self.meta_dict["nodefile"] is not None:
                node_feat = load_pkl(OUT_NODE_FEAT)

        else:
            print("file not processed, generating processed file")
            if self.name == "tgbl-flight":
                df, edge_feat, node_ids = csv_to_pd_data(self.meta_dict["fname"])
            elif self.name == "tgbl-coin":
                df, edge_feat, node_ids = csv_to_pd_data_sc(self.meta_dict["fname"])
            elif self.name == "tgbl-comment":
                df, edge_feat, node_ids = csv_to_pd_data_rc(self.meta_dict["fname"])
            elif self.name == "tgbl-review":
                df, edge_feat, node_ids = csv_to_pd_data_sc(self.meta_dict["fname"])
            elif self.name == "tgbl-wiki":
                df, edge_feat, node_ids = load_edgelist_wiki(self.meta_dict["fname"])
            else:
                # without this branch the code below fails with an
                # UnboundLocalError on `edge_feat`
                raise ValueError(
                    f"Dataset {self.name} is not supported, no edge list loader is available for it."
                )

            save_pkl(edge_feat, OUT_EDGE_FEAT)
            df.to_pickle(OUT_DF)
            if self.meta_dict["nodefile"] is not None:
                node_feat = process_node_feat(self.meta_dict["nodefile"], node_ids)
                save_pkl(node_feat, OUT_NODE_FEAT)

        return df, edge_feat, node_feat

    def pre_process(self):
        """
        Pre-process the dataset and generates the splits, must be run before dataset properties can be accessed
        generates the edge data and different train, val, test splits
        """
        # TODO for link prediction, y =1 because these are all true edges, edge feat = weight + edge feat

        # check if path to file is valid
        df, edge_feat, node_feat = self.generate_processed_files()
        sources = np.array(df["u"])
        destinations = np.array(df["i"])
        timestamps = np.array(df["ts"])
        edge_idxs = np.array(df["idx"])
        weights = np.array(df["w"])

        edge_label = np.ones(len(df))  # should be 1 for all pos edges
        self._edge_feat = edge_feat
        self._node_feat = node_feat

        full_data = {
            "sources": sources,
            "destinations": destinations,
            "timestamps": timestamps,
            "edge_idxs": edge_idxs,
            "edge_feat": edge_feat,
            "w": weights,
            "edge_label": edge_label,
        }
        self._full_data = full_data
        _train_mask, _val_mask, _test_mask = self.generate_splits(full_data)
        self._train_mask = _train_mask
        self._val_mask = _val_mask
        self._test_mask = _test_mask

    def generate_splits(
        self,
        full_data: Dict[str, Any],
        val_ratio: float = 0.15,
        test_ratio: float = 0.15,
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        r"""Generates chronological train, validation, and test masks from the full dataset
        Args:
            full_data: dictionary containing the full dataset, only the 'timestamps' entry is read
            val_ratio: ratio of validation data
            test_ratio: ratio of test data
        Returns:
            train_mask: boolean mask of the edges with timestamp <= the (1 - val_ratio - test_ratio) quantile
            val_mask: boolean mask of the edges between the two quantiles
            test_mask: boolean mask of the edges with timestamp > the (1 - test_ratio) quantile
        """
        timestamps = full_data["timestamps"]
        val_time, test_time = np.quantile(
            timestamps,
            [(1 - val_ratio - test_ratio), (1 - test_ratio)],
        )

        train_mask = timestamps <= val_time
        val_mask = np.logical_and(timestamps <= test_time, timestamps > val_time)
        test_mask = timestamps > test_time

        return train_mask, val_mask, test_mask

    @property
    def eval_metric(self) -> str:
        """
        the official evaluation metric for the dataset, loaded from info.py
        Returns:
            eval_metric: str, the evaluation metric
        """
        return self.metric

    @property
    def negative_sampler(self) -> "NegativeEdgeSampler":
        r"""
        Returns the negative sampler of the dataset, will load negative samples from disc
        Returns:
            negative_sampler: NegativeEdgeSampler
        """
        return self.ns_sampler

    def load_val_ns(self) -> None:
        r"""
        load the negative samples for the validation set
        """
        self.ns_sampler.load_eval_set(
            fname=self.meta_dict["val_ns"], split_mode="val"
        )

    def load_test_ns(self) -> None:
        r"""
        load the negative samples for the test set
        """
        self.ns_sampler.load_eval_set(
            fname=self.meta_dict["test_ns"], split_mode="test"
        )

    @property
    def node_feat(self) -> Optional[np.ndarray]:
        r"""
        Returns the node features of the dataset with dim [N, feat_dim]
        Returns:
            node_feat: np.ndarray, [N, feat_dim] or None if there is no node feature
        """
        return self._node_feat

    @property
    def edge_feat(self) -> Optional[np.ndarray]:
        r"""
        Returns the edge features of the dataset with dim [E, feat_dim]
        Returns:
            edge_feat: np.ndarray, [E, feat_dim] or None if there is no edge feature
        """
        return self._edge_feat

    @property
    def full_data(self) -> Dict[str, Any]:
        r"""
        the full data of the dataset as a dictionary with keys: 'sources', 'destinations', 'timestamps', 'edge_idxs', 'edge_feat', 'w', 'edge_label',

        Returns:
            full_data: Dict[str, Any]
        Raises:
            ValueError: if pre_process() has not been run yet
        """
        if self._full_data is None:
            raise ValueError(
                "dataset has not been processed yet, please call pre_process() first"
            )
        return self._full_data

    @property
    def train_mask(self) -> np.ndarray:
        r"""
        Returns the train mask of the dataset
        Returns:
            train_mask: training masks
        Raises:
            ValueError: if the splits have not been generated yet
        """
        if self._train_mask is None:
            raise ValueError("training split hasn't been loaded")
        return self._train_mask

    @property
    def val_mask(self) -> np.ndarray:
        r"""
        Returns the validation mask of the dataset
        Returns:
            val_mask: validation masks
        Raises:
            ValueError: if the splits have not been generated yet
        """
        if self._val_mask is None:
            raise ValueError("validation split hasn't been loaded")
        return self._val_mask

    @property
    def test_mask(self) -> np.ndarray:
        r"""
        Returns the test mask of the dataset
        Returns:
            test_mask: test masks
        Raises:
            ValueError: if the splits have not been generated yet
        """
        if self._test_mask is None:
            raise ValueError("test split hasn't been loaded")
        return self._test_mask

edge_feat: Optional[np.ndarray] property

Returns the edge features of the dataset with dim [E, feat_dim] Returns: edge_feat: np.ndarray, [E, feat_dim] or None if there is no edge feature

eval_metric: str property

the official evaluation metric for the dataset, loaded from info.py Returns: eval_metric: str, the evaluation metric

full_data: Dict[str, Any] property

the full data of the dataset as a dictionary with keys: 'sources', 'destinations', 'timestamps', 'edge_idxs', 'edge_feat', 'w', 'edge_label',

Returns:

Name Type Description
full_data Dict[str, Any]

Dict[str, Any]

negative_sampler: NegativeEdgeSampler property

Returns the negative sampler of the dataset, will load negative samples from disc Returns: negative_sampler: NegativeEdgeSampler

node_feat: Optional[np.ndarray] property

Returns the node features of the dataset with dim [N, feat_dim] Returns: node_feat: np.ndarray, [N, feat_dim] or None if there is no node feature

test_mask: np.ndarray property

Returns the test mask of the dataset. Returns: test_mask: np.ndarray, the mask selecting the test edges

train_mask: np.ndarray property

Returns the train mask of the dataset Returns: train_mask: training masks

val_mask: np.ndarray property

Returns the validation mask of the dataset. Returns: val_mask: np.ndarray, the mask selecting the validation edges

__init__(name, root='datasets', meta_dict=None, preprocess=True)

Dataset class for link prediction dataset. Stores meta information about each dataset such as evaluation metrics etc. also automatically pre-processes the dataset. Args: name: name of the dataset root: root directory to store the dataset folder meta_dict: dictionary containing meta information about the dataset, should contain key 'dir_name' which is the name of the dataset folder preprocess: whether to pre-process the dataset

Source code in tgb/linkproppred/dataset.py
def __init__(
    self,
    name: str,
    root: Optional[str] = "datasets",
    meta_dict: Optional[dict] = None,
    preprocess: Optional[bool] = True,
):
    r"""Dataset class for link prediction dataset. Stores meta information about each dataset such as evaluation metrics etc.
    also automatically pre-processes the dataset.
    Args:
        name: name of the dataset
        root: root directory to store the dataset folder, appended to PROJ_DIR
        meta_dict: dictionary containing meta information about the dataset, should contain key 'dir_name' which is the name of the dataset folder
        preprocess: whether to pre-process the dataset
    Raises:
        FileNotFoundError: if the dataset directory does not exist after the download step
    """
    self.name = name  ## original name
    # check if dataset url exist
    if self.name in DATA_URL_DICT:
        self.url = DATA_URL_DICT[self.name]
    else:
        self.url = None
        print(f"Dataset {self.name} url not found, download not supported yet.")

    # check if the evaluation metric is specified
    if self.name in DATA_EVAL_METRIC_DICT:
        self.metric = DATA_EVAL_METRIC_DICT[self.name]
    else:
        self.metric = None
        print(
            f"Dataset {self.name} default evaluation metric not found, it is not supported yet."
        )

    root = PROJ_DIR + root

    if meta_dict is None:
        self.dir_name = "_".join(name.split("-"))  ## replace hyphen with underline
        meta_dict = {"dir_name": self.dir_name}
    else:
        self.dir_name = meta_dict["dir_name"]
    self.root = osp.join(root, self.dir_name)
    self.meta_dict = meta_dict
    if "fname" not in self.meta_dict:
        self.meta_dict["fname"] = self.root + "/" + self.name + "_edgelist.csv"
        self.meta_dict["nodefile"] = None
    # a caller-supplied meta_dict may carry 'fname' without 'nodefile';
    # later code indexes 'nodefile' unconditionally, so make sure it exists
    self.meta_dict.setdefault("nodefile", None)

    if name == "tgbl-flight":
        self.meta_dict["nodefile"] = self.root + "/" + "airport_node_feat.csv"

    self.meta_dict["val_ns"] = self.root + "/" + self.name + "_val_ns.pkl"
    self.meta_dict["test_ns"] = self.root + "/" + self.name + "_test_ns.pkl"

    #! version check
    self.version_passed = True
    self._version_check()

    # initialize
    self._node_feat = None
    self._edge_feat = None
    self._full_data = None
    self._train_data = None
    self._val_data = None
    self._test_data = None
    # the mask properties test these attributes against None, so they must
    # exist even when preprocess=False
    self._train_mask = None
    self._val_mask = None
    self._test_mask = None

    self.download()
    # the dataset directory must exist at this point (download creates it)
    if osp.isdir(self.root):
        print("Dataset directory is ", self.root)
    else:
        # os.makedirs(self.root)
        raise FileNotFoundError(f"Directory not found at {self.root}")

    if preprocess:
        self.pre_process()

    self.ns_sampler = NegativeEdgeSampler(
        dataset_name=self.name, strategy="hist_rnd"
    )

download()

downloads this dataset from url check if files are already downloaded

Source code in tgb/linkproppred/dataset.py
def download(self):
    """
    downloads this dataset from url
    check if files are already downloaded
    Raises:
        Exception: if the user declines the download or no url is known for the dataset
    """
    # check if the file already exists
    if osp.exists(self.meta_dict["fname"]):
        print("raw file found, skipping download")
        return

    inp = input(
        "Will you download the dataset(s) now? (y/N)\n"
    ).lower()  # ask if the user wants to download the dataset

    if inp != "y":
        raise Exception(
            BColors.FAIL + "Data not found error, download " + self.name + " failed"
        )

    print(
        f"{BColors.WARNING}Download started, this might take a while . . . {BColors.ENDC}"
    )
    print(f"Dataset title: {self.name}")

    if self.url is None:
        raise Exception("Dataset url not found, download not supported yet.")

    r = requests.get(self.url, stream=True)
    # fail here on an HTTP error instead of saving an error page as the archive
    r.raise_for_status()
    if osp.isdir(self.root):
        print("Dataset directory is ", self.root)
    else:
        os.makedirs(self.root)

    path_download = self.root + "/" + self.name + ".zip"
    chunks = r.iter_content(chunk_size=1024)
    content_length = r.headers.get("content-length")
    if content_length is not None:
        # the progress bar is only used when the server announces the size;
        # without the header int(None) would raise
        chunks = progress.bar(
            chunks,
            expected_size=(int(content_length) / 1024) + 1,
        )
    with open(path_download, "wb") as f:
        for chunk in chunks:
            if chunk:
                f.write(chunk)
    # for unzipping the file
    with zipfile.ZipFile(path_download, "r") as zip_ref:
        zip_ref.extractall(self.root)
    print(f"{BColors.OKGREEN}Download completed {BColors.ENDC}")
    self.version_passed = True

generate_processed_files()

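turns the raw data .csv file into a pandas data frame, stored on disc if not already. Returns: df: pandas data frame, edge_feat: the edge features, node_feat: the node features (None if the dataset has no node file)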

Source code in tgb/linkproppred/dataset.py
def generate_processed_files(
    self,
) -> Tuple[pd.DataFrame, np.ndarray, Optional[np.ndarray]]:
    r"""
    turns raw data .csv file into a pandas data frame, stored on disc if not already
    Returns:
        df: pandas data frame
        edge_feat: edge features
        node_feat: node features, None if the dataset has no node file
    Raises:
        FileNotFoundError: if the raw edge list or the node file is missing
        ValueError: if no edge list loader exists for this dataset name
    """
    node_feat = None
    if not osp.exists(self.meta_dict["fname"]):
        raise FileNotFoundError(f"File not found at {self.meta_dict['fname']}")

    if self.meta_dict["nodefile"] is not None:
        if not osp.exists(self.meta_dict["nodefile"]):
            raise FileNotFoundError(
                f"File not found at {self.meta_dict['nodefile']}"
            )
    OUT_DF = self.root + "/" + "ml_{}.pkl".format(self.name)
    OUT_EDGE_FEAT = self.root + "/" + "ml_{}.pkl".format(self.name + "_edge")
    if self.meta_dict["nodefile"] is not None:
        OUT_NODE_FEAT = self.root + "/" + "ml_{}.pkl".format(self.name + "_node")

    if (osp.exists(OUT_DF)) and (self.version_passed is True):
        # NOTE(review): the cache is read with pickle; only load files this
        # code wrote itself
        print("loading processed file")
        df = pd.read_pickle(OUT_DF)
        edge_feat = load_pkl(OUT_EDGE_FEAT)
        if self.meta_dict["nodefile"] is not None:
            node_feat = load_pkl(OUT_NODE_FEAT)

    else:
        print("file not processed, generating processed file")
        if self.name == "tgbl-flight":
            df, edge_feat, node_ids = csv_to_pd_data(self.meta_dict["fname"])
        elif self.name == "tgbl-coin":
            df, edge_feat, node_ids = csv_to_pd_data_sc(self.meta_dict["fname"])
        elif self.name == "tgbl-comment":
            df, edge_feat, node_ids = csv_to_pd_data_rc(self.meta_dict["fname"])
        elif self.name == "tgbl-review":
            df, edge_feat, node_ids = csv_to_pd_data_sc(self.meta_dict["fname"])
        elif self.name == "tgbl-wiki":
            df, edge_feat, node_ids = load_edgelist_wiki(self.meta_dict["fname"])
        else:
            # without this branch the code below fails with an
            # UnboundLocalError on `edge_feat`
            raise ValueError(
                f"Dataset {self.name} is not supported, no edge list loader is available for it."
            )

        save_pkl(edge_feat, OUT_EDGE_FEAT)
        df.to_pickle(OUT_DF)
        if self.meta_dict["nodefile"] is not None:
            node_feat = process_node_feat(self.meta_dict["nodefile"], node_ids)
            save_pkl(node_feat, OUT_NODE_FEAT)

    return df, edge_feat, node_feat

generate_splits(full_data, val_ratio=0.15, test_ratio=0.15)

Generates train, validation, and test splits from the full dataset. Args: full_data: dictionary containing the full dataset (its 'timestamps' entry is used); val_ratio: ratio of validation data; test_ratio: ratio of test data. Returns: train_mask: mask selecting the training edges; val_mask: mask selecting the validation edges; test_mask: mask selecting the test edges

Source code in tgb/linkproppred/dataset.py
def generate_splits(
    self,
    full_data: Dict[str, Any],
    val_ratio: float = 0.15,
    test_ratio: float = 0.15,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    r"""Generates chronological train, validation, and test masks from the full dataset
    Args:
        full_data: dictionary containing the full dataset, only the 'timestamps' entry is read
        val_ratio: ratio of validation data
        test_ratio: ratio of test data
    Returns:
        train_mask: boolean mask of the edges with timestamp <= the (1 - val_ratio - test_ratio) quantile
        val_mask: boolean mask of the edges between the two quantiles
        test_mask: boolean mask of the edges with timestamp > the (1 - test_ratio) quantile
    """
    timestamps = full_data["timestamps"]
    val_time, test_time = np.quantile(
        timestamps,
        [(1 - val_ratio - test_ratio), (1 - test_ratio)],
    )

    train_mask = timestamps <= val_time
    val_mask = np.logical_and(timestamps <= test_time, timestamps > val_time)
    test_mask = timestamps > test_time

    return train_mask, val_mask, test_mask

load_test_ns()

load the negative samples for the test set

Source code in tgb/linkproppred/dataset.py
def load_test_ns(self) -> None:
    r"""
    hand the stored test negative-sample file to the negative sampler
    """
    test_ns_path = self.meta_dict["test_ns"]
    self.ns_sampler.load_eval_set(fname=test_ns_path, split_mode="test")

load_val_ns()

load the negative samples for the validation set

Source code in tgb/linkproppred/dataset.py
def load_val_ns(self) -> None:
    r"""
    hand the stored validation negative-sample file to the negative sampler
    """
    val_ns_path = self.meta_dict["val_ns"]
    self.ns_sampler.load_eval_set(fname=val_ns_path, split_mode="val")

pre_process()

Pre-process the dataset and generates the splits, must be run before dataset properties can be accessed generates the edge data and different train, val, test splits

Source code in tgb/linkproppred/dataset.py
def pre_process(self):
    """
    Builds the in-memory edge data and the train / val / test masks.
    Has to run before the data and mask properties of the dataset are read.
    """
    # TODO for link prediction, y =1 because these are all true edges, edge feat = weight + edge feat

    frame, edge_feat, node_feat = self.generate_processed_files()

    # one numpy array per edge column; every edge is a positive one, so its label is 1
    edge_data = {
        "sources": np.array(frame["u"]),
        "destinations": np.array(frame["i"]),
        "timestamps": np.array(frame["ts"]),
        "edge_idxs": np.array(frame["idx"]),
        "edge_feat": edge_feat,
        "w": np.array(frame["w"]),
        "edge_label": np.ones(len(frame)),
    }

    self._edge_feat = edge_feat
    self._node_feat = node_feat
    self._full_data = edge_data

    # masks come from generate_splits applied to the full data
    splits = self.generate_splits(edge_data)
    self._train_mask, self._val_mask, self._test_mask = splits

PyGLinkPropPredDataset

Bases: Dataset

Source code in tgb/linkproppred/dataset_pyg.py
class PyGLinkPropPredDataset(Dataset):
    def __init__(
        self,
        name: str,
        root: str,
        transform: Optional[Callable] = None,
        pre_transform: Optional[Callable] = None,
    ):
        r"""
        PyG-facing wrapper around `LinkPropPredDataset`
        exposes src, dst, t, msg and label as pytorch tensors
        and can build a TemporalData object from them
        Parameters:
            name: name of the dataset, passed to `LinkPropPredDataset`
            root (string): Root directory where the dataset should be saved, passed to `LinkPropPredDataset`
            transform (callable, optional): forwarded to the parent constructor, not used in this case
            pre_transform (callable, optional): forwarded to the parent constructor, not used in this case
        """
        self.name = name
        self.root = root
        self.dataset = LinkPropPredDataset(name=name, root=root)

        # the numpy masks of the wrapped dataset become tensors
        self._train_mask = torch.from_numpy(self.dataset.train_mask)
        self._val_mask = torch.from_numpy(self.dataset.val_mask)
        self._test_mask = torch.from_numpy(self.dataset.test_mask)
        super().__init__(root, transform, pre_transform)

        # node features are optional; convert them to float tensors when present
        raw_node_feat = self.dataset.node_feat
        if raw_node_feat is None:
            self._node_feat = None
        else:
            self._node_feat = torch.from_numpy(raw_node_feat).float()
        self.process_data()

        self._ns_sampler = self.dataset.negative_sampler

    @property
    def eval_metric(self) -> str:
        """
        evaluation metric of the wrapped dataset
        Returns:
            eval_metric: str, the evaluation metric
        """
        return self.dataset.eval_metric

    @property
    def negative_sampler(self) -> NegativeEdgeSampler:
        r"""
        negative sampler taken from the wrapped dataset
        Returns:
            negative_sampler: NegativeEdgeSampler
        """
        return self._ns_sampler

    def load_val_ns(self) -> None:
        r"""
        load the validation negative samples through the wrapped dataset
        """
        self.dataset.load_val_ns()

    def load_test_ns(self) -> None:
        r"""
        load the test negative samples through the wrapped dataset
        """
        self.dataset.load_test_ns()

    @property
    def train_mask(self) -> torch.Tensor:
        r"""
        mask of the edges in the training set
        Returns:
            train_mask: the mask for edges in the training set
        """
        mask = self._train_mask
        if mask is None:
            raise ValueError("training split hasn't been loaded")
        return mask

    @property
    def val_mask(self) -> torch.Tensor:
        r"""
        mask of the edges in the validation set
        Returns:
            val_mask: the mask for edges in the validation set
        """
        mask = self._val_mask
        if mask is None:
            raise ValueError("validation split hasn't been loaded")
        return mask

    @property
    def test_mask(self) -> torch.Tensor:
        r"""
        mask of the edges in the test set
        Returns:
            test_mask: the mask for edges in the test set
        """
        mask = self._test_mask
        if mask is None:
            raise ValueError("test split hasn't been loaded")
        return mask

    @property
    def node_feat(self) -> torch.Tensor:
        r"""
        node features of the dataset (None when the wrapped dataset has none)
        Returns:
            node_feat: the node features
        """
        return self._node_feat

    @property
    def src(self) -> torch.Tensor:
        r"""
        source nodes of all edges
        Returns:
            src: the idx of the source nodes
        """
        return self._src

    @property
    def dst(self) -> torch.Tensor:
        r"""
        destination nodes of all edges
        Returns:
            dst: the idx of the destination nodes
        """
        return self._dst

    @property
    def ts(self) -> torch.Tensor:
        r"""
        timestamps of all edges
        Returns:
            ts: the timestamps of the edges
        """
        return self._ts

    @property
    def edge_feat(self) -> torch.Tensor:
        r"""
        edge features of all edges
        Returns:
            edge_feat: the edge features
        """
        return self._edge_feat

    @property
    def edge_label(self) -> torch.Tensor:
        r"""
        labels of all edges
        Returns:
            edge_label: the labels of the edges
        """
        return self._edge_label

    def process_data(self) -> None:
        r"""
        turn the numpy arrays of the wrapped dataset into pytorch tensors
        src, dst and ts end up as int64, the edge features as float32
        """

        def _cast(tensor: torch.Tensor, dtype: torch.dtype) -> torch.Tensor:
            # leave the tensor untouched when it already has the wanted dtype
            return tensor if tensor.dtype == dtype else tensor.to(dtype)

        src = torch.from_numpy(self.dataset.full_data["sources"])
        dst = torch.from_numpy(self.dataset.full_data["destinations"])
        ts = torch.from_numpy(self.dataset.full_data["timestamps"])
        # edge features double as the message tensor
        msg = torch.from_numpy(self.dataset.full_data["edge_feat"])
        # label marking an edge as a true edge, always 1 for true edges
        edge_label = torch.from_numpy(self.dataset.full_data["edge_label"])

        self._src = _cast(src, torch.int64)
        self._dst = _cast(dst, torch.int64)
        self._ts = _cast(ts, torch.int64)
        self._edge_label = edge_label
        self._edge_feat = _cast(msg, torch.float32)

    def get_TemporalData(self) -> TemporalData:
        """
        build one TemporalData object holding every edge of the dataset
        """
        return TemporalData(
            src=self._src,
            dst=self._dst,
            t=self._ts,
            msg=self._edge_feat,
            y=self._edge_label,
        )

    def len(self) -> int:
        """
        number of edges, read from the first dimension of the source tensor
        Returns:
            size: int
        """
        return self._src.shape[0]

    def get(self, idx: int) -> TemporalData:
        """
        build the TemporalData object of one edge
        Parameters:
            idx: index of the edge
        Returns:
            data: TemporalData object
        """
        return TemporalData(
            src=self._src[idx],
            dst=self._dst[idx],
            t=self._ts[idx],
            msg=self._edge_feat[idx],
            y=self._edge_label[idx],
        )

    def __repr__(self) -> str:
        return f"{self.name.capitalize()}()"

dst: torch.Tensor property

Returns the destination nodes of the dataset Returns: dst: the idx of the destination nodes

edge_feat: torch.Tensor property

Returns the edge features of the dataset Returns: edge_feat: the edge features

edge_label: torch.Tensor property

Returns the edge labels of the dataset Returns: edge_label: the labels of the edges

eval_metric: str property

the official evaluation metric for the dataset, loaded from info.py Returns: eval_metric: str, the evaluation metric

negative_sampler: NegativeEdgeSampler property

Returns the negative sampler of the dataset, will load negative samples from disc Returns: negative_sampler: NegativeEdgeSampler

node_feat: torch.Tensor property

Returns the node features of the dataset Returns: node_feat: the node features

src: torch.Tensor property

Returns the source nodes of the dataset Returns: src: the idx of the source nodes

test_mask: torch.Tensor property

Returns the test mask of the dataset: Returns: test_mask: the mask for edges in the test set

train_mask: torch.Tensor property

Returns the train mask of the dataset Returns: train_mask: the mask for edges in the training set

ts: torch.Tensor property

Returns the timestamps of the dataset Returns: ts: the timestamps of the edges

val_mask: torch.Tensor property

Returns the validation mask of the dataset Returns: val_mask: the mask for edges in the validation set

__init__(name, root, transform=None, pre_transform=None)

PyG wrapper for the LinkPropPredDataset can return pytorch tensors for src,dst,t,msg,label can return Temporal Data object Parameters: name: name of the dataset, passed to LinkPropPredDataset root (string): Root directory where the dataset should be saved, passed to LinkPropPredDataset transform (callable, optional): A function/transform that takes in an, not used in this case pre_transform (callable, optional): A function/transform that takes in, not used in this case

Source code in tgb/linkproppred/dataset_pyg.py
def __init__(
    self,
    name: str,
    root: str,
    transform: Optional[Callable] = None,
    pre_transform: Optional[Callable] = None,
):
    r"""
    PyG wrapper for the LinkPropPredDataset.
    It can return pytorch tensors for src, dst, t, msg, label,
    and it can return a Temporal Data object.

    Parameters:
        name: name of the dataset, passed to `LinkPropPredDataset`
        root (string): Root directory where the dataset should be saved, passed to `LinkPropPredDataset`
        transform (callable, optional): A function/transform, forwarded to the base class; not used in this case
        pre_transform (callable, optional): A function/transform, forwarded to the base class; not used in this case
    """
    self.name = name
    self.root = root
    # build the underlying numpy-based dataset first; everything below reads from it
    self.dataset = LinkPropPredDataset(name=name, root=root)
    # the split masks are converted to tensors before the base-class initializer runs
    self._train_mask = torch.from_numpy(self.dataset.train_mask)
    self._val_mask = torch.from_numpy(self.dataset.val_mask)
    self._test_mask = torch.from_numpy(self.dataset.test_mask)
    super().__init__(root, transform, pre_transform)
    self._node_feat = self.dataset.node_feat

    # NOTE(review): the `None` branch is a no-op; only existing node features
    # are converted to a float tensor
    if self._node_feat is None:
        self._node_feat = None
    else:
        self._node_feat = torch.from_numpy(self._node_feat).float()
    # fills self._src, self._dst, self._ts, self._edge_label and self._edge_feat
    self.process_data()

    # keep a handle on the wrapped dataset's negative sampler
    self._ns_sampler = self.dataset.negative_sampler

get(idx)

construct temporal data object for a single edge Parameters: idx: index of the edge Returns: data: TemporalData object

Source code in tgb/linkproppred/dataset_pyg.py
def get(self, idx: int) -> TemporalData:
    """
    Build the TemporalData object that describes a single edge.

    Parameters:
        idx: index of the edge
    Returns:
        data: TemporalData object holding the source, destination, timestamp,
            edge feature and edge label stored at position `idx`
    """
    # look the fields up in the same order the constructor receives them
    edge_fields = {
        "src": self._src[idx],
        "dst": self._dst[idx],
        "t": self._ts[idx],
        "msg": self._edge_feat[idx],
        "y": self._edge_label[idx],
    }
    return TemporalData(**edge_fields)

get_TemporalData()

return the TemporalData object for the entire dataset

Source code in tgb/linkproppred/dataset_pyg.py
def get_TemporalData(self) -> TemporalData:
    """
    Bundle every edge of the dataset into one TemporalData object.

    Returns:
        data: TemporalData object built from the stored source, destination,
            timestamp, edge feature and edge label tensors
    """
    full_fields = {
        "src": self._src,
        "dst": self._dst,
        "t": self._ts,
        "msg": self._edge_feat,
        "y": self._edge_label,
    }
    return TemporalData(**full_fields)

len()

size of the dataset Returns: size: int

Source code in tgb/linkproppred/dataset_pyg.py
def len(self) -> int:
    """
    Number of edges in the dataset.

    Returns:
        size: int, the length of the first dimension of the source tensor
    """
    num_edges = self._src.shape[0]
    return num_edges

load_test_ns()

load the negative samples for the test set

Source code in tgb/linkproppred/dataset_pyg.py
def load_test_ns(self) -> None:
    r"""
    Ask the wrapped dataset to load the negative samples for the test set.
    """
    wrapped = self.dataset
    wrapped.load_test_ns()

load_val_ns()

load the negative samples for the validation set

Source code in tgb/linkproppred/dataset_pyg.py
def load_val_ns(self) -> None:
    r"""
    Ask the wrapped dataset to load the negative samples for the validation set.
    """
    wrapped = self.dataset
    wrapped.load_val_ns()

process_data()

convert the numpy arrays from dataset to pytorch tensors

Source code in tgb/linkproppred/dataset_pyg.py
def process_data(self) -> None:
    r"""
    Turn the numpy arrays of the wrapped dataset into pytorch tensors.

    Sources, destinations and timestamps are stored as int64 tensors, edge
    features as a float32 tensor; edge labels keep the dtype they come with.
    The results are written to `_src`, `_dst`, `_ts`, `_edge_label` and `_edge_feat`.
    """
    full_data = self.dataset.full_data

    def _as_int64(tensor):
        # cast only when the dtype does not already match
        return tensor if tensor.dtype == torch.int64 else tensor.long()

    def _as_float32(tensor):
        return tensor if tensor.dtype == torch.float32 else tensor.float()

    raw_src = torch.from_numpy(full_data["sources"])
    raw_dst = torch.from_numpy(full_data["destinations"])
    raw_ts = torch.from_numpy(full_data["timestamps"])
    # edge features are used as the message of every edge
    raw_msg = torch.from_numpy(full_data["edge_feat"])
    # label indicating if an edge is a true edge, always 1 for true edges
    labels = torch.from_numpy(full_data["edge_label"])

    self._src = _as_int64(raw_src)
    self._dst = _as_int64(raw_dst)
    self._ts = _as_int64(raw_ts)
    self._edge_label = labels
    self._edge_feat = _as_float32(raw_msg)

Evaluator Module for Dynamic Link Prediction

Evaluator

Bases: object

Evaluator for Link Property Prediction

Source code in tgb/linkproppred/evaluate.py
class Evaluator(object):
    r"""Evaluator for Link Property Prediction """

    def __init__(self, name: str, k_value: int = 10):
        r"""
        Parameters:
            name: name of the dataset
            k_value: the desired 'k' value for calculating metric@k

        Raises:
            NotImplementedError: if `name` is not a key of `DATA_EVAL_METRIC_DICT`
        """
        self.name = name
        self.k_value = k_value  # for computing `hits@k`
        self.valid_metric_list = ['hits@', 'mrr']
        if self.name not in DATA_EVAL_METRIC_DICT:
            raise NotImplementedError("Dataset not supported")

    def _parse_predictions(self, input_dict):
        r"""
        Extract the positive and negative scores from `input_dict` as numpy arrays

        Parameters:
            input_dict: a dictionary containing "y_pred_pos" and "y_pred_neg"

        Returns:
            y_pred_pos: positive predicted scores
            y_pred_neg: negative predicted scores

        Raises:
            RuntimeError: if a key is missing, or a value is neither a numpy ndarray nor a torch tensor
        """
        if "y_pred_pos" not in input_dict:
            raise RuntimeError("Missing key of y_pred_pos")
        if "y_pred_neg" not in input_dict:
            raise RuntimeError("Missing key of y_pred_neg")

        y_pred_pos, y_pred_neg = input_dict["y_pred_pos"], input_dict["y_pred_neg"]

        # converting to numpy on cpu
        if torch is not None and isinstance(y_pred_pos, torch.Tensor):
            y_pred_pos = y_pred_pos.detach().cpu().numpy()
        if torch is not None and isinstance(y_pred_neg, torch.Tensor):
            y_pred_neg = y_pred_neg.detach().cpu().numpy()

        # check type and shape
        if not isinstance(y_pred_pos, np.ndarray) or not isinstance(y_pred_neg, np.ndarray):
            raise RuntimeError(
                "Arguments to Evaluator need to be either numpy ndarray or torch tensor!"
            )
        return y_pred_pos, y_pred_neg

    def _parse_and_check_input(self, input_dict):
        r"""
        Check whether the input has the appropriate format
        Parameters:
            input_dict: a dictionary containing "y_pred_pos", "y_pred_neg", and "eval_metric"
            note: "eval_metric" should be a list including one or more of the following metrics: ["hits@", "mrr"]
        Returns:
            y_pred_pos: positive predicted scores
            y_pred_neg: negative predicted scores
        Raises:
            RuntimeError: if a required key is missing or the scores have an unsupported type
            ValueError: if a requested metric is unsupported, or no metric is requested at all
        """

        if "eval_metric" not in input_dict:
            raise RuntimeError("Missing key of eval_metric!")

        parsed = None
        for eval_metric in input_dict["eval_metric"]:
            if eval_metric not in self.valid_metric_list:
                print(
                    "ERROR: The evaluation metric should be in:", self.valid_metric_list
                )
                raise ValueError("Unsupported eval metric %s " % (eval_metric))
            # the scores do not depend on the metric, so parse them only once
            if parsed is None:
                parsed = self._parse_predictions(input_dict)

        # an empty metric list used to surface as an UnboundLocalError
        if parsed is None:
            raise ValueError(
                "eval_metric should include at least one of: %s" % (self.valid_metric_list,)
            )
        self.eval_metric = input_dict["eval_metric"]

        return parsed

    def _eval_hits_and_mrr(self, y_pred_pos, y_pred_neg, type_info, k_value):
        r"""
        compute hits@k and mrr
        reference:
            - https://github.com/snap-stanford/ogb/blob/d5c11d91c9e1c22ed090a2e0bbda3fe357de66e7/ogb/linkproppred/evaluate.py#L214

        Parameters:
            y_pred_pos: positive predicted scores
            y_pred_neg: negative predicted scores
            type_info: type of the predicted scores; could be 'torch' or 'numpy'
            k_value: the desired 'k' value for calculating metric@k

        Returns:
            a dictionary containing the computed performance metrics
        """
        if type_info == 'torch':
            # calculate ranks
            y_pred_pos = y_pred_pos.view(-1, 1)
            # optimistic rank: "how many negatives have a larger score than the positive?"
            # ~> the positive is ranked first among those with equal score
            optimistic_rank = (y_pred_neg > y_pred_pos).sum(dim=1)
            # pessimistic rank: "how many negatives have at least the positive score?"
            # ~> the positive is ranked last among those with equal score
            pessimistic_rank = (y_pred_neg >= y_pred_pos).sum(dim=1)
            ranking_list = 0.5 * (optimistic_rank + pessimistic_rank) + 1
            hitsK_list = (ranking_list <= k_value).to(torch.float)
            mrr_list = 1./ranking_list.to(torch.float)

            return {
                    f'hits@{k_value}': hitsK_list.mean(),
                    'mrr': mrr_list.mean()
                    }

        else:
            y_pred_pos = y_pred_pos.reshape(-1, 1)
            # same definitions as the torch branch; only the average of both ranks is used
            optimistic_rank = (y_pred_neg > y_pred_pos).sum(axis=1)
            pessimistic_rank = (y_pred_neg >= y_pred_pos).sum(axis=1)
            ranking_list = 0.5 * (optimistic_rank + pessimistic_rank) + 1
            hitsK_list = (ranking_list <= k_value).astype(np.float32)
            mrr_list = 1./ranking_list.astype(np.float32)

            return {
                    f'hits@{k_value}': hitsK_list.mean(),
                    'mrr': mrr_list.mean()
                    }

    def eval(self, 
             input_dict: dict, 
             verbose: bool = False) -> dict:
        r"""
        evaluate the link prediction task
        this method is callable through an instance of this object to compute the metric

        Parameters:
            input_dict: a dictionary containing "y_pred_pos", "y_pred_neg", and "eval_metric"
                        the performance metric is calculated for the provided scores
            verbose: whether to print out the computed metric

        Returns:
            perf_dict: a dictionary containing the computed performance metric
        """
        y_pred_pos, y_pred_neg = self._parse_and_check_input(input_dict)  # convert the predictions to numpy
        perf_dict = self._eval_hits_and_mrr(y_pred_pos, y_pred_neg, type_info='numpy', k_value=self.k_value)

        if verbose:
            for metric_name, metric_value in perf_dict.items():
                print(f"{metric_name}: {metric_value}")

        return perf_dict

__init__(name, k_value=10)

Parameters:

Name Type Description Default
name str

name of the dataset

required
k_value int

the desired 'k' value for calculating metric@k

10
Source code in tgb/linkproppred/evaluate.py
def __init__(self, name: str, k_value: int = 10):
    r"""
    Set up the evaluator for one dataset.

    Parameters:
        name: name of the dataset
        k_value: the desired 'k' value for calculating metric@k

    Raises:
        NotImplementedError: if `name` is not a key of `DATA_EVAL_METRIC_DICT`
    """
    self.name, self.k_value = name, k_value  # k_value is used for computing `hits@k`
    self.valid_metric_list = ['hits@', 'mrr']
    is_supported = self.name in DATA_EVAL_METRIC_DICT
    if not is_supported:
        raise NotImplementedError("Dataset not supported")

eval(input_dict, verbose=False)

evaluate the link prediction task this method is callable through an instance of this object to compute the metric

Parameters:

Name Type Description Default
input_dict dict

a dictionary containing "y_pred_pos", "y_pred_neg", and "eval_metric" the performance metric is calculated for the provided scores

required
verbose bool

whether to print out the computed metric

False

Returns:

Name Type Description
perf_dict dict

a dictionary containing the computed performance metric

Source code in tgb/linkproppred/evaluate.py
def eval(self, 
         input_dict: dict, 
         verbose: bool = False) -> dict:
    r"""
    evaluate the link prediction task
    this method is callable through an instance of this object to compute the metric

    Parameters:
        input_dict: a dictionary containing "y_pred_pos", "y_pred_neg", and "eval_metric"
                    the performance metric is calculated for the provided scores
        verbose: whether to print out the computed metric

    Returns:
        perf_dict: a dictionary containing the computed performance metric
    """
    y_pred_pos, y_pred_neg = self._parse_and_check_input(input_dict)  # convert the predictions to numpy
    perf_dict = self._eval_hits_and_mrr(y_pred_pos, y_pred_neg, type_info='numpy', k_value=self.k_value)

    # `verbose` was accepted but never acted on; print one line per computed metric
    if verbose:
        for metric_name, metric_value in perf_dict.items():
            print(f"{metric_name}: {metric_value}")

    return perf_dict

Sample negative edges for evaluation of dynamic link prediction Load already generated negative edges from file, batch them based on the positive edge, and return the evaluation set

NegativeEdgeSampler

Bases: object

Source code in tgb/linkproppred/negative_sampler.py
class NegativeEdgeSampler(object):
    def __init__(
        self,
        dataset_name: str,
        strategy: str = "hist_rnd",
    ) -> None:
        r"""
        Negative Edge Sampler
            Loads and query the negative batches based on the positive batches provided.
        constructor for the negative edge sampler class

        Parameters:
            dataset_name: name of the dataset
            strategy: specifies which set of negatives should be loaded;
                    can be 'rnd' or 'hist_rnd'

        Returns:
            None
        """
        self.dataset_name = dataset_name
        assert strategy in [
            "rnd",
            "hist_rnd",
        ], "The supported strategies are `rnd` or `hist_rnd`!"
        self.strategy = strategy
        # maps a split mode ('val' / 'test') to its loaded evaluation set
        self.eval_set = {}

    def load_eval_set(
        self,
        fname: str,
        split_mode: str = "val",
    ) -> None:
        r"""
        Load the evaluation set from disk, can be either val or test set ns samples
        Parameters:
            fname: the file name of the evaluation ns on disk
            split_mode: the split mode of the evaluation set, can be either `val` or `test`

        Returns:
            None

        Raises:
            FileNotFoundError: if `fname` does not exist
        """
        assert split_mode in [
            "val",
            "test",
        ], "Invalid split-mode! It should be `val`, `test`"
        if not os.path.exists(fname):
            raise FileNotFoundError(f"File not found at {fname}")
        self.eval_set[split_mode] = load_pkl(fname)

    def reset_eval_set(self, 
                       split_mode: str = "test",
                       ) -> None:
        r"""
        Reset evaluation set

        Parameters:
            split_mode: specifies whether to reset the 'val' or 'test' evaluation set

        Returns:
            None
        """
        assert split_mode in [
            "val",
            "test",
        ], "Invalid split-mode! It should be `val`, `test`!"
        self.eval_set[split_mode] = None

    def query_batch(self, 
                    pos_src: Tensor, 
                    pos_dst: Tensor, 
                    pos_timestamp: Tensor, 
                    split_mode: str = "test") -> list:
        r"""
        For each positive edge in the `pos_batch`, return a list of negative edges
        `split_mode` specifies whether the validation or test evaluation set should be retrieved.

        Parameters:
            pos_src: list of positive source nodes
            pos_dst: list of positive destination nodes
            pos_timestamp: list of timestamps of the positive edges
            split_mode: specifies whether the 'val' or 'test' evaluation set is queried

        Returns:
            neg_samples: a list of list; each internal list contains the set of negative edges that
                        should be evaluated against each positive edge.

        Raises:
            ValueError: if the evaluation set of `split_mode` is not loaded, or a positive edge is not in it
            RuntimeError: if an argument is neither a numpy ndarray nor a torch tensor
        """
        assert split_mode in [
            "val",
            "test",
        ], "Invalid split-mode! It should be `val`, `test`!"
        # `.get` also covers a split that was never loaded (the key is absent until
        # `load_eval_set` / `reset_eval_set` runs), which used to raise a KeyError
        eval_set = self.eval_set.get(split_mode)
        if eval_set is None:
            raise ValueError(
                f"Evaluation set is None! You should load the {split_mode} evaluation set first!"
            )

        # check the argument types...
        if torch is not None and isinstance(pos_src, torch.Tensor):
            pos_src = pos_src.detach().cpu().numpy()
        if torch is not None and isinstance(pos_dst, torch.Tensor):
            pos_dst = pos_dst.detach().cpu().numpy()
        if torch is not None and isinstance(pos_timestamp, torch.Tensor):
            pos_timestamp = pos_timestamp.detach().cpu().numpy()

        # the timestamp check was missing `isinstance`, so it could never fail
        if (
            not isinstance(pos_src, np.ndarray)
            or not isinstance(pos_dst, np.ndarray)
            or not isinstance(pos_timestamp, np.ndarray)
        ):
            raise RuntimeError(
                "pos_src, pos_dst, and pos_timestamp need to be either numpy ndarray or torch tensor!"
                )

        neg_samples = []
        for pos_s, pos_d, pos_t in zip(pos_src, pos_dst, pos_timestamp):
            edge_key = (pos_s, pos_d, pos_t)
            if edge_key not in eval_set:
                raise ValueError(
                    f"The edge ({pos_s}, {pos_d}, {pos_t}) is not in the '{split_mode}' evaluation set! Please check the implementation."
                )
            neg_samples.append([int(neg_dst) for neg_dst in eval_set[edge_key]])

        return neg_samples

__init__(dataset_name, strategy='hist_rnd')

Negative Edge Sampler Loads and query the negative batches based on the positive batches provided. constructor for the negative edge sampler class

Parameters:

Name Type Description Default
dataset_name str

name of the dataset

required
strategy str

specifies which set of negatives should be loaded; can be 'rnd' or 'hist_rnd'

'hist_rnd'

Returns:

Type Description
None

None

Source code in tgb/linkproppred/negative_sampler.py
def __init__(
    self,
    dataset_name: str,
    strategy: str = "hist_rnd",
) -> None:
    r"""
    Constructor of the negative edge sampler, which loads negative batches
    and queries them based on the positive batches provided.

    Parameters:
        dataset_name: name of the dataset
        strategy: specifies which set of negatives should be loaded;
                can be 'rnd' or 'hist_rnd'

    Returns:
        None
    """
    supported_strategies = ["rnd", "hist_rnd"]
    self.dataset_name = dataset_name
    assert (
        strategy in supported_strategies
    ), "The supported strategies are `rnd` or `hist_rnd`!"
    self.strategy = strategy
    # starts empty; filled per split mode later on
    self.eval_set = {}

load_eval_set(fname, split_mode='val')

Load the evaluation set from disk, can be either val or test set ns samples Parameters: fname: the file name of the evaluation ns on disk split_mode: the split mode of the evaluation set, can be either val or test

Returns:

Type Description
None

None

Source code in tgb/linkproppred/negative_sampler.py
def load_eval_set(
    self,
    fname: str,
    split_mode: str = "val",
) -> None:
    r"""
    Read the negative samples of one evaluation split (val or test) from disk
    and store them under that split mode.

    Parameters:
        fname: the file name of the evaluation ns on disk
        split_mode: the split mode of the evaluation set, can be either `val` or `test`

    Returns:
        None

    Raises:
        FileNotFoundError: if `fname` does not exist
    """
    valid_modes = ["val", "test"]
    assert split_mode in valid_modes, "Invalid split-mode! It should be `val`, `test`"
    if os.path.exists(fname):
        self.eval_set[split_mode] = load_pkl(fname)
        return
    raise FileNotFoundError(f"File not found at {fname}")

query_batch(pos_src, pos_dst, pos_timestamp, split_mode='test')

For each positive edge in the pos_batch, return a list of negative edges. split_mode specifies whether the validation or test evaluation set should be retrieved.

Parameters:

Name Type Description Default
pos_src Tensor

list of positive source nodes

required
pos_dst Tensor

list of positive destination nodes

required
pos_timestamp Tensor

list of timestamps of the positive edges

required
split_mode str

specifies whether to generate negative edges for 'validation' or 'test' splits

'test'

Returns:

Name Type Description
neg_samples list

a list of list; each internal list contains the set of negative edges that should be evaluated against each positive edge.

Source code in tgb/linkproppred/negative_sampler.py
def query_batch(
    self,
    pos_src: Tensor,
    pos_dst: Tensor,
    pos_timestamp: Tensor,
    split_mode: str = "test",
) -> list:
    r"""
    For each positive edge in the `pos_batch`, return a list of negative edges
    `split_mode` specifies whether the validation or test evaluation set should be retrieved.

    Parameters:
        pos_src: list of positive source nodes
        pos_dst: list of positive destination nodes
        pos_timestamp: list of timestamps of the positive edges
        split_mode: specifies whether the 'val' or 'test' evaluation set is queried

    Returns:
        neg_samples: a list of list; each internal list contains the set of negative edges that
                    should be evaluated against each positive edge.

    Raises:
        ValueError: if the evaluation set of `split_mode` is not loaded, or a positive edge is not in it
        RuntimeError: if an argument is neither a numpy ndarray nor a torch tensor
    """
    assert split_mode in [
        "val",
        "test",
    ], "Invalid split-mode! It should be `val`, `test`!"
    # `.get` also covers a split whose key is absent from `eval_set`, which used
    # to raise a KeyError instead of the intended message
    eval_set = self.eval_set.get(split_mode)
    if eval_set is None:
        raise ValueError(
            f"Evaluation set is None! You should load the {split_mode} evaluation set first!"
        )

    # check the argument types...
    if torch is not None and isinstance(pos_src, torch.Tensor):
        pos_src = pos_src.detach().cpu().numpy()
    if torch is not None and isinstance(pos_dst, torch.Tensor):
        pos_dst = pos_dst.detach().cpu().numpy()
    if torch is not None and isinstance(pos_timestamp, torch.Tensor):
        pos_timestamp = pos_timestamp.detach().cpu().numpy()

    # the timestamp check was missing `isinstance`, so it could never fail
    if (
        not isinstance(pos_src, np.ndarray)
        or not isinstance(pos_dst, np.ndarray)
        or not isinstance(pos_timestamp, np.ndarray)
    ):
        raise RuntimeError(
            "pos_src, pos_dst, and pos_timestamp need to be either numpy ndarray or torch tensor!"
            )

    neg_samples = []
    for pos_s, pos_d, pos_t in zip(pos_src, pos_dst, pos_timestamp):
        edge_key = (pos_s, pos_d, pos_t)
        if edge_key not in eval_set:
            raise ValueError(
                f"The edge ({pos_s}, {pos_d}, {pos_t}) is not in the '{split_mode}' evaluation set! Please check the implementation."
            )
        neg_samples.append([int(neg_dst) for neg_dst in eval_set[edge_key]])

    return neg_samples

reset_eval_set(split_mode='test')

Reset evaluation set

Parameters:

Name Type Description Default
split_mode str

specifies whether to generate negative edges for 'validation' or 'test' splits

'test'

Returns:

Type Description
None

None

Source code in tgb/linkproppred/negative_sampler.py
def reset_eval_set(
    self,
    split_mode: str = "test",
) -> None:
    r"""
    Drop the evaluation set stored for one split by setting it to None.

    Parameters:
        split_mode: specifies whether the 'val' or 'test' evaluation set is reset

    Returns:
        None
    """
    valid_modes = ["val", "test"]
    assert split_mode in valid_modes, "Invalid split-mode! It should be `val`, `test`!"
    self.eval_set[split_mode] = None

Sample and Generate negative edges that are going to be used for evaluation of a dynamic graph learning model Negative samples are generated and saved to files ONLY once; other times, they should be loaded from file with instances of the negative_sampler.py.

NegativeEdgeGenerator

Bases: object

Source code in tgb/linkproppred/negative_generator.py
class NegativeEdgeGenerator(object):
    def __init__(
        self,
        dataset_name: str,
        first_dst_id: int,
        last_dst_id: int,
        num_neg_e: int = 100,  # number of negative edges sampled per positive edges --> make it constant => 1000
        strategy: str = "rnd",
        rnd_seed: int = 123,
        hist_ratio: float = 0.5,
        historical_data: TemporalData = None,
    ) -> None:
        r"""
        Negative Edge Sampler class
        this is a class for generating negative samples for a specific datasets
        the set of the positive samples are provided, the negative samples are generated with specific strategies 
        and are saved for consistent evaluation across different methods
        negative edges are sampled with 'oen_vs_many' strategy.
        it is assumed that the destination nodes are indexed sequentially with 'first_dst_id' 
        and 'last_dst_id' being the first and last index, respectively.

        Parameters:
            dataset_name: name of the dataset
            first_dst_id: identity of the first destination node
            last_dst_id: indentity of the last destination node
            num_neg_e: number of negative edges being generated per each positive edge
            strategy: how to generate negative edges; can be 'rnd' or 'hist_rnd'
            rnd_seed: random seed for consistency
            hist_ratio: if the startegy is 'hist_rnd', how much of the negatives are historical
            historical_data: previous records of the positive edges

        Returns:
            None
        """
        self.rnd_seed = rnd_seed
        np.random.seed(self.rnd_seed)
        self.dataset_name = dataset_name

        self.first_dst_id = first_dst_id
        self.last_dst_id = last_dst_id
        self.num_neg_e = num_neg_e
        assert strategy in [
            "rnd",
            "hist_rnd",
        ], "The supported strategies are `rnd` or `hist_rnd`!"
        self.strategy = strategy
        if self.strategy == "hist_rnd":
            assert (
                historical_data != None
            ), "Train data should be passed when `hist_rnd` strategy is selected."
            self.hist_ratio = hist_ratio
            self.historical_data = historical_data

    def generate_negative_samples(self, 
                                  data: TemporalData, 
                                  split_mode: str, 
                                  partial_path: str,
                                  ) -> None:
        r"""
        Generate negative samples

        Parameters:
            data: an object containing positive edges information
            split_mode: specifies whether to generate negative edges for 'validation' or 'test' splits
            partial_path: in which directory save the generated negatives
        """
        # file name for saving or loading...
        filename = (
            partial_path
            + "/"
            + self.dataset_name
            + "_"
            + split_mode
            + "_"
            + "ns"
            + ".pkl"
        )

        if self.strategy == "rnd":
            self.generate_negative_samples_rnd(data, split_mode, filename)
        elif self.strategy == "hist_rnd":
            self.generate_negative_samples_hist_rnd(
                self.historical_data, data, split_mode, filename
            )
        else:
            raise ValueError("Unsupported negative sample generation strategy!")

    def generate_negative_samples_rnd(self, 
                                      data: TemporalData, 
                                      split_mode: str, 
                                      filename: str,
                                      ) -> None:
        r"""
        Generate negative samples based on the `RND` strategy:
            - for each positive edge, sample a batch of negative edges from all possible edges with the same source node
            - filter actual positive edges (same source node and same timestamp)

        Nothing is generated when `filename` already exists.

        Parameters:
            data: an object containing positive edges information
            split_mode: specifies whether to generate negative edges for 'validation' or 'test' splits
            filename: name of the file containing the generated negative edges
        """
        print(
            f"INFO: Negative Sampling Strategy: {self.strategy}, Data Split: {split_mode}"
        )
        assert split_mode in [
            "val",
            "test",
        ], "Invalid split-mode! It should be `val` or `test`!"

        if os.path.exists(filename):
            print(
                f"INFO: negative samples for '{split_mode}' evaluation are already generated!"
            )
            return

        print(f"INFO: Generating negative samples for '{split_mode}' evaluation!")
        # retrieve the information from the batch
        pos_src, pos_dst, pos_timestamp = (
            data.src.cpu().numpy(),
            data.dst.cpu().numpy(),
            data.t.cpu().numpy(),
        )

        # group the positive destinations by (timestamp, source) in a single pass,
        # instead of rebuilding two boolean masks over all edges for every edge
        pos_dst_by_ts_src = {}
        for pos_s, pos_d, pos_t in zip(pos_src, pos_dst, pos_timestamp):
            pos_dst_by_ts_src.setdefault((pos_t, pos_s), []).append(pos_d)

        # all possible destinations
        all_dst = np.arange(self.first_dst_id, self.last_dst_id + 1)

        evaluation_set = {}
        # generate a list of negative destinations for each positive edge
        pos_edge_tqdm = tqdm(
            zip(pos_src, pos_dst, pos_timestamp), total=len(pos_src)
        )
        for pos_s, pos_d, pos_t in pos_edge_tqdm:
            # destinations that are positive for this source at this timestamp;
            # the empty default mirrors an all-False mask (e.g. a NaN timestamp)
            pos_e_dst_same_src = pos_dst_by_ts_src.get((pos_t, pos_s), [])
            filtered_all_dst = np.setdiff1d(all_dst, pos_e_dst_same_src)

            # when num_neg_e is larger than all possible destinations,
            # simply return all possible destinations
            if self.num_neg_e > len(filtered_all_dst):
                neg_d_arr = filtered_all_dst
            else:
                neg_d_arr = np.random.choice(
                    filtered_all_dst, self.num_neg_e, replace=False
                )  # never replace negatives

            evaluation_set[(pos_s, pos_d, pos_t)] = neg_d_arr

        # save the generated evaluation set to disk
        save_pkl(evaluation_set, filename)

    def generate_historical_edge_set(
        self,
        historical_data: TemporalData,
    ) -> tuple:
        r"""
        Collect the edges seen during training or validation

        ONLY `train_data` should be passed as historical data; i.e., the HISTORICAL negative edges should be selected from training data only.

        Parameters:
            historical_data: contains the positive edges observed previously

        Returns:
            historical_edges: distinct historical positive edges, as a dict keyed by (src, dst)
            hist_edge_set_per_node: distinct historical destinations observed for each source node
        """
        src_arr = historical_data.src.cpu().numpy()
        dst_arr = historical_data.dst.cpu().numpy()

        historical_edges = {}
        dsts_per_src = {}
        for src_node, dst_node in zip(src_arr, dst_arr):
            # edge-centric: remember every distinct (src, dst) pair once
            historical_edges.setdefault((src_node, dst_node), 1)
            # node-centric: every destination observed for this source
            dsts_per_src.setdefault(src_node, []).append(dst_node)

        # drop duplicated destinations of each source node
        hist_edge_set_per_node = {
            src_node: np.array(list(set(dst_nodes)))
            for src_node, dst_nodes in dsts_per_src.items()
        }

        return historical_edges, hist_edge_set_per_node

    def generate_negative_samples_hist_rnd(
        self,
        historical_data: TemporalData,
        data: TemporalData,
        split_mode: str,
        filename: str,
    ) -> None:
        r"""
        Generate negative samples based on the `HIST-RND` strategy:
            - up to a `hist_ratio` share of the negatives (at most
              `int(num_neg_e * hist_ratio)`) are destinations that the same source
              node was connected to in `historical_data`;
            - the remaining negatives are drawn at random from the destination range
              `[first_dst_id, last_dst_id]` for the same source node, excluding both
              the historical destinations and the current positives.

        Destinations that are positive for the same source at the same timestamp are
        never used as negatives. If fewer candidates than requested are available,
        all remaining candidates are returned, so a positive edge can end up with
        fewer than `num_neg_e` negatives.

        The result is a dict mapping `(src, dst, t)` to an array of negative
        destination ids, which is written to `filename` with `save_pkl`. If
        `filename` already exists nothing is generated.

        Parameters:
            historical_data: contains the history of the observed positive edges
                            (passed to `generate_historical_edge_set`)
            data: an object containing positive edges information (`src`, `dst`, `t`)
            split_mode: specifies whether to generate negative edges for 'val' or 'test' splits
            filename: name of the file to save generated negative edges

        Returns:
            None
        """
        print(
            f"INFO: Negative Sampling Strategy: {self.strategy}, Data Split: {split_mode}"
        )
        assert split_mode in [
            "val",
            "test",
        ], "Invalid split-mode! It should be `val` or `test`!"

        if os.path.exists(filename):
            print(
                f"INFO: negative samples for '{split_mode}' evaluation are already generated!"
            )
            return

        print(f"INFO: Generating negative samples for '{split_mode}' evaluation!")
        # retrieve the information from the batch
        pos_src, pos_dst, pos_timestamp = (
            data.src.cpu().numpy(),
            data.dst.cpu().numpy(),
            data.t.cpu().numpy(),
        )

        # group the positive destinations once: {ts: {src: [dsts]}}
        pos_ts_edge_dict = {}
        for pos_s, pos_d, pos_t in tqdm(
            zip(pos_src, pos_dst, pos_timestamp), total=len(pos_src)
        ):
            pos_ts_edge_dict.setdefault(pos_t, {}).setdefault(pos_s, []).append(pos_d)

        # all possible destinations
        all_dst = np.arange(self.first_dst_id, self.last_dst_id + 1)

        # only the per-source view of the history is needed here
        _, hist_edge_set_per_node = self.generate_historical_edge_set(historical_data)

        # upper bound on the number of historical negatives per positive edge
        max_num_hist_neg_e = int(self.num_neg_e * self.hist_ratio)

        evaluation_set = {}
        # generate a list of negative destinations for each positive edge
        for pos_s, pos_d, pos_t in tqdm(
            zip(pos_src, pos_dst, pos_timestamp), total=len(pos_src)
        ):
            pos_e_dst_same_src = np.array(pos_ts_edge_dict[pos_t][pos_s])

            # sample historical edges
            num_hist_neg_e = 0
            # Typed empty array: a bare `np.array([])` is float64 and would make
            # the concatenation below promote every destination id to float.
            neg_hist_dsts = np.array([], dtype=all_dst.dtype)
            seen_dst = []
            if pos_s in hist_edge_set_per_node:
                seen_dst = hist_edge_set_per_node[pos_s]
                if len(seen_dst) >= 1:
                    # collision check: drop destinations that are positive right now
                    filtered_all_seen_dst = np.setdiff1d(seen_dst, pos_e_dst_same_src)
                    num_hist_neg_e = min(max_num_hist_neg_e, len(filtered_all_seen_dst))
                    neg_hist_dsts = np.random.choice(
                        filtered_all_seen_dst, num_hist_neg_e, replace=False
                    )

            # sample random edges: exclude current positives and everything in the
            # history, so the two groups of negatives never overlap
            if len(seen_dst) >= 1:
                invalid_dst = np.concatenate((pos_e_dst_same_src, seen_dst))
            else:
                invalid_dst = pos_e_dst_same_src
            filtered_all_rnd_dst = np.setdiff1d(all_dst, invalid_dst)

            num_rnd_neg_e = self.num_neg_e - num_hist_neg_e
            # when more negatives are requested than there are candidate
            # destinations, simply return all candidate destinations
            if num_rnd_neg_e > len(filtered_all_rnd_dst):
                neg_rnd_dsts = filtered_all_rnd_dst
            else:
                neg_rnd_dsts = np.random.choice(
                    filtered_all_rnd_dst, num_rnd_neg_e, replace=False
                )

            # concatenate the two sets: historical and random
            evaluation_set[(pos_s, pos_d, pos_t)] = np.concatenate(
                (neg_hist_dsts, neg_rnd_dsts)
            )

        # save the generated evaluation set to disk
        save_pkl(evaluation_set, filename)

__init__(dataset_name, first_dst_id, last_dst_id, num_neg_e=100, strategy='rnd', rnd_seed=123, hist_ratio=0.5, historical_data=None)

Negative Edge Sampler class. This class generates negative samples for a specific dataset. The set of positive samples is provided; the negative samples are generated with a specific strategy and saved for consistent evaluation across different methods. Negative edges are sampled with the 'one_vs_many' strategy. It is assumed that the destination nodes are indexed sequentially, with 'first_dst_id' and 'last_dst_id' being the first and last index, respectively.

Parameters:

Name Type Description Default
dataset_name str

name of the dataset

required
first_dst_id int

identity of the first destination node

required
last_dst_id int

identity of the last destination node

required
num_neg_e int

number of negative edges being generated per each positive edge

100
strategy str

how to generate negative edges; can be 'rnd' or 'hist_rnd'

'rnd'
rnd_seed int

random seed for consistency

123
hist_ratio float

if the strategy is 'hist_rnd', the fraction of the negatives that are historical

0.5
historical_data TemporalData

previous records of the positive edges

None

Returns:

Type Description
None

None

Source code in tgb/linkproppred/negative_generator.py
def __init__(
    self,
    dataset_name: str,
    first_dst_id: int,
    last_dst_id: int,
    num_neg_e: int = 100,  # number of negative edges sampled per positive edges --> make it constant => 1000
    strategy: str = "rnd",
    rnd_seed: int = 123,
    hist_ratio: float = 0.5,
    historical_data: TemporalData = None,
) -> None:
    r"""
    Negative Edge Sampler class.
    This class generates negative samples for a specific dataset.
    The set of positive samples is provided; the negative samples are generated
    with a specific strategy and saved for consistent evaluation across different methods.
    Negative edges are sampled with the 'one_vs_many' strategy.
    It is assumed that the destination nodes are indexed sequentially, with
    'first_dst_id' and 'last_dst_id' being the first and last index, respectively.

    Note that constructing the object seeds NumPy's global random generator
    with `rnd_seed`.

    Parameters:
        dataset_name: name of the dataset
        first_dst_id: identity of the first destination node
        last_dst_id: identity of the last destination node
        num_neg_e: number of negative edges being generated per each positive edge
        strategy: how to generate negative edges; can be 'rnd' or 'hist_rnd'
        rnd_seed: random seed for consistency
        hist_ratio: if the strategy is 'hist_rnd', the fraction of the negatives that are historical
        historical_data: previous records of the positive edges; required when
                         the strategy is 'hist_rnd'

    Returns:
        None
    """
    self.rnd_seed = rnd_seed
    np.random.seed(self.rnd_seed)
    self.dataset_name = dataset_name

    self.first_dst_id = first_dst_id
    self.last_dst_id = last_dst_id
    self.num_neg_e = num_neg_e
    assert strategy in [
        "rnd",
        "hist_rnd",
    ], "The supported strategies are `rnd` or `hist_rnd`!"
    self.strategy = strategy
    # `hist_ratio` and `historical_data` are only stored for the `hist_rnd` strategy.
    if self.strategy == "hist_rnd":
        # Identity check: `!= None` dispatches to the object's own comparison,
        # which container-like objects may evaluate element-wise.
        assert (
            historical_data is not None
        ), "Train data should be passed when `hist_rnd` strategy is selected."
        self.hist_ratio = hist_ratio
        self.historical_data = historical_data

generate_historical_edge_set(historical_data)

Generate the set of edges seen during training or validation

ONLY train_data should be passed as historical data; i.e., the HISTORICAL negative edges should be selected from training data only.

Parameters:

Name Type Description Default
historical_data TemporalData

contains the positive edges observed previously

required

Returns:

Name Type Description
historical_edges tuple

distinct historical positive edges

hist_edge_set_per_node tuple

historical edges observed for each node

Source code in tgb/linkproppred/negative_generator.py
def generate_historical_edge_set(self, 
                                 historical_data: TemporalData,
                                 ) -> tuple:
    r"""
    Collect the edges seen during training or validation.

    ONLY `train_data` should be passed as historical data; i.e., the HISTORICAL negative edges should be selected from training data only.

    Parameters:
        historical_data: contains the positive edges observed previously
                         (its `src` and `dst` are read)

    Returns:
        historical_edges: dict whose keys are the distinct historical positive
                          `(src, dst)` edges (every value is 1)
        hist_edge_set_per_node: dict mapping each source node to an array of the
                                distinct destinations observed for it
    """
    src_arr = historical_data.src.cpu().numpy()
    dst_arr = historical_data.dst.cpu().numpy()

    # edge-centric view: one entry per distinct (src, dst) pair
    historical_edges = dict.fromkeys(zip(src_arr, dst_arr), 1)

    # node-centric view: every destination seen for a source, duplicates included
    dsts_by_src = {}
    for s, d in zip(src_arr, dst_arr):
        dsts_by_src.setdefault(s, []).append(d)

    # de-duplicate the destinations of each source
    hist_edge_set_per_node = {
        s: np.array(list(set(dsts))) for s, dsts in dsts_by_src.items()
    }

    return historical_edges, hist_edge_set_per_node

generate_negative_samples(data, split_mode, partial_path)

Generate negative samples

Parameters:

Name Type Description Default
data TemporalData

an object containing positive edges information

required
split_mode str

specifies whether to generate negative edges for 'validation' or 'test' splits

required
partial_path str

in which directory save the generated negatives

required
Source code in tgb/linkproppred/negative_generator.py
def generate_negative_samples(self, 
                              data: TemporalData, 
                              split_mode: str, 
                              partial_path: str,
                              ) -> None:
    r"""
    Generate negative samples with the strategy selected at construction time.

    The output file is `<partial_path>/<dataset_name>_<split_mode>_ns.pkl`.

    Parameters:
        data: an object containing positive edges information
        split_mode: specifies whether to generate negative edges for 'validation' or 'test' splits
        partial_path: in which directory save the generated negatives

    Raises:
        ValueError: if the configured strategy is neither 'rnd' nor 'hist_rnd'
    """
    # file name for saving or loading...
    filename = "".join(
        (partial_path, "/", self.dataset_name, "_", split_mode, "_ns.pkl")
    )

    strategy = self.strategy
    if strategy not in ("rnd", "hist_rnd"):
        raise ValueError("Unsupported negative sample generation strategy!")

    if strategy == "rnd":
        self.generate_negative_samples_rnd(data, split_mode, filename)
        return

    # `hist_rnd` additionally needs the history stored on the instance
    self.generate_negative_samples_hist_rnd(
        self.historical_data, data, split_mode, filename
    )

generate_negative_samples_hist_rnd(historical_data, data, split_mode, filename)

Generate negative samples based on the HIST-RND strategy: - up to a `hist_ratio` share (50% by default) of the negative samples are selected from the set of edges seen during training with the same source node. - the rest of the negative edges are randomly sampled with the fixed source node.

Parameters:

Name Type Description Default
historical_data TemporalData

contains the history of the observed positive edges including distinct positive edges and edges observed for each positive node

required
data TemporalData

an object containing positive edges information

required
split_mode str

specifies whether to generate negative edges for 'validation' or 'test' splits

required
filename str

name of the file to save generated negative edges

required

Returns:

Type Description
None

None

Source code in tgb/linkproppred/negative_generator.py
def generate_negative_samples_hist_rnd(
    self,
    historical_data: TemporalData,
    data: TemporalData,
    split_mode: str,
    filename: str,
) -> None:
    r"""
    Generate negative samples based on the `HIST-RND` strategy:
        - up to a `hist_ratio` share of the negatives (at most
          `int(num_neg_e * hist_ratio)`) are destinations that the same source
          node was connected to in `historical_data`;
        - the remaining negatives are drawn at random from the destination range
          `[first_dst_id, last_dst_id]` for the same source node, excluding both
          the historical destinations and the current positives.

    Destinations that are positive for the same source at the same timestamp are
    never used as negatives. If fewer candidates than requested are available,
    all remaining candidates are returned, so a positive edge can end up with
    fewer than `num_neg_e` negatives.

    The result is a dict mapping `(src, dst, t)` to an array of negative
    destination ids, which is written to `filename` with `save_pkl`. If
    `filename` already exists nothing is generated.

    Parameters:
        historical_data: contains the history of the observed positive edges
                        (passed to `generate_historical_edge_set`)
        data: an object containing positive edges information (`src`, `dst`, `t`)
        split_mode: specifies whether to generate negative edges for 'val' or 'test' splits
        filename: name of the file to save generated negative edges

    Returns:
        None
    """
    print(
        f"INFO: Negative Sampling Strategy: {self.strategy}, Data Split: {split_mode}"
    )
    assert split_mode in [
        "val",
        "test",
    ], "Invalid split-mode! It should be `val` or `test`!"

    if os.path.exists(filename):
        print(
            f"INFO: negative samples for '{split_mode}' evaluation are already generated!"
        )
        return

    print(f"INFO: Generating negative samples for '{split_mode}' evaluation!")
    # retrieve the information from the batch
    pos_src, pos_dst, pos_timestamp = (
        data.src.cpu().numpy(),
        data.dst.cpu().numpy(),
        data.t.cpu().numpy(),
    )

    # group the positive destinations once: {ts: {src: [dsts]}}
    pos_ts_edge_dict = {}
    for pos_s, pos_d, pos_t in tqdm(
        zip(pos_src, pos_dst, pos_timestamp), total=len(pos_src)
    ):
        pos_ts_edge_dict.setdefault(pos_t, {}).setdefault(pos_s, []).append(pos_d)

    # all possible destinations
    all_dst = np.arange(self.first_dst_id, self.last_dst_id + 1)

    # only the per-source view of the history is needed here
    _, hist_edge_set_per_node = self.generate_historical_edge_set(historical_data)

    # upper bound on the number of historical negatives per positive edge
    max_num_hist_neg_e = int(self.num_neg_e * self.hist_ratio)

    evaluation_set = {}
    # generate a list of negative destinations for each positive edge
    for pos_s, pos_d, pos_t in tqdm(
        zip(pos_src, pos_dst, pos_timestamp), total=len(pos_src)
    ):
        pos_e_dst_same_src = np.array(pos_ts_edge_dict[pos_t][pos_s])

        # sample historical edges
        num_hist_neg_e = 0
        # Typed empty array: a bare `np.array([])` is float64 and would make
        # the concatenation below promote every destination id to float.
        neg_hist_dsts = np.array([], dtype=all_dst.dtype)
        seen_dst = []
        if pos_s in hist_edge_set_per_node:
            seen_dst = hist_edge_set_per_node[pos_s]
            if len(seen_dst) >= 1:
                # collision check: drop destinations that are positive right now
                filtered_all_seen_dst = np.setdiff1d(seen_dst, pos_e_dst_same_src)
                num_hist_neg_e = min(max_num_hist_neg_e, len(filtered_all_seen_dst))
                neg_hist_dsts = np.random.choice(
                    filtered_all_seen_dst, num_hist_neg_e, replace=False
                )

        # sample random edges: exclude current positives and everything in the
        # history, so the two groups of negatives never overlap
        if len(seen_dst) >= 1:
            invalid_dst = np.concatenate((pos_e_dst_same_src, seen_dst))
        else:
            invalid_dst = pos_e_dst_same_src
        filtered_all_rnd_dst = np.setdiff1d(all_dst, invalid_dst)

        num_rnd_neg_e = self.num_neg_e - num_hist_neg_e
        # when more negatives are requested than there are candidate
        # destinations, simply return all candidate destinations
        if num_rnd_neg_e > len(filtered_all_rnd_dst):
            neg_rnd_dsts = filtered_all_rnd_dst
        else:
            neg_rnd_dsts = np.random.choice(
                filtered_all_rnd_dst, num_rnd_neg_e, replace=False
            )

        # concatenate the two sets: historical and random
        evaluation_set[(pos_s, pos_d, pos_t)] = np.concatenate(
            (neg_hist_dsts, neg_rnd_dsts)
        )

    # save the generated evaluation set to disk
    save_pkl(evaluation_set, filename)

generate_negative_samples_rnd(data, split_mode, filename)

Generate negative samples based on the RND (random) strategy: - for each positive edge, sample a batch of negative edges from all possible edges with the same source node - filter out actual positive edges

Parameters:

Name Type Description Default
data TemporalData

an object containing positive edges information

required
split_mode str

specifies whether to generate negative edges for 'validation' or 'test' splits

required
filename str

name of the file containing the generated negative edges

required
Source code in tgb/linkproppred/negative_generator.py
def generate_negative_samples_rnd(self, 
                                  data: TemporalData, 
                                  split_mode: str, 
                                  filename: str,
                                  ) -> None:
    r"""
    Generate negative samples based on the `RND` (random) strategy:
        - for each positive edge, sample a batch of negative edges from all possible edges with the same source node
        - filter out actual positive edges (destinations that are positive for
          the same source at the same timestamp)

    If fewer candidate destinations than `num_neg_e` are available, all of them
    are returned. The result is a dict mapping `(src, dst, t)` to an array of
    negative destination ids, which is written to `filename` with `save_pkl`.
    If `filename` already exists nothing is generated.

    Parameters:
        data: an object containing positive edges information (`src`, `dst`, `t`)
        split_mode: specifies whether to generate negative edges for 'val' or 'test' splits
        filename: name of the file containing the generated negative edges
    """
    print(
        f"INFO: Negative Sampling Strategy: {self.strategy}, Data Split: {split_mode}"
    )
    assert split_mode in [
        "val",
        "test",
    ], "Invalid split-mode! It should be `val` or `test`!"

    if os.path.exists(filename):
        print(
            f"INFO: negative samples for '{split_mode}' evaluation are already generated!"
        )
        return

    print(f"INFO: Generating negative samples for '{split_mode}' evaluation!")
    # retrieve the information from the batch
    pos_src, pos_dst, pos_timestamp = (
        data.src.cpu().numpy(),
        data.dst.cpu().numpy(),
        data.t.cpu().numpy(),
    )

    # Group the positive destinations by (timestamp, source) in one pass,
    # instead of rebuilding boolean masks over the full arrays for every edge.
    pos_dst_by_ts_src = {}
    for pos_s, pos_d, pos_t in zip(pos_src, pos_dst, pos_timestamp):
        pos_dst_by_ts_src.setdefault((pos_t, pos_s), []).append(pos_d)

    # all possible destinations
    all_dst = np.arange(self.first_dst_id, self.last_dst_id + 1)

    evaluation_set = {}
    # generate a list of negative destinations for each positive edge
    pos_edge_tqdm = tqdm(
        zip(pos_src, pos_dst, pos_timestamp), total=len(pos_src)
    )
    for pos_s, pos_d, pos_t in pos_edge_tqdm:
        # `.get` with an empty default keeps the mask semantics for keys that
        # never compare equal to themselves (e.g. a NaN timestamp).
        pos_e_dst_same_src = pos_dst_by_ts_src.get((pos_t, pos_s), ())
        filtered_all_dst = np.setdiff1d(all_dst, pos_e_dst_same_src)

        # when num_neg_e is larger than all possible destinations, simply
        # return all possible destinations
        if self.num_neg_e > len(filtered_all_dst):
            neg_d_arr = filtered_all_dst
        else:
            # never replace negatives
            neg_d_arr = np.random.choice(
                filtered_all_dst, self.num_neg_e, replace=False
            )

        evaluation_set[(pos_s, pos_d, pos_t)] = neg_d_arr

    # save the generated evaluation set to disk
    save_pkl(evaluation_set, filename)