Skip to content

tgb.linkproppred

LinkPropPredDataset

Bases: object

Source code in tgb/linkproppred/dataset.py
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
class LinkPropPredDataset(object):
    def __init__(
        self,
        name: str,
        root: Optional[str] = "datasets",
        meta_dict: Optional[dict] = None,
        preprocess: Optional[bool] = True,
    ):
        r"""Dataset class for link prediction dataset. Stores meta information about each dataset such as evaluation metrics etc.
        also automatically pre-processes the dataset.

        Resolves the dataset file paths, runs the version check, downloads the
        raw files when they are missing and, if ``preprocess`` is true, loads
        the data and builds the matching negative edge sampler.

        Args:
            name: name of the dataset
            root: root directory to store the dataset folder (appended to PROJ_DIR)
            meta_dict: dictionary containing meta information about the dataset, should contain key 'dir_name' which is the name of the dataset folder.
                The dictionary is kept on the instance and updated in place.
            preprocess: whether to pre-process the dataset. When False no data
                is loaded, so ``min_dst_idx``, ``max_dst_idx`` and
                ``ns_sampler`` are left as None.
        Raises:
            FileNotFoundError: if the dataset directory does not exist after the download step
            ValueError: if a 'tkg' dataset has no entry in DATA_NS_STRATEGY_DICT
        """
        self.name = name  ## original name

        # check if dataset url exist
        if self.name in DATA_URL_DICT:
            self.url = DATA_URL_DICT[self.name]
        else:
            self.url = None
            print(f"Dataset {self.name} url not found, download not supported yet.")

        # check if the evaluation metric is specified
        if self.name in DATA_EVAL_METRIC_DICT:
            self.metric = DATA_EVAL_METRIC_DICT[self.name]
        else:
            self.metric = None
            print(
                f"Dataset {self.name} default evaluation metric not found, it is not supported yet."
            )

        root = PROJ_DIR + root

        if meta_dict is None:
            self.dir_name = "_".join(name.split("-"))  ## replace hyphen with underline
            meta_dict = {"dir_name": self.dir_name}
        else:
            self.dir_name = meta_dict["dir_name"]
        self.root = osp.join(root, self.dir_name)
        self.meta_dict = meta_dict
        if "fname" not in self.meta_dict:
            self.meta_dict["fname"] = self.root + "/" + self.name + "_edgelist.csv"
            self.meta_dict["nodefile"] = None
        else:
            # a caller-supplied 'fname' without 'nodefile' would otherwise
            # raise KeyError when the processed files are generated
            self.meta_dict.setdefault("nodefile", None)

        if name == "tgbl-flight":
            self.meta_dict["nodefile"] = self.root + "/" + "airport_node_feat.csv"

        if name == "tkgl-wikidata" or name == "tkgl-smallpedia":
            self.meta_dict["staticfile"] = self.root + "/" + self.name + "_static_edgelist.csv"

        if "thg" in name:
            self.meta_dict["nodeTypeFile"] = self.root + "/" + self.name + "_nodetype.csv"
        else:
            self.meta_dict["nodeTypeFile"] = None

        self.meta_dict["val_ns"] = self.root + "/" + self.name + "_val_ns.pkl"
        self.meta_dict["test_ns"] = self.root + "/" + self.name + "_test_ns.pkl"

        #! version check (may rewrite the file names in meta_dict)
        self.version_passed = True
        self._version_check()

        # initialize
        self._node_feat = None
        self._edge_feat = None
        self._full_data = None
        self._train_data = None
        self._val_data = None
        self._test_data = None

        # split masks are read by the train_mask / val_mask / test_mask
        # properties, so they must exist even before pre_process() runs
        self._train_mask = None
        self._val_mask = None
        self._test_mask = None

        # for tkg and thg
        self._edge_type = None

        # tkgl-wikidata and tkgl-smallpedia only
        self._static_data = None

        # for thg only
        self._node_type = None
        self._node_id = None

        # only available once the data has been loaded
        self.min_dst_idx = None
        self.max_dst_idx = None
        self.ns_sampler = None

        self.download()
        # the dataset directory must exist at this point
        if osp.isdir(self.root):
            print("Dataset directory is ", self.root)
        else:
            raise FileNotFoundError(f"Directory not found at {self.root}")

        if preprocess:
            self.pre_process()

        if self._full_data is None:
            # nothing was loaded (preprocess=False): the destination range and
            # the negative sampler cannot be derived without the edge data
            return

        destinations = self._full_data["destinations"]
        self.min_dst_idx, self.max_dst_idx = int(destinations.min()), int(destinations.max())

        if 'tkg' in self.name:
            if self.name not in DATA_NS_STRATEGY_DICT:
                raise ValueError(f"Dataset {self.name} negative sampling strategy not found.")
            self.ns_sampler = TKGNegativeEdgeSampler(
                dataset_name=self.name,
                first_dst_id=self.min_dst_idx,
                last_dst_id=self.max_dst_idx,
                strategy=DATA_NS_STRATEGY_DICT[self.name],
                partial_path=self.root + "/" + self.name,
            )
        elif 'thg' in self.name:
            #* need to find the smallest node id of all nodes (regardless of types)
            sources = self._full_data["sources"]
            min_node_idx = min(int(sources.min()), int(destinations.min()))
            max_node_idx = max(int(sources.max()), int(destinations.max()))
            self.ns_sampler = THGNegativeEdgeSampler(
                dataset_name=self.name,
                first_node_id=min_node_idx,
                last_node_id=max_node_idx,
                node_type=self._node_type,
            )
        else:
            self.ns_sampler = NegativeEdgeSampler(
                dataset_name=self.name,
                first_dst_id=self.min_dst_idx,
                last_dst_id=self.max_dst_idx,
            )


    def _version_check(self) -> None:
        r"""Point the dataset file names at the current dataset version.

        Looks the dataset up in DATA_VERSION_DICT. For versions above 1 the
        edge list, node feature (tgbl-flight only) and negative sample file
        names in ``self.meta_dict`` get a ``_v<version>`` suffix.
        ``self.version_passed`` is set to False when the version is unknown or
        the versioned edge list is not on disk, signalling that the dataset
        should be (re)downloaded.
        """
        if self.name not in DATA_VERSION_DICT:
            print(f"Dataset {self.name} version number not found.")
            self.version_passed = False
            return None

        version = DATA_VERSION_DICT[self.name]
        if not version > 1:
            # first version: the unversioned file names stay in place
            return None

        #* check if current version is outdated
        tag = "_v" + str(int(version))
        base = self.root + "/"
        self.meta_dict["fname"] = base + self.name + "_edgelist" + tag + ".csv"
        if self.name == "tgbl-flight":
            self.meta_dict["nodefile"] = base + "airport_node_feat" + tag + ".csv"
        else:
            self.meta_dict["nodefile"] = None
        self.meta_dict["val_ns"] = base + self.name + "_val_ns" + tag + ".pkl"
        self.meta_dict["test_ns"] = base + self.name + "_test_ns" + tag + ".pkl"

        if osp.exists(self.meta_dict["fname"]):
            return None

        print(f"Dataset {self.name} version {int(version)} not found.")
        print("Please download the latest version of the dataset.")
        self.version_passed = False
        return None


    def download(self):
        """
        downloads this dataset from url
        check if files are already downloaded

        Does nothing when the raw edge list already exists. Otherwise asks for
        confirmation on stdin, streams the zip archive from ``self.url`` into
        the dataset directory and extracts it there.

        Raises:
            Exception: if the user declines the download or no url is known
            requests.HTTPError: if the server answers with an error status
        """
        # check if the file already exists
        if osp.exists(self.meta_dict["fname"]):
            print("raw file found, skipping download")
            return

        # ask if the user wants to download the dataset
        inp = input("Will you download the dataset(s) now? (y/N)\n").strip().lower()
        if inp != "y":
            # reset the colour so the terminal is not left in the FAIL colour
            raise Exception(
                BColors.FAIL
                + "Data not found error, download "
                + self.name
                + " failed"
                + BColors.ENDC
            )

        if self.url is None:
            raise Exception("Dataset url not found, download not supported yet.")

        print(
            f"{BColors.WARNING}Download started, this might take a while . . . {BColors.ENDC}"
        )
        print(f"Dataset title: {self.name}")

        if osp.isdir(self.root):
            print("Dataset directory is ", self.root)
        else:
            os.makedirs(self.root)

        path_download = self.root + "/" + self.name + ".zip"
        # the context manager releases the connection even if writing fails
        with requests.get(self.url, stream=True) as r:
            # fail here rather than saving an HTML error page as the archive
            r.raise_for_status()
            # servers may omit content-length; it is only used for the progress bar
            total_length = int(r.headers.get("content-length", 0))
            with open(path_download, "wb") as f:
                for chunk in progress.bar(
                    r.iter_content(chunk_size=1024),
                    expected_size=(total_length / 1024) + 1,
                ):
                    if chunk:
                        f.write(chunk)

        # for unzipping the file
        with zipfile.ZipFile(path_download, "r") as zip_ref:
            zip_ref.extractall(self.root)
        print(f"{BColors.OKGREEN}Download completed {BColors.ENDC}")
        self.version_passed = True

    def generate_processed_files(self) -> pd.DataFrame:
        r"""
        turns raw data .csv file into a pandas data frame, stored on disc if not already
        Returns:
            df: pandas data frame
        """
        node_feat = None
        if not osp.exists(self.meta_dict["fname"]):
            raise FileNotFoundError(f"File not found at {self.meta_dict['fname']}")

        if self.meta_dict["nodefile"] is not None:
            if not osp.exists(self.meta_dict["nodefile"]):
                raise FileNotFoundError(
                    f"File not found at {self.meta_dict['nodefile']}"
                )
        #* for thg must have nodetypes 
        if self.meta_dict["nodeTypeFile"] is not None:
            if not osp.exists(self.meta_dict["nodeTypeFile"]):
                raise FileNotFoundError(
                    f"File not found at {self.meta_dict['nodeTypeFile']}"
                )


        OUT_DF = self.root + "/" + "ml_{}.pkl".format(self.name)
        OUT_EDGE_FEAT = self.root + "/" + "ml_{}.pkl".format(self.name + "_edge")
        OUT_NODE_ID = self.root + "/" + "ml_{}.pkl".format(self.name + "_nodeid")
        if self.meta_dict["nodefile"] is not None:
            OUT_NODE_FEAT = self.root + "/" + "ml_{}.pkl".format(self.name + "_node")
        if self.meta_dict["nodeTypeFile"] is not None:
            OUT_NODE_TYPE = self.root + "/" + "ml_{}.pkl".format(self.name + "_nodeType")

        if (osp.exists(OUT_DF)) and (self.version_passed is True):
            print("loading processed file")
            df = pd.read_pickle(OUT_DF)
            edge_feat = load_pkl(OUT_EDGE_FEAT)
            if (self.name == "tkgl-wikidata") or (self.name == "tkgl-smallpedia"):
                node_id = load_pkl(OUT_NODE_ID)
                self._node_id = node_id
            if self.meta_dict["nodefile"] is not None:
                node_feat = load_pkl(OUT_NODE_FEAT)
            if self.meta_dict["nodeTypeFile"] is not None:
                node_type = load_pkl(OUT_NODE_TYPE)
                self._node_type = node_type

        else:
            print("file not processed, generating processed file")
            if self.name == "tgbl-flight":
                df, edge_feat, node_ids = csv_to_pd_data(self.meta_dict["fname"])
            elif self.name == "tgbl-coin":
                df, edge_feat, node_ids = csv_to_pd_data_sc(self.meta_dict["fname"])
            elif self.name == "tgbl-comment":
                df, edge_feat, node_ids = csv_to_pd_data_rc(self.meta_dict["fname"])
            elif self.name == "tgbl-review":
                df, edge_feat, node_ids = csv_to_pd_data_sc(self.meta_dict["fname"])
            elif self.name == "tgbl-wiki":
                df, edge_feat, node_ids = load_edgelist_wiki(self.meta_dict["fname"])
            elif self.name == "tgbl-subreddit":
                df, edge_feat, node_ids = load_edgelist_wiki(self.meta_dict["fname"])
            elif self.name == "tgbl-lastfm":
                df, edge_feat, node_ids = load_edgelist_wiki(self.meta_dict["fname"])
            elif self.name == "tkgl-polecat":
                df, edge_feat, node_ids = csv_to_tkg_data(self.meta_dict["fname"])
            elif self.name == "tkgl-icews":
                df, edge_feat, node_ids = csv_to_tkg_data(self.meta_dict["fname"])
            elif self.name == "tkgl-yago":
                df, edge_feat, node_ids = csv_to_tkg_data(self.meta_dict["fname"])
            elif self.name == "tkgl-wikidata":
                df, edge_feat, node_ids = csv_to_wikidata(self.meta_dict["fname"])
                save_pkl(node_ids, OUT_NODE_ID)
                self._node_id = node_ids
            elif self.name == "tkgl-smallpedia":
                df, edge_feat, node_ids = csv_to_wikidata(self.meta_dict["fname"])
                save_pkl(node_ids, OUT_NODE_ID)
                self._node_id = node_ids
            elif self.name == "thgl-myket":
                df, edge_feat, node_ids = csv_to_thg_data(self.meta_dict["fname"])
            elif self.name == "thgl-github":
                df, edge_feat, node_ids = csv_to_thg_data(self.meta_dict["fname"])
            elif self.name == "thgl-forum":
                df, edge_feat, node_ids = csv_to_forum_data(self.meta_dict["fname"])
            elif self.name == "thgl-software":
                df, edge_feat, node_ids = csv_to_thg_data(self.meta_dict["fname"])
            else:
                raise ValueError(f"Dataset {self.name} not found.")

            save_pkl(edge_feat, OUT_EDGE_FEAT)
            df.to_pickle(OUT_DF)
            if self.meta_dict["nodefile"] is not None:
                node_feat = process_node_feat(self.meta_dict["nodefile"], node_ids)
                save_pkl(node_feat, OUT_NODE_FEAT)
            if self.meta_dict["nodeTypeFile"] is not None:
                node_type = process_node_type(self.meta_dict["nodeTypeFile"], node_ids)
                save_pkl(node_type, OUT_NODE_TYPE)
                #? do not return node_type, simply set it
                self._node_type = node_type


        return df, edge_feat, node_feat

    def pre_process(self):
        """
        Pre-process the dataset and generates the splits, must be run before dataset properties can be accessed
        generates the edge data and different train, val, test splits
        """

        # check if path to file is valid
        df, edge_feat, node_feat = self.generate_processed_files()

        #* design choice, only stores the original edges not the inverse relations on disc
        if ("tkgl" in self.name):
            df = add_inverse_quadruples(df)

        sources = np.array(df["u"])
        destinations = np.array(df["i"])
        timestamps = np.array(df["ts"])
        edge_idxs = np.array(df["idx"])
        weights = np.array(df["w"])
        edge_label = np.ones(len(df))  # should be 1 for all pos edges
        self._edge_feat = edge_feat
        self._node_feat = node_feat

        full_data = {
            "sources": sources.astype(int),
            "destinations": destinations.astype(int),
            "timestamps": timestamps.astype(int),
            "edge_idxs": edge_idxs,
            "edge_feat": edge_feat,
            "w": weights,
            "edge_label": edge_label,
        }

        #* for tkg and thg
        if ("edge_type" in df):
            edge_type = np.array(df["edge_type"]).astype(int)
            self._edge_type = edge_type
            full_data["edge_type"] = edge_type

        self._full_data = full_data

        if ("yago" in self.name):
            _train_mask, _val_mask, _test_mask = self.generate_splits(full_data, val_ratio=0.1, test_ratio=0.10) #99) #val_ratio=0.097, test_ratio=0.099)
        else:
            _train_mask, _val_mask, _test_mask = self.generate_splits(full_data, val_ratio=0.15, test_ratio=0.15)
        self._train_mask = _train_mask
        self._val_mask = _val_mask
        self._test_mask = _test_mask

    def generate_splits(
        self,
        full_data: Dict[str, Any],
        val_ratio: float = 0.15,
        test_ratio: float = 0.15,
    ) -> Tuple[Dict[str, Any], Dict[str, Any], Dict[str, Any]]:
        r"""Generates train, validation, and test splits from the full dataset
        Args:
            full_data: dictionary containing the full dataset
            val_ratio: ratio of validation data
            test_ratio: ratio of test data
        Returns:
            train_data: dictionary containing the training dataset
            val_data: dictionary containing the validation dataset
            test_data: dictionary containing the test dataset
        """
        val_time, test_time = list(
            np.quantile(
                full_data["timestamps"],
                [(1 - val_ratio - test_ratio), (1 - test_ratio)],
            )
        )
        timestamps = full_data["timestamps"]

        train_mask = timestamps <= val_time
        val_mask = np.logical_and(timestamps <= test_time, timestamps > val_time)
        test_mask = timestamps > test_time

        return train_mask, val_mask, test_mask

    def preprocess_static_edges(self):
        """
        Pre-process the static edges of the dataset
        """
        if ("staticfile" in self.meta_dict):
            OUT_DF = self.root + "/" + "ml_{}.pkl".format(self.name + "_static")
            if (osp.exists(OUT_DF)) and (self.version_passed is True):
                print("loading processed file")
                static_dict = load_pkl(OUT_DF)
                self._static_data = static_dict
            else:
                print("file not processed, generating processed file")
                static_dict, node_ids =  csv_to_staticdata(self.meta_dict["staticfile"], self._node_id)
                save_pkl(static_dict, OUT_DF)
                self._static_data = static_dict
        else:
            print ("static edges are only for tkgl-wikidata and tkgl-smallpedia datasets")


    @property
    def eval_metric(self) -> str:
        """
        the official evaluation metric for the dataset, loaded from info.py
        Returns:
            eval_metric: str, the evaluation metric
        """
        return self.metric

    @property
    def negative_sampler(self) -> NegativeEdgeSampler:
        r"""
        Returns the negative sampler of the dataset, will load negative samples from disc
        Returns:
            negative_sampler: NegativeEdgeSampler
        """
        return self.ns_sampler


    def load_val_ns(self) -> None:
        r"""
        load the negative samples for the validation set
        """
        self.ns_sampler.load_eval_set(
            fname=self.meta_dict["val_ns"], split_mode="val"
        )

    def load_test_ns(self) -> None:
        r"""
        load the negative samples for the test set
        """
        self.ns_sampler.load_eval_set(
            fname=self.meta_dict["test_ns"], split_mode="test"
        )

    @property
    def num_nodes(self) -> int:
        r"""
        Returns the total number of unique nodes in the dataset 
        Returns:
            num_nodes: int, the number of unique nodes
        """
        src = self._full_data["sources"]
        dst = self._full_data["destinations"]
        all_nodes = np.concatenate((src, dst), axis=0)
        uniq_nodes = np.unique(all_nodes, axis=0)
        return uniq_nodes.shape[0]


    @property
    def num_edges(self) -> int:
        r"""
        Returns the total number of edges in the dataset
        Returns:
            num_edges: int, the number of edges
        """
        src = self._full_data["sources"]
        return src.shape[0]


    @property
    def num_rels(self) -> int:
        r"""
        Returns the number of relation types in the dataset
        Returns:
            num_rels: int, the number of relation types
        """
        #* if it is a homogenous graph
        if ("edge_type" not in self._full_data):
            return 1
        else:
            return np.unique(self._full_data["edge_type"]).shape[0]

    @property
    def node_feat(self) -> Optional[np.ndarray]:
        r"""
        Returns the node features of the dataset with dim [N, feat_dim]
        Returns:
            node_feat: np.ndarray, [N, feat_dim] or None if there is no node feature
        """
        return self._node_feat

    @property
    def node_type(self) -> Optional[np.ndarray]:
        r"""
        Returns the node types of the dataset with dim [N], only for temporal heterogeneous graphs
        Returns:
            node_feat: np.ndarray, [N] or None if there is no node feature
        """
        return self._node_type

    @property
    def edge_feat(self) -> Optional[np.ndarray]:
        r"""
        Returns the edge features of the dataset with dim [E, feat_dim]
        Returns:
            edge_feat: np.ndarray, [E, feat_dim] or None if there is no edge feature
        """
        return self._edge_feat

    @property
    def edge_type(self) -> Optional[np.ndarray]:
        r"""
        Returns the edge types of the dataset with dim [E, 1], only for temporal knowledge graph and temporal heterogeneous graph
        Returns:
            edge_type: np.ndarray, [E, 1] or None if it is not a TKG or THG
        """
        return self._edge_type

    @property
    def static_data(self) -> Optional[np.ndarray]:
        r"""
        Returns the static edges related to this dataset, applies for tkgl-wikidata and tkgl-smallpedia, edges are (src, dst, rel_type)
        Returns:
            df: pd.DataFrame {"head": np.ndarray, "tail": np.ndarray, "rel_type": np.ndarray}
        """
        if (self.name == "tkgl-wikidata") or (self.name == "tkgl-smallpedia"):
            self.preprocess_static_edges()
        return self._static_data

    @property
    def full_data(self) -> Dict[str, Any]:
        r"""
        the full data of the dataset as a dictionary with keys: 'sources', 'destinations', 'timestamps', 'edge_idxs', 'edge_feat', 'w', 'edge_label',

        Returns:
            full_data: Dict[str, Any]
        """
        if self._full_data is None:
            raise ValueError(
                "dataset has not been processed yet, please call pre_process() first"
            )
        return self._full_data

    @property
    def train_mask(self) -> np.ndarray:
        r"""
        Returns the train mask of the dataset
        Returns:
            train_mask: training masks
        """
        if self._train_mask is None:
            raise ValueError("training split hasn't been loaded")
        return self._train_mask

    @property
    def val_mask(self) -> np.ndarray:
        r"""
        Returns the validation mask of the dataset
        Returns:
            val_mask: Dict[str, Any]
        """
        if self._val_mask is None:
            raise ValueError("validation split hasn't been loaded")
        return self._val_mask

    @property
    def test_mask(self) -> np.ndarray:
        r"""
        Returns the test mask of the dataset:
        Returns:
            test_mask: Dict[str, Any]
        """
        if self._test_mask is None:
            raise ValueError("test split hasn't been loaded")
        return self._test_mask

edge_feat: Optional[np.ndarray] property

Returns the edge features of the dataset with dim [E, feat_dim] Returns: edge_feat: np.ndarray, [E, feat_dim] or None if there is no edge feature

edge_type: Optional[np.ndarray] property

Returns the edge types of the dataset with dim [E, 1], only for temporal knowledge graph and temporal heterogeneous graph Returns: edge_type: np.ndarray, [E, 1] or None if it is not a TKG or THG

eval_metric: str property

the official evaluation metric for the dataset, loaded from info.py Returns: eval_metric: str, the evaluation metric

full_data: Dict[str, Any] property

the full data of the dataset as a dictionary with keys: 'sources', 'destinations', 'timestamps', 'edge_idxs', 'edge_feat', 'w', 'edge_label',

Returns:

Name Type Description
full_data Dict[str, Any]

Dict[str, Any]

negative_sampler: NegativeEdgeSampler property

Returns the negative sampler of the dataset, will load negative samples from disc Returns: negative_sampler: NegativeEdgeSampler

node_feat: Optional[np.ndarray] property

Returns the node features of the dataset with dim [N, feat_dim] Returns: node_feat: np.ndarray, [N, feat_dim] or None if there is no node feature

node_type: Optional[np.ndarray] property

Returns the node types of the dataset with dim [N], only for temporal heterogeneous graphs. Returns: node_type: np.ndarray, [N] or None if there is no node type information

num_edges: int property

Returns the total number of edges in the dataset Returns: num_edges: int, the number of edges

num_nodes: int property

Returns the total number of unique nodes in the dataset Returns: num_nodes: int, the number of unique nodes

num_rels: int property

Returns the number of relation types in the dataset Returns: num_rels: int, the number of relation types

static_data: Optional[np.ndarray] property

Returns the static edges related to this dataset, applies for tkgl-wikidata and tkgl-smallpedia, edges are (src, dst, rel_type) Returns: df: pd.DataFrame {"head": np.ndarray, "tail": np.ndarray, "rel_type": np.ndarray}

test_mask: np.ndarray property

Returns the test mask of the dataset. Returns: test_mask: np.ndarray, boolean mask over the edges selecting the test split

train_mask: np.ndarray property

Returns the train mask of the dataset Returns: train_mask: training masks

val_mask: np.ndarray property

Returns the validation mask of the dataset. Returns: val_mask: np.ndarray, boolean mask over the edges selecting the validation split

__init__(name, root='datasets', meta_dict=None, preprocess=True)

Dataset class for link prediction dataset. Stores meta information about each dataset such as evaluation metrics etc. also automatically pre-processes the dataset. Args: name: name of the dataset root: root directory to store the dataset folder meta_dict: dictionary containing meta information about the dataset, should contain key 'dir_name' which is the name of the dataset folder preprocess: whether to pre-process the dataset

Source code in tgb/linkproppred/dataset.py
def __init__(
    self,
    name: str,
    root: Optional[str] = "datasets",
    meta_dict: Optional[dict] = None,
    preprocess: Optional[bool] = True,
):
    r"""Dataset class for link prediction dataset. Stores meta information about each dataset such as evaluation metrics etc.
    also automatically pre-processes the dataset.

    Resolves the dataset file paths, runs the version check, downloads the
    raw files when they are missing and, if ``preprocess`` is true, loads
    the data and builds the matching negative edge sampler.

    Args:
        name: name of the dataset
        root: root directory to store the dataset folder (appended to PROJ_DIR)
        meta_dict: dictionary containing meta information about the dataset, should contain key 'dir_name' which is the name of the dataset folder.
            The dictionary is kept on the instance and updated in place.
        preprocess: whether to pre-process the dataset. When False no data
            is loaded, so ``min_dst_idx``, ``max_dst_idx`` and
            ``ns_sampler`` are left as None.
    Raises:
        FileNotFoundError: if the dataset directory does not exist after the download step
        ValueError: if a 'tkg' dataset has no entry in DATA_NS_STRATEGY_DICT
    """
    self.name = name  ## original name

    # check if dataset url exist
    if self.name in DATA_URL_DICT:
        self.url = DATA_URL_DICT[self.name]
    else:
        self.url = None
        print(f"Dataset {self.name} url not found, download not supported yet.")

    # check if the evaluation metric is specified
    if self.name in DATA_EVAL_METRIC_DICT:
        self.metric = DATA_EVAL_METRIC_DICT[self.name]
    else:
        self.metric = None
        print(
            f"Dataset {self.name} default evaluation metric not found, it is not supported yet."
        )

    root = PROJ_DIR + root

    if meta_dict is None:
        self.dir_name = "_".join(name.split("-"))  ## replace hyphen with underline
        meta_dict = {"dir_name": self.dir_name}
    else:
        self.dir_name = meta_dict["dir_name"]
    self.root = osp.join(root, self.dir_name)
    self.meta_dict = meta_dict
    if "fname" not in self.meta_dict:
        self.meta_dict["fname"] = self.root + "/" + self.name + "_edgelist.csv"
        self.meta_dict["nodefile"] = None
    else:
        # a caller-supplied 'fname' without 'nodefile' would otherwise
        # raise KeyError when the processed files are generated
        self.meta_dict.setdefault("nodefile", None)

    if name == "tgbl-flight":
        self.meta_dict["nodefile"] = self.root + "/" + "airport_node_feat.csv"

    if name == "tkgl-wikidata" or name == "tkgl-smallpedia":
        self.meta_dict["staticfile"] = self.root + "/" + self.name + "_static_edgelist.csv"

    if "thg" in name:
        self.meta_dict["nodeTypeFile"] = self.root + "/" + self.name + "_nodetype.csv"
    else:
        self.meta_dict["nodeTypeFile"] = None

    self.meta_dict["val_ns"] = self.root + "/" + self.name + "_val_ns.pkl"
    self.meta_dict["test_ns"] = self.root + "/" + self.name + "_test_ns.pkl"

    #! version check (may rewrite the file names in meta_dict)
    self.version_passed = True
    self._version_check()

    # initialize
    self._node_feat = None
    self._edge_feat = None
    self._full_data = None
    self._train_data = None
    self._val_data = None
    self._test_data = None

    # split masks are read by the train_mask / val_mask / test_mask
    # properties, so they must exist even before pre_process() runs
    self._train_mask = None
    self._val_mask = None
    self._test_mask = None

    # for tkg and thg
    self._edge_type = None

    # tkgl-wikidata and tkgl-smallpedia only
    self._static_data = None

    # for thg only
    self._node_type = None
    self._node_id = None

    # only available once the data has been loaded
    self.min_dst_idx = None
    self.max_dst_idx = None
    self.ns_sampler = None

    self.download()
    # the dataset directory must exist at this point
    if osp.isdir(self.root):
        print("Dataset directory is ", self.root)
    else:
        raise FileNotFoundError(f"Directory not found at {self.root}")

    if preprocess:
        self.pre_process()

    if self._full_data is None:
        # nothing was loaded (preprocess=False): the destination range and
        # the negative sampler cannot be derived without the edge data
        return

    destinations = self._full_data["destinations"]
    self.min_dst_idx, self.max_dst_idx = int(destinations.min()), int(destinations.max())

    if 'tkg' in self.name:
        if self.name not in DATA_NS_STRATEGY_DICT:
            raise ValueError(f"Dataset {self.name} negative sampling strategy not found.")
        self.ns_sampler = TKGNegativeEdgeSampler(
            dataset_name=self.name,
            first_dst_id=self.min_dst_idx,
            last_dst_id=self.max_dst_idx,
            strategy=DATA_NS_STRATEGY_DICT[self.name],
            partial_path=self.root + "/" + self.name,
        )
    elif 'thg' in self.name:
        #* need to find the smallest node id of all nodes (regardless of types)
        sources = self._full_data["sources"]
        min_node_idx = min(int(sources.min()), int(destinations.min()))
        max_node_idx = max(int(sources.max()), int(destinations.max()))
        self.ns_sampler = THGNegativeEdgeSampler(
            dataset_name=self.name,
            first_node_id=min_node_idx,
            last_node_id=max_node_idx,
            node_type=self._node_type,
        )
    else:
        self.ns_sampler = NegativeEdgeSampler(
            dataset_name=self.name,
            first_dst_id=self.min_dst_idx,
            last_dst_id=self.max_dst_idx,
        )

download()

downloads this dataset from url check if files are already downloaded

Source code in tgb/linkproppred/dataset.py
def download(self):
    """
    downloads the raw dataset archive from `self.url` and unpacks it into `self.root`

    Nothing is downloaded when the raw file `self.meta_dict["fname"]` already exists.
    Otherwise the user is asked on stdin to confirm; on "y" the archive is streamed
    to `<root>/<name>.zip`, extracted into `self.root`, and `self.version_passed`
    is set to True.

    Raises:
        Exception: if the user declines the download or `self.url` is None
        requests.HTTPError: if the server answers with an error status code
    """
    # check if the file already exists
    if osp.exists(self.meta_dict["fname"]):
        print("raw file found, skipping download")
        return

    # ask if the user wants to download the dataset; surrounding whitespace is ignored
    inp = input("Will you download the dataset(s) now? (y/N)\n").strip().lower()

    if inp != "y":
        raise Exception(
            BColors.FAIL + "Data not found error, download " + self.name + " failed"
        )

    print(
        f"{BColors.WARNING}Download started, this might take a while . . . {BColors.ENDC}"
    )
    print(f"Dataset title: {self.name}")

    if self.url is None:
        raise Exception("Dataset url not found, download not supported yet.")

    r = requests.get(self.url, stream=True)
    # fail here on a 4xx/5xx answer instead of saving an error page as the .zip
    r.raise_for_status()

    if osp.isdir(self.root):
        print("Dataset directory is ", self.root)
    else:
        os.makedirs(self.root)

    path_download = self.root + "/" + self.name + ".zip"
    chunks = r.iter_content(chunk_size=1024)
    # the content-length header is optional; without it there is no expected
    # size to hand to the progress bar, so the chunks are streamed without one
    content_length = r.headers.get("content-length")
    if content_length is not None:
        chunks = progress.bar(
            chunks,
            expected_size=(int(content_length) / 1024) + 1,
        )

    with open(path_download, "wb") as f:
        for chunk in chunks:
            if chunk:
                f.write(chunk)

    # for unzipping the file
    with zipfile.ZipFile(path_download, "r") as zip_ref:
        zip_ref.extractall(self.root)
    print(f"{BColors.OKGREEN}Download completed {BColors.ENDC}")
    self.version_passed = True

generate_processed_files()

Turns the raw data .csv file into a pandas data frame, which is stored on disk if it is not already there. Returns: df: pandas data frame, returned together with the edge features and the node features

Source code in tgb/linkproppred/dataset.py
def generate_processed_files(self) -> Tuple[pd.DataFrame, Any, Any]:
    r"""
    turns raw data .csv file into a pandas data frame, stored on disc if not already

    When the processed frame `ml_<name>.pkl` already exists under `self.root` and
    `self.version_passed` is True, the pickled results are loaded. Otherwise the raw
    file is parsed with the loader registered for `self.name` and the results are
    written to disc. As a side effect `self._node_id` is set for tkgl-wikidata and
    tkgl-smallpedia, and `self._node_type` is set when a node type file is configured.

    Returns:
        df: pandas data frame
        edge_feat: edge features as produced by the dataset loader
        node_feat: node features, None when the dataset has no node file
    Raises:
        FileNotFoundError: if the raw file, node file or node type file is missing
        ValueError: if `self.name` has no registered loader
    """
    node_feat = None

    raw_file = self.meta_dict["fname"]
    if not osp.exists(raw_file):
        raise FileNotFoundError(f"File not found at {raw_file}")

    node_file = self.meta_dict["nodefile"]
    if node_file is not None and not osp.exists(node_file):
        raise FileNotFoundError(f"File not found at {node_file}")

    #* for thg must have nodetypes
    node_type_file = self.meta_dict["nodeTypeFile"]
    if node_type_file is not None and not osp.exists(node_type_file):
        raise FileNotFoundError(f"File not found at {node_type_file}")

    # locations of the cached (processed) artifacts
    out_df = self.root + "/" + "ml_{}.pkl".format(self.name)
    out_edge_feat = self.root + "/" + "ml_{}.pkl".format(self.name + "_edge")
    out_node_id = self.root + "/" + "ml_{}.pkl".format(self.name + "_nodeid")
    if node_file is not None:
        out_node_feat = self.root + "/" + "ml_{}.pkl".format(self.name + "_node")
    if node_type_file is not None:
        out_node_type = self.root + "/" + "ml_{}.pkl".format(self.name + "_nodeType")

    # only these datasets persist their node ids
    keeps_node_ids = self.name in ("tkgl-wikidata", "tkgl-smallpedia")

    if (osp.exists(out_df)) and (self.version_passed is True):
        print("loading processed file")
        df = pd.read_pickle(out_df)
        edge_feat = load_pkl(out_edge_feat)
        if keeps_node_ids:
            self._node_id = load_pkl(out_node_id)
        if node_file is not None:
            node_feat = load_pkl(out_node_feat)
        if node_type_file is not None:
            self._node_type = load_pkl(out_node_type)
    else:
        print("file not processed, generating processed file")
        # dataset name -> function parsing the raw file into (df, edge_feat, node_ids)
        loaders = {
            "tgbl-flight": csv_to_pd_data,
            "tgbl-coin": csv_to_pd_data_sc,
            "tgbl-comment": csv_to_pd_data_rc,
            "tgbl-review": csv_to_pd_data_sc,
            "tgbl-wiki": load_edgelist_wiki,
            "tgbl-subreddit": load_edgelist_wiki,
            "tgbl-lastfm": load_edgelist_wiki,
            "tkgl-polecat": csv_to_tkg_data,
            "tkgl-icews": csv_to_tkg_data,
            "tkgl-yago": csv_to_tkg_data,
            "tkgl-wikidata": csv_to_wikidata,
            "tkgl-smallpedia": csv_to_wikidata,
            "thgl-myket": csv_to_thg_data,
            "thgl-github": csv_to_thg_data,
            "thgl-forum": csv_to_forum_data,
            "thgl-software": csv_to_thg_data,
        }
        loader = loaders.get(self.name)
        if loader is None:
            raise ValueError(f"Dataset {self.name} not found.")

        df, edge_feat, node_ids = loader(raw_file)
        if keeps_node_ids:
            save_pkl(node_ids, out_node_id)
            self._node_id = node_ids

        save_pkl(edge_feat, out_edge_feat)
        df.to_pickle(out_df)
        if node_file is not None:
            node_feat = process_node_feat(node_file, node_ids)
            save_pkl(node_feat, out_node_feat)
        if node_type_file is not None:
            node_type = process_node_type(node_type_file, node_ids)
            save_pkl(node_type, out_node_type)
            #? do not return node_type, simply set it
            self._node_type = node_type

    return df, edge_feat, node_feat

generate_splits(full_data, val_ratio=0.15, test_ratio=0.15)

Generates train, validation, and test splits from the full dataset, splitting chronologically by timestamp quantiles. Args: full_data: dictionary containing the full dataset val_ratio: ratio of validation data test_ratio: ratio of test data Returns: train_mask: boolean mask selecting the training edges val_mask: boolean mask selecting the validation edges test_mask: boolean mask selecting the test edges

Source code in tgb/linkproppred/dataset.py
def generate_splits(
    self,
    full_data: Dict[str, Any],
    val_ratio: float = 0.15,
    test_ratio: float = 0.15,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    r"""Generates chronological train, validation, and test masks for the full dataset

    The two cut-off times are the `(1 - val_ratio - test_ratio)` and `(1 - test_ratio)`
    quantiles of the timestamps. An edge sitting exactly on a cut-off time belongs
    to the earlier split.
    Args:
        full_data: dictionary containing the full dataset, only "timestamps" is read
        val_ratio: ratio of validation data
        test_ratio: ratio of test data
    Returns:
        train_mask: boolean mask of the edges in the training split
        val_mask: boolean mask of the edges in the validation split
        test_mask: boolean mask of the edges in the test split
    """
    # asarray lets list-like timestamps be compared elementwise below
    timestamps = np.asarray(full_data["timestamps"])
    val_time, test_time = np.quantile(
        timestamps,
        [(1 - val_ratio - test_ratio), (1 - test_ratio)],
    )

    train_mask = timestamps <= val_time
    val_mask = np.logical_and(timestamps <= test_time, timestamps > val_time)
    test_mask = timestamps > test_time

    return train_mask, val_mask, test_mask

load_test_ns()

load the negative samples for the test set

Source code in tgb/linkproppred/dataset.py
def load_test_ns(self) -> None:
    r"""
    make the negative sampler load the evaluation set of the test split,
    read from the file registered under `meta_dict["test_ns"]`
    """
    load_eval_set = self.ns_sampler.load_eval_set
    load_eval_set(fname=self.meta_dict["test_ns"], split_mode="test")

load_val_ns()

load the negative samples for the validation set

Source code in tgb/linkproppred/dataset.py
def load_val_ns(self) -> None:
    r"""
    make the negative sampler load the evaluation set of the validation split,
    read from the file registered under `meta_dict["val_ns"]`
    """
    load_eval_set = self.ns_sampler.load_eval_set
    load_eval_set(fname=self.meta_dict["val_ns"], split_mode="val")

pre_process()

Pre-processes the dataset and generates the splits; this must be run before the dataset properties can be accessed. It generates the edge data and the train, validation, and test splits.

Source code in tgb/linkproppred/dataset.py
def pre_process(self):
    """
    Build the in-memory edge data and the train / val / test masks.
    Has to run before the dataset properties are accessed. Fills `_edge_feat`,
    `_node_feat`, `_full_data`, the three split masks and, when the frame has an
    "edge_type" column, `_edge_type`.
    """
    frame, edge_feat, node_feat = self.generate_processed_files()

    #* design choice, only stores the original edges not the inverse relations on disc
    if "tkgl" in self.name:
        frame = add_inverse_quadruples(frame)

    # pull the raw columns out of the frame before touching any attribute
    raw = {col: np.array(frame[col]) for col in ("u", "i", "ts", "idx", "w")}
    positive_labels = np.ones(len(frame))  # should be 1 for all pos edges
    self._edge_feat = edge_feat
    self._node_feat = node_feat

    full_data = {
        "sources": raw["u"].astype(int),
        "destinations": raw["i"].astype(int),
        "timestamps": raw["ts"].astype(int),
        "edge_idxs": raw["idx"],
        "edge_feat": edge_feat,
        "w": raw["w"],
        "edge_label": positive_labels,
    }

    #* for tkg and thg
    if "edge_type" in frame:
        relation_ids = np.array(frame["edge_type"]).astype(int)
        self._edge_type = relation_ids
        full_data["edge_type"] = relation_ids

    self._full_data = full_data

    # yago holds out 10% per evaluation split, every other dataset 15%
    holdout = 0.1 if "yago" in self.name else 0.15
    self._train_mask, self._val_mask, self._test_mask = self.generate_splits(
        full_data, val_ratio=holdout, test_ratio=holdout
    )

preprocess_static_edges()

Pre-process the static edges of the dataset

Source code in tgb/linkproppred/dataset.py
def preprocess_static_edges(self):
    """
    Load the static edges into `self._static_data`, from the pickled cache when it
    exists and the version check passed, otherwise from the raw static file (the
    result is then cached). Does nothing but print a notice when the dataset's
    meta data has no "staticfile" entry.
    """
    if "staticfile" not in self.meta_dict:
        print ("static edges are only for tkgl-wikidata and tkgl-smallpedia datasets")
        return

    cache_path = self.root + "/" + "ml_" + self.name + "_static.pkl"
    if osp.exists(cache_path) and self.version_passed is True:
        print("loading processed file")
        static_dict = load_pkl(cache_path)
    else:
        print("file not processed, generating processed file")
        # the second return value (node ids) is not needed here
        static_dict, _ = csv_to_staticdata(self.meta_dict["staticfile"], self._node_id)
        save_pkl(static_dict, cache_path)
    self._static_data = static_dict

PyGLinkPropPredDataset

Bases: Dataset

Source code in tgb/linkproppred/dataset_pyg.py
class PyGLinkPropPredDataset(Dataset):
    def __init__(
        self,
        name: str,
        root: str,
        transform: Optional[Callable] = None,
        pre_transform: Optional[Callable] = None,
    ):
        r"""
        PyG wrapper for the LinkPropPredDataset
        can return pytorch tensors for src,dst,t,msg,label
        can return Temporal Data object
        Parameters:
            name: name of the dataset, passed to `LinkPropPredDataset`
            root (string): Root directory where the dataset should be saved, passed to `LinkPropPredDataset`
            transform (callable, optional): forwarded to the PyG `Dataset` constructor, not used by this class itself
            pre_transform (callable, optional): forwarded to the PyG `Dataset` constructor, not used by this class itself
        """
        self.name = name
        self.root = root
        self.dataset = LinkPropPredDataset(name=name, root=root)
        # the split masks are set before the base constructor runs, as in the original ordering
        self._train_mask = torch.from_numpy(self.dataset.train_mask)
        self._val_mask = torch.from_numpy(self.dataset.val_mask)
        self._test_mask = torch.from_numpy(self.dataset.test_mask)
        super().__init__(root, transform, pre_transform)
        self._edge_type = None  # filled by process_data when the dataset has edge types
        self._static_data = None  # converted lazily by the static_data property

        self._node_feat = self.dataset.node_feat
        if self._node_feat is not None:
            self._node_feat = torch.from_numpy(self._node_feat).float()

        self._node_type = self.dataset.node_type
        if self._node_type is not None:
            self._node_type = torch.from_numpy(self._node_type).long()

        self.process_data()

        self._ns_sampler = self.dataset.negative_sampler

    @property
    def eval_metric(self) -> str:
        """
        the official evaluation metric for the dataset, loaded from info.py
        Returns:
            eval_metric: str, the evaluation metric
        """
        return self.dataset.eval_metric

    @property
    def negative_sampler(self) -> NegativeEdgeSampler:
        r"""
        Returns the negative sampler of the dataset, will load negative samples from disc
        Returns:
            negative_sampler: NegativeEdgeSampler
        """
        return self._ns_sampler

    @property
    def num_nodes(self) -> int:
        r"""
        Returns the total number of unique nodes in the dataset
        Returns:
            num_nodes: int, the number of unique nodes
        """
        return self.dataset.num_nodes

    @property
    def num_rels(self) -> int:
        r"""
        Returns the total number of unique relations in the dataset
        Returns:
            num_rels: int, the number of unique relations
        """
        return self.dataset.num_rels

    @property
    def num_edges(self) -> int:
        r"""
        Returns the total number of edges in the dataset
        Returns:
            num_edges: int, the number of edges
        """
        return self.dataset.num_edges

    def load_val_ns(self) -> None:
        r"""
        load the negative samples for the validation set
        """
        self.dataset.load_val_ns()

    def load_test_ns(self) -> None:
        r"""
        load the negative samples for the test set
        """
        self.dataset.load_test_ns()

    @property
    def train_mask(self) -> torch.Tensor:
        r"""
        Returns the train mask of the dataset
        Returns:
            train_mask: the mask for edges in the training set
        Raises:
            ValueError: if the mask is None
        """
        if self._train_mask is None:
            raise ValueError("training split hasn't been loaded")
        return self._train_mask

    @property
    def val_mask(self) -> torch.Tensor:
        r"""
        Returns the validation mask of the dataset
        Returns:
            val_mask: the mask for edges in the validation set
        Raises:
            ValueError: if the mask is None
        """
        if self._val_mask is None:
            raise ValueError("validation split hasn't been loaded")
        return self._val_mask

    @property
    def test_mask(self) -> torch.Tensor:
        r"""
        Returns the test mask of the dataset:
        Returns:
            test_mask: the mask for edges in the test set
        Raises:
            ValueError: if the mask is None
        """
        if self._test_mask is None:
            raise ValueError("test split hasn't been loaded")
        return self._test_mask

    @property
    def node_feat(self) -> torch.Tensor:
        r"""
        Returns the node features of the dataset
        Returns:
            node_feat: the node features as a float tensor, None if the dataset has none
        """
        return self._node_feat

    @property
    def node_type(self) -> torch.Tensor:
        r"""
        Returns the node types of the dataset
        Returns:
            node_type: the node types [N] as a long tensor, None if the dataset has none
        """
        return self._node_type

    @property
    def src(self) -> torch.Tensor:
        r"""
        Returns the source nodes of the dataset
        Returns:
            src: the idx of the source nodes
        """
        return self._src

    @property
    def dst(self) -> torch.Tensor:
        r"""
        Returns the destination nodes of the dataset
        Returns:
            dst: the idx of the destination nodes
        """
        return self._dst

    @property
    def ts(self) -> torch.Tensor:
        r"""
        Returns the timestamps of the dataset
        Returns:
            ts: the timestamps of the edges
        """
        return self._ts

    @property
    def static_data(self) -> Optional[dict]:
        r"""
        Returns the static data of the dataset for tkgl-wikidata and tkgl-smallpedia
        The numpy arrays of the wrapped dataset are converted to long tensors on
        first access and the converted dictionary is reused afterwards.
        Returns:
            static_data: dictionary with the "head", "tail" and "edge_type" tensors,
                None when the wrapped dataset provides no static data
        """
        if self._static_data is None:
            # read the wrapped property once instead of once per key
            raw_static = self.dataset.static_data
            if raw_static is None:
                return None
            self._static_data = {
                key: torch.from_numpy(raw_static[key]).long()
                for key in ("head", "tail", "edge_type")
            }
        return self._static_data

    @property
    def edge_type(self) -> torch.Tensor:
        r"""
        Returns the edge types for each edge
        Returns:
            edge_type: edge type tensor (int), None if the dataset has no edge types
        """
        return self._edge_type

    @property
    def edge_feat(self) -> torch.Tensor:
        r"""
        Returns the edge features of the dataset
        Returns:
            edge_feat: the edge features
        """
        return self._edge_feat

    @property
    def edge_label(self) -> torch.Tensor:
        r"""
        Returns the edge labels of the dataset
        Returns:
            edge_label: the labels of the edges
        """
        return self._edge_label

    def process_data(self) -> None:
        r"""
        convert the numpy arrays from dataset to pytorch tensors
        sets `_src`, `_dst`, `_ts` (int64), `_edge_feat` (float32), `_edge_label`
        and, when the dataset has an "edge_type" entry, `_edge_type` (int64)
        """
        full_data = self.dataset.full_data
        src = torch.from_numpy(full_data["sources"])
        dst = torch.from_numpy(full_data["destinations"])
        ts = torch.from_numpy(full_data["timestamps"])
        msg = torch.from_numpy(full_data["edge_feat"])  # use edge features here if available
        # this is the label indicating if an edge is a true edge, always 1 for true edges
        edge_label = torch.from_numpy(full_data["edge_label"])

        # * first check typing for all tensors
        # source, destination and timestamp tensors must be of type int64
        if src.dtype != torch.int64:
            src = src.long()
        if dst.dtype != torch.int64:
            dst = dst.long()
        if ts.dtype != torch.int64:
            ts = ts.long()

        # message tensor must be of type float32
        if msg.dtype != torch.float32:
            msg = msg.float()

        #* for tkg
        if "edge_type" in full_data:
            edge_type = torch.from_numpy(full_data["edge_type"])
            if edge_type.dtype != torch.int64:
                edge_type = edge_type.long()
            self._edge_type = edge_type

        self._src = src
        self._dst = dst
        self._ts = ts
        self._edge_label = edge_label
        self._edge_feat = msg

    def _edge_tensors(self) -> dict:
        """
        collect the per-edge tensors under the keyword names passed to TemporalData,
        "edge_type" is only included when the dataset has edge types
        """
        tensors = {
            "src": self._src,
            "dst": self._dst,
            "t": self._ts,
            "msg": self._edge_feat,
            "y": self._edge_label,
        }
        if self._edge_type is not None:
            tensors["edge_type"] = self._edge_type
        return tensors

    def get_TemporalData(self) -> TemporalData:
        """
        return the TemporalData object for the entire dataset
        """
        return TemporalData(**self._edge_tensors())

    def len(self) -> int:
        """
        size of the dataset
        Returns:
            size: int
        """
        return self._src.shape[0]

    def get(self, idx: int) -> TemporalData:
        """
        construct temporal data object for a single edge
        Parameters:
            idx: index of the edge
        Returns:
            data: TemporalData object
        """
        return TemporalData(
            **{key: tensor[idx] for key, tensor in self._edge_tensors().items()}
        )

    def __repr__(self) -> str:
        return f"{self.name.capitalize()}()"

dst: torch.Tensor property

Returns the destination nodes of the dataset Returns: dst: the idx of the destination nodes

edge_feat: torch.Tensor property

Returns the edge features of the dataset Returns: edge_feat: the edge features

edge_label: torch.Tensor property

Returns the edge labels of the dataset Returns: edge_label: the labels of the edges

edge_type: torch.Tensor property

Returns the edge types for each edge Returns: edge_type: edge type tensor (int)

eval_metric: str property

the official evaluation metric for the dataset, loaded from info.py Returns: eval_metric: str, the evaluation metric

negative_sampler: NegativeEdgeSampler property

Returns the negative sampler of the dataset, will load negative samples from disc Returns: negative_sampler: NegativeEdgeSampler

node_feat: torch.Tensor property

Returns the node features of the dataset Returns: node_feat: the node features

node_type: torch.Tensor property

Returns the node types of the dataset Returns: node_type: the node types [N]

num_edges: int property

Returns the total number of edges in the dataset Returns: num_edges: int, the number of edges

num_nodes: int property

Returns the total number of unique nodes in the dataset Returns: num_nodes: int, the number of unique nodes

num_rels: int property

Returns the total number of unique relations in the dataset Returns: num_rels: int, the number of unique relations

src: torch.Tensor property

Returns the source nodes of the dataset Returns: src: the idx of the source nodes

static_data: torch.Tensor property

Returns the static data of the dataset for tkgl-wikidata and tkgl-smallpedia Returns: static_data: the static data of the dataset

test_mask: torch.Tensor property

Returns the test mask of the dataset: Returns: test_mask: the mask for edges in the test set

train_mask: torch.Tensor property

Returns the train mask of the dataset Returns: train_mask: the mask for edges in the training set

ts: torch.Tensor property

Returns the timestamps of the dataset Returns: ts: the timestamps of the edges

val_mask: torch.Tensor property

Returns the validation mask of the dataset Returns: val_mask: the mask for edges in the validation set

__init__(name, root, transform=None, pre_transform=None)

PyG wrapper for the LinkPropPredDataset. It can return PyTorch tensors for src, dst, t, msg and label, and it can return a TemporalData object. Parameters: name: name of the dataset, passed to LinkPropPredDataset root (string): root directory where the dataset should be saved, passed to LinkPropPredDataset transform (callable, optional): a function/transform forwarded to the base Dataset, not used in this case pre_transform (callable, optional): a function/transform forwarded to the base Dataset, not used in this case

Source code in tgb/linkproppred/dataset_pyg.py
def __init__(
    self,
    name: str,
    root: str,
    transform: Optional[Callable] = None,
    pre_transform: Optional[Callable] = None,
):
    r"""
    Wrap a `LinkPropPredDataset` so its data is exposed as pytorch tensors
    (src, dst, t, msg, label) or as a TemporalData object.
    Parameters:
        name: name of the dataset, passed to `LinkPropPredDataset`
        root (string): root directory where the dataset should be saved, passed to `LinkPropPredDataset`
        transform (callable, optional): handed to the base class constructor, not used in this case
        pre_transform (callable, optional): handed to the base class constructor, not used in this case
    """
    self.name = name
    self.root = root
    self.dataset = LinkPropPredDataset(name=name, root=root)

    # split masks as tensors, set before the base class constructor runs
    self._train_mask = torch.from_numpy(self.dataset.train_mask)
    self._val_mask = torch.from_numpy(self.dataset.val_mask)
    self._test_mask = torch.from_numpy(self.dataset.test_mask)
    super().__init__(root, transform, pre_transform)

    # node features stay None when the dataset has none
    node_feat = self.dataset.node_feat
    self._node_feat = (
        None if node_feat is None else torch.from_numpy(node_feat).float()
    )
    self._edge_type = None
    self._static_data = None

    # node types stay None when the dataset has none
    self._node_type = self.dataset.node_type
    if self.node_type is not None:
        self._node_type = torch.from_numpy(self.dataset.node_type).long()

    self.process_data()

    self._ns_sampler = self.dataset.negative_sampler

get(idx)

construct temporal data object for a single edge Parameters: idx: index of the edge Returns: data: TemporalData object

Source code in tgb/linkproppred/dataset_pyg.py
def get(self, idx: int) -> TemporalData:
    """
    Build the TemporalData object describing one edge.
    Parameters:
        idx: index of the edge
    Returns:
        data: TemporalData object, carrying `edge_type` only when the dataset has edge types
    """
    fields = {
        "src": self._src[idx],
        "dst": self._dst[idx],
        "t": self._ts[idx],
        "msg": self._edge_feat[idx],
        "y": self._edge_label[idx],
    }
    if self._edge_type is not None:
        fields["edge_type"] = self._edge_type[idx]
    return TemporalData(**fields)

get_TemporalData()

return the TemporalData object for the entire dataset

Source code in tgb/linkproppred/dataset_pyg.py
def get_TemporalData(self) -> TemporalData:
    """
    Build one TemporalData object holding every edge of the dataset;
    `edge_type` is passed along only when the dataset has edge types.
    """
    fields = {
        "src": self._src,
        "dst": self._dst,
        "t": self._ts,
        "msg": self._edge_feat,
        "y": self._edge_label,
    }
    if self._edge_type is not None:
        fields["edge_type"] = self._edge_type
    return TemporalData(**fields)

len()

size of the dataset Returns: size: int

Source code in tgb/linkproppred/dataset_pyg.py
def len(self) -> int:
    """
    number of edges, taken from the first dimension of the source tensor
    Returns:
        size: int
    """
    edge_count = self._src.shape[0]
    return edge_count

load_test_ns()

load the negative samples for the test set

Source code in tgb/linkproppred/dataset_pyg.py
def load_test_ns(self) -> None:
    r"""
    delegate to the wrapped dataset to load the test set negative samples
    """
    wrapped = self.dataset
    wrapped.load_test_ns()

load_val_ns()

load the negative samples for the validation set

Source code in tgb/linkproppred/dataset_pyg.py
def load_val_ns(self) -> None:
    r"""
    delegate to the wrapped dataset to load the validation set negative samples
    """
    wrapped = self.dataset
    wrapped.load_val_ns()

process_data()

convert the numpy arrays from dataset to pytorch tensors

Source code in tgb/linkproppred/dataset_pyg.py
def process_data(self) -> None:
    r"""
    turn the numpy arrays of the wrapped dataset into pytorch tensors and store
    them on the instance: `_src`, `_dst`, `_ts` as int64, `_edge_feat` as float32,
    `_edge_label` unchanged, and `_edge_type` as int64 when the dataset has it
    """
    arrays = self.dataset.full_data
    raw_src = torch.from_numpy(arrays["sources"])
    raw_dst = torch.from_numpy(arrays["destinations"])
    raw_ts = torch.from_numpy(arrays["timestamps"])
    raw_msg = torch.from_numpy(arrays["edge_feat"])  # use edge features here if available
    # label indicating if an edge is a true edge, always 1 for true edges
    labels = torch.from_numpy(arrays["edge_label"])

    # * enforce the expected dtypes; `.to` hands back the very same tensor
    # * when the dtype already matches, so nothing is copied needlessly
    node_sources = raw_src.to(torch.int64)
    node_destinations = raw_dst.to(torch.int64)
    edge_times = raw_ts.to(torch.int64)
    messages = raw_msg.to(torch.float32)

    #* for tkg
    if "edge_type" in self.dataset.full_data:
        relation_ids = torch.from_numpy(self.dataset.full_data["edge_type"])
        self._edge_type = relation_ids.to(torch.int64)

    self._src = node_sources
    self._dst = node_destinations
    self._ts = edge_times
    self._edge_label = labels
    self._edge_feat = messages

Evaluator Module for Dynamic Link Prediction

Evaluator

Bases: object

Evaluator for Link Property Prediction

Source code in tgb/linkproppred/evaluate.py
class Evaluator(object):
    r"""Evaluator for Link Property Prediction """

    def __init__(self, name: str, k_value: int = 10):
        r"""
        Parameters:
            name: name of the dataset
            k_value: the desired 'k' value for calculating metric@k

        Raises:
            NotImplementedError: if `name` is not in `DATA_EVAL_METRIC_DICT`
        """
        self.name = name
        self.k_value = k_value  # for computing `hits@k`
        self.valid_metric_list = ['hits@', 'mrr']
        if self.name not in DATA_EVAL_METRIC_DICT:
            raise NotImplementedError("Dataset not supported")

    def _parse_and_check_input(self, input_dict):
        r"""
        Check whether the input has the appropriate format and convert the
        predictions to numpy arrays.

        Parameters:
            input_dict: a dictionary containing "y_pred_pos", "y_pred_neg", and "eval_metric"
            note: "eval_metric" should be a list including one or more of the following metrics: ["hits@", "mrr"]
        Returns:
            y_pred_pos: positive predicted scores
            y_pred_neg: negative predicted scores
        Raises:
            RuntimeError: if a required key is missing, or if the predictions
                are neither numpy arrays nor torch tensors
            ValueError: if "eval_metric" is empty or contains an unsupported metric
        """

        if "eval_metric" not in input_dict:
            raise RuntimeError("Missing key of eval_metric!")

        requested_metrics = list(input_dict["eval_metric"])
        if not requested_metrics:
            # without this guard the predictions would never be bound below
            raise ValueError("eval_metric should contain at least one metric!")

        for eval_metric in requested_metrics:
            if eval_metric not in self.valid_metric_list:
                print(
                    "ERROR: The evaluation metric should be in:", self.valid_metric_list
                )
                raise ValueError("Unsupported eval metric %s " % (eval_metric))
            if "y_pred_pos" not in input_dict:
                raise RuntimeError("Missing key of y_pred_pos")
            if "y_pred_neg" not in input_dict:
                raise RuntimeError("Missing key of y_pred_neg")

        y_pred_pos, y_pred_neg = input_dict["y_pred_pos"], input_dict["y_pred_neg"]

        # converting to numpy on cpu
        if torch is not None and isinstance(y_pred_pos, torch.Tensor):
            y_pred_pos = y_pred_pos.detach().cpu().numpy()
        if torch is not None and isinstance(y_pred_neg, torch.Tensor):
            y_pred_neg = y_pred_neg.detach().cpu().numpy()

        # check type and shape
        if not isinstance(y_pred_pos, np.ndarray) or not isinstance(y_pred_neg, np.ndarray):
            raise RuntimeError(
                "Arguments to Evaluator need to be either numpy ndarray or torch tensor!"
            )

        self.eval_metric = input_dict["eval_metric"]

        return y_pred_pos, y_pred_neg

    def _eval_hits_and_mrr(self, y_pred_pos, y_pred_neg, type_info, k_value):
        r"""
        compute hits@k and mrr
        reference:
            - https://github.com/snap-stanford/ogb/blob/d5c11d91c9e1c22ed090a2e0bbda3fe357de66e7/ogb/linkproppred/evaluate.py#L214

        Parameters:
            y_pred_pos: positive predicted scores
            y_pred_neg: negative predicted scores
            type_info: type of the predicted scores; could be 'torch' or 'numpy'
            k_value: the desired 'k' value for calculating metric@k

        Returns:
            a dictionary containing the computed performance metrics
        """
        if type_info == 'torch':
            # calculate ranks
            y_pred_pos = y_pred_pos.view(-1, 1)
            # optimistic rank: "how many negatives have a larger score than the positive?"
            # ~> the positive is ranked first among those with equal score
            optimistic_rank = (y_pred_neg > y_pred_pos).sum(dim=1)
            # pessimistic rank: "how many negatives have at least the positive score?"
            # ~> the positive is ranked last among those with equal score
            pessimistic_rank = (y_pred_neg >= y_pred_pos).sum(dim=1)
            # ties are resolved by averaging both ranks; +1 makes the rank 1-based
            ranking_list = 0.5 * (optimistic_rank + pessimistic_rank) + 1
            hitsK_list = (ranking_list <= k_value).to(torch.float)
            mrr_list = 1./ranking_list.to(torch.float)

            return {
                    f'hits@{k_value}': hitsK_list.mean(),
                    'mrr': mrr_list.mean()
                    }

        else:
            # same computation as above, expressed with numpy operations
            y_pred_pos = y_pred_pos.reshape(-1, 1)
            optimistic_rank = (y_pred_neg > y_pred_pos).sum(axis=1)
            pessimistic_rank = (y_pred_neg >= y_pred_pos).sum(axis=1)
            ranking_list = 0.5 * (optimistic_rank + pessimistic_rank) + 1
            hitsK_list = (ranking_list <= k_value).astype(np.float32)
            mrr_list = 1./ranking_list.astype(np.float32)

            return {
                    f'hits@{k_value}': hitsK_list.mean(),
                    'mrr': mrr_list.mean()
                    }

    def eval(self, 
             input_dict: dict, 
             verbose: bool = False) -> dict:
        r"""
        evaluate the link prediction task
        this method is callable through an instance of this object to compute the metric

        Parameters:
            input_dict: a dictionary containing "y_pred_pos", "y_pred_neg", and "eval_metric"
                        the performance metric is calculated for the provided scores
            verbose: whether to print out the computed metric

        Returns:
            perf_dict: a dictionary containing the computed performance metric
        """
        y_pred_pos, y_pred_neg = self._parse_and_check_input(input_dict)  # convert the predictions to numpy
        perf_dict = self._eval_hits_and_mrr(y_pred_pos, y_pred_neg, type_info='numpy', k_value=self.k_value)

        if verbose:
            for metric_name, metric_value in perf_dict.items():
                print(f"INFO: {metric_name}: {metric_value}")

        return perf_dict

__init__(name, k_value=10)

Parameters:

Name Type Description Default
name str

name of the dataset

required
k_value int

the desired 'k' value for calculating metric@k

10
Source code in tgb/linkproppred/evaluate.py
def __init__(self, name: str, k_value: int = 10):
    r"""
    Set up an evaluator for one dataset.

    Parameters:
        name: name of the dataset
        k_value: the desired 'k' value for calculating metric@k
    """
    # `k_value` is the cut-off used when computing `hits@k`
    self.name, self.k_value = name, k_value
    self.valid_metric_list = ['hits@', 'mrr']
    dataset_is_known = self.name in DATA_EVAL_METRIC_DICT
    if not dataset_is_known:
        raise NotImplementedError("Dataset not supported")

eval(input_dict, verbose=False)

Evaluate the link prediction task. This method is callable through an instance of this object to compute the metric.

Parameters:

Name Type Description Default
input_dict dict

a dictionary containing "y_pred_pos", "y_pred_neg", and "eval_metric"; the performance metric is calculated for the provided scores

required
verbose bool

whether to print out the computed metric

False

Returns:

Name Type Description
perf_dict dict

a dictionary containing the computed performance metric

Source code in tgb/linkproppred/evaluate.py
def eval(self, 
         input_dict: dict, 
         verbose: bool = False) -> dict:
    r"""
    evaluate the link prediction task
    this method is callable through an instance of this object to compute the metric

    Parameters:
        input_dict: a dictionary containing "y_pred_pos", "y_pred_neg", and "eval_metric"
                    the performance metric is calculated for the provided scores
        verbose: whether to print out the computed metric

    Returns:
        perf_dict: a dictionary containing the computed performance metric
    """
    y_pred_pos, y_pred_neg = self._parse_and_check_input(input_dict)  # convert the predictions to numpy
    perf_dict = self._eval_hits_and_mrr(y_pred_pos, y_pred_neg, type_info='numpy', k_value=self.k_value)

    # honor the documented `verbose` flag, which was previously ignored
    if verbose:
        for metric_name, metric_value in perf_dict.items():
            print(f"INFO: {metric_name}: {metric_value}")

    return perf_dict

Sample negative edges for evaluation of dynamic link prediction. Load already generated negative edges from file, batch them based on the positive edge, and return the evaluation set.

NegativeEdgeSampler

Bases: object

Source code in tgb/linkproppred/negative_sampler.py
class NegativeEdgeSampler(object):
    def __init__(
        self,
        dataset_name: str,
        first_dst_id: int = 0,
        last_dst_id: int = 0,
        strategy: str = "hist_rnd",
    ) -> None:
        r"""
        Negative Edge Sampler
            Loads and queries the negative batches based on the positive batches provided.
        constructor for the negative edge sampler class

        Parameters:
            dataset_name: name of the dataset
            first_dst_id: identity of the first destination node (not used by this class)
            last_dst_id: identity of the last destination node (not used by this class)
            strategy: will always load the pre-generated negatives

        Returns:
            None
        """
        self.dataset_name = dataset_name
        assert strategy in [
            "rnd",
            "hist_rnd",
        ], "The supported strategies are `rnd` or `hist_rnd`!"
        self.strategy = strategy
        # maps a split mode ("val" / "test") to its loaded evaluation set
        self.eval_set = {}

    def load_eval_set(
        self,
        fname: str,
        split_mode: str = "val",
    ) -> None:
        r"""
        Load the evaluation set from disk, can be either val or test set ns samples
        Parameters:
            fname: the file name of the evaluation ns on disk
            split_mode: the split mode of the evaluation set, can be either `val` or `test`

        Returns:
            None

        Raises:
            FileNotFoundError: if `fname` does not exist
        """
        assert split_mode in [
            "val",
            "test",
        ], "Invalid split-mode! It should be `val`, `test`"
        if not os.path.exists(fname):
            raise FileNotFoundError(f"File not found at {fname}")
        # NOTE(review): `load_pkl` presumably unpickles the file; only load
        # evaluation sets that come from a trusted source.
        self.eval_set[split_mode] = load_pkl(fname)

    def reset_eval_set(self, 
                       split_mode: str = "test",
                       ) -> None:
        r"""
        Reset evaluation set

        Parameters:
            split_mode: specifies whether the 'val' or 'test' evaluation set is reset

        Returns:
            None
        """
        assert split_mode in [
            "val",
            "test",
        ], "Invalid split-mode! It should be `val`, `test`!"
        self.eval_set[split_mode] = None

    def query_batch(self, 
                    pos_src: Tensor, 
                    pos_dst: Tensor, 
                    pos_timestamp: Tensor, 
                    edge_type: Tensor = None,
                    split_mode: str = "test") -> list:
        r"""
        For each positive edge in the `pos_batch`, return a list of negative edges
        `split_mode` specifies whether the validation or test evaluation set should be retrieved.

        Parameters:
            pos_src: list of positive source nodes
            pos_dst: list of positive destination nodes
            pos_timestamp: list of timestamps of the positive edges
            edge_type: optional list of edge types of the positive edges; when given,
                       the evaluation set is looked up with (src, dst, timestamp, edge_type) keys
            split_mode: specifies whether the 'val' or 'test' evaluation set is queried

        Returns:
            neg_samples: a list of list; each internal list contains the set of negative edges that
                        should be evaluated against each positive edge.

        Raises:
            ValueError: if the evaluation set of `split_mode` is not loaded, or if
                        a positive edge is not part of it
            RuntimeError: if `pos_src`, `pos_dst` or `pos_timestamp` is neither a
                          numpy ndarray nor a torch tensor
        """
        assert split_mode in [
            "val",
            "test",
        ], "Invalid split-mode! It should be `val`, `test`!"
        # `.get` covers both a split that was never loaded and one that was reset
        eval_set = self.eval_set.get(split_mode)
        if eval_set is None:
            raise ValueError(
                f"Evaluation set is None! You should load the {split_mode} evaluation set first!"
            )

        # check the argument types...
        if torch is not None and isinstance(pos_src, torch.Tensor):
            pos_src = pos_src.detach().cpu().numpy()
        if torch is not None and isinstance(pos_dst, torch.Tensor):
            pos_dst = pos_dst.detach().cpu().numpy()
        if torch is not None and isinstance(pos_timestamp, torch.Tensor):
            pos_timestamp = pos_timestamp.detach().cpu().numpy()
        if torch is not None and isinstance(edge_type, torch.Tensor):
            edge_type = edge_type.detach().cpu().numpy()

        if (
            not isinstance(pos_src, np.ndarray)
            or not isinstance(pos_dst, np.ndarray)
            or not isinstance(pos_timestamp, np.ndarray)
        ):
            raise RuntimeError(
                "pos_src, pos_dst, and pos_timestamp need to be either numpy ndarray or torch tensor!"
                )

        # the lookup key gains a fourth component when edge types are provided
        if edge_type is None:
            edge_keys = zip(pos_src, pos_dst, pos_timestamp)
        else:
            edge_keys = zip(pos_src, pos_dst, pos_timestamp, edge_type)

        neg_samples = []
        for edge_key in edge_keys:
            if edge_key not in eval_set:
                edge_repr = ", ".join(str(part) for part in edge_key)
                raise ValueError(
                    f"The edge ({edge_repr}) is not in the '{split_mode}' evaluation set! Please check the implementation."
                )
            neg_samples.append([int(neg_dst) for neg_dst in eval_set[edge_key]])

        return neg_samples

__init__(dataset_name, first_dst_id=0, last_dst_id=0, strategy='hist_rnd')

Negative Edge Sampler: loads and queries the negative batches based on the positive batches provided. This is the constructor for the negative edge sampler class.

Parameters:

Name Type Description Default
dataset_name str

name of the dataset

required
first_dst_id int

identity of the first destination node

0
last_dst_id int

identity of the last destination node

0
strategy str

will always load the pre-generated negatives

'hist_rnd'

Returns:

Type Description
None

None

Source code in tgb/linkproppred/negative_sampler.py
def __init__(
    self,
    dataset_name: str,
    first_dst_id: int = 0,
    last_dst_id: int = 0,
    strategy: str = "hist_rnd",
) -> None:
    r"""
    Constructor of the negative edge sampler, which loads and queries the
    negative batches based on the positive batches provided.

    Parameters:
        dataset_name: name of the dataset
        first_dst_id: identity of the first destination node
        last_dst_id: identity of the last destination node
        strategy: will always load the pre-generated negatives

    Returns:
        None
    """
    self.dataset_name = dataset_name
    supported_strategies = ("rnd", "hist_rnd")
    assert (
        strategy in supported_strategies
    ), "The supported strategies are `rnd` or `hist_rnd`!"
    # evaluation sets are stored per split mode once they are loaded
    self.strategy, self.eval_set = strategy, {}

load_eval_set(fname, split_mode='val')

Load the evaluation set from disk; it can be either the val or the test set of negative samples. Parameters: fname: the file name of the evaluation negative samples on disk; split_mode: the split mode of the evaluation set, can be either val or test.

Returns:

Type Description
None

None

Source code in tgb/linkproppred/negative_sampler.py
def load_eval_set(
    self,
    fname: str,
    split_mode: str = "val",
) -> None:
    r"""
    Read the negative samples of one evaluation split from disk and keep
    them in `self.eval_set[split_mode]`.

    Parameters:
        fname: the file name of the evaluation ns on disk
        split_mode: the split mode of the evaluation set, can be either `val` or `test`

    Returns:
        None
    """
    allowed_modes = ("val", "test")
    assert (
        split_mode in allowed_modes
    ), "Invalid split-mode! It should be `val`, `test`"
    if os.path.exists(fname):
        self.eval_set[split_mode] = load_pkl(fname)
        return
    raise FileNotFoundError(f"File not found at {fname}")

query_batch(pos_src, pos_dst, pos_timestamp, edge_type=None, split_mode='test')

For each positive edge in the pos_batch, return a list of negative edges. split_mode specifies whether the validation or test evaluation set should be retrieved. The method also accepts an optional edge type argument.

Parameters:

Name Type Description Default
pos_src Tensor

list of positive source nodes

required
pos_dst Tensor

list of positive destination nodes

required
pos_timestamp Tensor

list of timestamps of the positive edges

required
split_mode str

specifies whether to generate negative edges for 'validation' or 'test' splits

'test'

Returns:

Name Type Description
neg_samples list

a list of list; each internal list contains the set of negative edges that should be evaluated against each positive edge.

Source code in tgb/linkproppred/negative_sampler.py
def query_batch(self, 
                pos_src: Tensor, 
                pos_dst: Tensor, 
                pos_timestamp: Tensor, 
                edge_type: Tensor = None,
                split_mode: str = "test") -> list:
    r"""
    For each positive edge in the `pos_batch`, return a list of negative edges
    `split_mode` specifies whether the validation or test evaluation set should be retrieved.

    Parameters:
        pos_src: list of positive source nodes
        pos_dst: list of positive destination nodes
        pos_timestamp: list of timestamps of the positive edges
        edge_type: optional list of edge types of the positive edges; when given,
                   the evaluation set is looked up with (src, dst, timestamp, edge_type) keys
        split_mode: specifies whether the 'val' or 'test' evaluation set is queried

    Returns:
        neg_samples: a list of list; each internal list contains the set of negative edges that
                    should be evaluated against each positive edge.

    Raises:
        ValueError: if the evaluation set of `split_mode` is not loaded, or if
                    a positive edge is not part of it
        RuntimeError: if `pos_src`, `pos_dst` or `pos_timestamp` is neither a
                      numpy ndarray nor a torch tensor
    """
    assert split_mode in [
        "val",
        "test",
    ], "Invalid split-mode! It should be `val`, `test`!"
    # `.get` covers both a split that was never loaded and one that was reset
    eval_set = self.eval_set.get(split_mode)
    if eval_set is None:
        raise ValueError(
            f"Evaluation set is None! You should load the {split_mode} evaluation set first!"
        )

    # check the argument types...
    if torch is not None and isinstance(pos_src, torch.Tensor):
        pos_src = pos_src.detach().cpu().numpy()
    if torch is not None and isinstance(pos_dst, torch.Tensor):
        pos_dst = pos_dst.detach().cpu().numpy()
    if torch is not None and isinstance(pos_timestamp, torch.Tensor):
        pos_timestamp = pos_timestamp.detach().cpu().numpy()
    if torch is not None and isinstance(edge_type, torch.Tensor):
        edge_type = edge_type.detach().cpu().numpy()

    if (
        not isinstance(pos_src, np.ndarray)
        or not isinstance(pos_dst, np.ndarray)
        or not isinstance(pos_timestamp, np.ndarray)
    ):
        raise RuntimeError(
            "pos_src, pos_dst, and pos_timestamp need to be either numpy ndarray or torch tensor!"
            )

    # the lookup key gains a fourth component when edge types are provided
    if edge_type is None:
        edge_keys = zip(pos_src, pos_dst, pos_timestamp)
    else:
        edge_keys = zip(pos_src, pos_dst, pos_timestamp, edge_type)

    neg_samples = []
    for edge_key in edge_keys:
        if edge_key not in eval_set:
            edge_repr = ", ".join(str(part) for part in edge_key)
            raise ValueError(
                f"The edge ({edge_repr}) is not in the '{split_mode}' evaluation set! Please check the implementation."
            )
        neg_samples.append([int(neg_dst) for neg_dst in eval_set[edge_key]])

    return neg_samples

reset_eval_set(split_mode='test')

Reset evaluation set

Parameters:

Name Type Description Default
split_mode str

specifies whether to generate negative edges for 'validation' or 'test' splits

'test'

Returns:

Type Description
None

None

Source code in tgb/linkproppred/negative_sampler.py
def reset_eval_set(self, 
                   split_mode: str = "test",
                   ) -> None:
    r"""
    Drop the evaluation set stored for one split by setting its entry in
    `self.eval_set` to None.

    Parameters:
        split_mode: the split whose evaluation set is reset, either `val` or `test`

    Returns:
        None
    """
    allowed_modes = ("val", "test")
    assert (
        split_mode in allowed_modes
    ), "Invalid split-mode! It should be `val`, `test`!"
    self.eval_set.update({split_mode: None})

Sample and Generate negative edges that are going to be used for evaluation of a dynamic graph learning model Negative samples are generated and saved to files ONLY once; other times, they should be loaded from file with instances of the negative_sampler.py.

NegativeEdgeGenerator

Bases: object

Source code in tgb/linkproppred/negative_generator.py
class NegativeEdgeGenerator(object):
    def __init__(
        self,
        dataset_name: str,
        first_dst_id: int,
        last_dst_id: int,
        num_neg_e: int = 100,  # number of negative edges sampled per positive edges --> make it constant => 1000
        strategy: str = "rnd",
        rnd_seed: int = 123,
        hist_ratio: float = 0.5,
        historical_data: TemporalData = None,
    ) -> None:
        r"""
        Negative Edge Generator class
        this is a class for generating negative samples for a specific dataset
        the set of the positive samples are provided, the negative samples are generated with specific strategies 
        and are saved for consistent evaluation across different methods
        negative edges are sampled with 'one_vs_many' strategy.
        it is assumed that the destination nodes are indexed sequentially with 'first_dst_id' 
        and 'last_dst_id' being the first and last index, respectively.

        Parameters:
            dataset_name: name of the dataset
            first_dst_id: identity of the first destination node
            last_dst_id: identity of the last destination node
            num_neg_e: number of negative edges being generated per each positive edge
            strategy: how to generate negative edges; can be 'rnd' or 'hist_rnd'
            rnd_seed: random seed for consistency
            hist_ratio: if the strategy is 'hist_rnd', how much of the negatives are historical
            historical_data: previous records of the positive edges

        Returns:
            None
        """
        self.rnd_seed = rnd_seed
        # seeds numpy's global random state, which the samplers below draw from
        np.random.seed(self.rnd_seed)
        self.dataset_name = dataset_name

        self.first_dst_id = first_dst_id
        self.last_dst_id = last_dst_id
        self.num_neg_e = num_neg_e
        assert strategy in [
            "rnd",
            "hist_rnd",
        ], "The supported strategies are `rnd` or `hist_rnd`!"
        self.strategy = strategy
        if self.strategy == "hist_rnd":
            assert (
                historical_data is not None
            ), "Train data should be passed when `hist_rnd` strategy is selected."
            self.hist_ratio = hist_ratio
            self.historical_data = historical_data

    def generate_negative_samples(self, 
                                  data: TemporalData, 
                                  split_mode: str, 
                                  partial_path: str,
                                  ) -> None:
        r"""
        Generate negative samples

        Parameters:
            data: an object containing positive edges information
            split_mode: specifies whether to generate negative edges for 'validation' or 'test' splits
            partial_path: in which directory save the generated negatives

        Raises:
            ValueError: if `self.strategy` is not a supported strategy
        """
        # file name for saving or loading...
        filename = f"{partial_path}/{self.dataset_name}_{split_mode}_ns.pkl"

        if self.strategy == "rnd":
            self.generate_negative_samples_rnd(data, split_mode, filename)
        elif self.strategy == "hist_rnd":
            self.generate_negative_samples_hist_rnd(
                self.historical_data, data, split_mode, filename
            )
        else:
            raise ValueError("Unsupported negative sample generation strategy!")

    def _group_pos_dst_by_time_and_src(self, pos_src, pos_dst, pos_timestamp) -> dict:
        r"""
        Group the positive destinations by timestamp and source node.

        Returns:
            a nested dictionary of the form {ts: {src: [dsts]}}
        """
        pos_ts_edge_dict = {}  # {ts: {src: [dsts]}}
        pos_edge_tqdm = tqdm(
            zip(pos_src, pos_dst, pos_timestamp), total=len(pos_src)
        )
        for pos_s, pos_d, pos_t in pos_edge_tqdm:
            pos_ts_edge_dict.setdefault(pos_t, {}).setdefault(pos_s, []).append(pos_d)
        return pos_ts_edge_dict

    def generate_negative_samples_rnd(self, 
                                      data: TemporalData, 
                                      split_mode: str, 
                                      filename: str,
                                      ) -> None:
        r"""
        Generate negative samples based on the `RND` strategy:
            - for each positive edge, sample a batch of negative edges from all possible edges with the same source node
            - filter actual positive edges, i.e. destinations that share the
              source node and the timestamp of the positive edge

        Nothing is generated when `filename` already exists.

        Parameters:
            data: an object containing positive edges information
            split_mode: specifies whether to generate negative edges for 'validation' or 'test' splits
            filename: name of the file containing the generated negative edges
        """
        print(
            f"INFO: Negative Sampling Strategy: {self.strategy}, Data Split: {split_mode}"
        )
        assert split_mode in [
            "val",
            "test",
        ], "Invalid split-mode! It should be `val` or `test`!"

        if os.path.exists(filename):
            print(
                f"INFO: negative samples for '{split_mode}' evaluation are already generated!"
            )
            return

        print(f"INFO: Generating negative samples for '{split_mode}' evaluation!")
        # retrieve the information from the batch
        pos_src, pos_dst, pos_timestamp = (
            data.src.cpu().numpy(),
            data.dst.cpu().numpy(),
            data.t.cpu().numpy(),
        )

        # one grouping pass replaces re-scanning every edge for each positive edge
        pos_ts_edge_dict = self._group_pos_dst_by_time_and_src(
            pos_src, pos_dst, pos_timestamp
        )

        # all possible destinations
        all_dst = np.arange(self.first_dst_id, self.last_dst_id + 1)

        evaluation_set = {}
        # generate a list of negative destinations for each positive edge
        pos_edge_tqdm = tqdm(
            zip(pos_src, pos_dst, pos_timestamp), total=len(pos_src)
        )
        for pos_s, pos_d, pos_t in pos_edge_tqdm:
            pos_e_dst_same_src = np.array(pos_ts_edge_dict[pos_t][pos_s])
            filtered_all_dst = np.setdiff1d(all_dst, pos_e_dst_same_src)

            # when num_neg_e is larger than all possible destinations,
            # simply return all possible destinations
            if self.num_neg_e > len(filtered_all_dst):
                neg_d_arr = filtered_all_dst
            else:
                neg_d_arr = np.random.choice(
                    filtered_all_dst, self.num_neg_e, replace=False
                )  # never replace negatives

            evaluation_set[(pos_s, pos_d, pos_t)] = neg_d_arr

        # save the generated evaluation set to disk
        save_pkl(evaluation_set, filename)

    def generate_historical_edge_set(self, 
                                     historical_data: TemporalData,
                                     ) -> tuple:
        r"""
        Generate the set of edges seen during training or validation

        ONLY `train_data` should be passed as historical data; i.e., the HISTORICAL negative edges should be selected from training data only.

        Parameters:
            historical_data: contains the positive edges observed previously

        Returns:
            historical_edges: distinct historical positive edges
            hist_edge_set_per_node: historical edges observed for each node
        """
        sources = historical_data.src.cpu().numpy()
        destinations = historical_data.dst.cpu().numpy()
        historical_edges = {}
        hist_e_per_node = {}
        for src, dst in zip(sources, destinations):
            # edge-centric
            if (src, dst) not in historical_edges:
                historical_edges[(src, dst)] = 1

            # node-centric
            hist_e_per_node.setdefault(src, []).append(dst)

        # keep only the distinct destinations of every source node
        hist_edge_set_per_node = {
            src: np.array(list(set(dst_list)))
            for src, dst_list in hist_e_per_node.items()
        }

        return historical_edges, hist_edge_set_per_node

    def generate_negative_samples_hist_rnd(
        self, 
        historical_data : TemporalData, 
        data: TemporalData, 
        split_mode: str, 
        filename: str,
    ) -> None:
        r"""
        Generate negative samples based on the `HIST-RND` strategy:
            - up to `hist_ratio` of the negative samples are selected from the set of edges seen during the training with the same source node.
            - the rest of the negative edges are randomly sampled with the fixed source node.

        Nothing is generated when `filename` already exists.

        Parameters:
            historical_data: contains the history of the observed positive edges including 
                            distinct positive edges and edges observed for each positive node
            data: an object containing positive edges information
            split_mode: specifies whether to generate negative edges for 'validation' or 'test' splits
            filename: name of the file to save generated negative edges

        Returns:
            None
        """
        print(
            f"INFO: Negative Sampling Strategy: {self.strategy}, Data Split: {split_mode}"
        )
        assert split_mode in [
            "val",
            "test",
        ], "Invalid split-mode! It should be `val` or `test`!"

        if os.path.exists(filename):
            print(
                f"INFO: negative samples for '{split_mode}' evaluation are already generated!"
            )
            return

        print(f"INFO: Generating negative samples for '{split_mode}' evaluation!")
        # retrieve the information from the batch
        pos_src, pos_dst, pos_timestamp = (
            data.src.cpu().numpy(),
            data.dst.cpu().numpy(),
            data.t.cpu().numpy(),
        )

        pos_ts_edge_dict = self._group_pos_dst_by_time_and_src(
            pos_src, pos_dst, pos_timestamp
        )

        # all possible destinations
        all_dst = np.arange(self.first_dst_id, self.last_dst_id + 1)

        # get seen edge history
        (
            historical_edges,
            hist_edge_set_per_node,
        ) = self.generate_historical_edge_set(historical_data)

        # upper bound on the number of historical negatives per positive edge
        max_num_hist_neg_e = int(self.num_neg_e * self.hist_ratio)

        evaluation_set = {}
        # generate a list of negative destinations for each positive edge
        pos_edge_tqdm = tqdm(
            zip(pos_src, pos_dst, pos_timestamp), total=len(pos_src)
        )
        for pos_s, pos_d, pos_t in pos_edge_tqdm:
            pos_e_dst_same_src = np.array(pos_ts_edge_dict[pos_t][pos_s])

            # sample historical edges
            num_hist_neg_e = 0
            # typed like `all_dst` so that the stored negatives stay integers
            # even for a source node without any history
            neg_hist_dsts = np.array([], dtype=all_dst.dtype)
            seen_dst = []
            if pos_s in hist_edge_set_per_node:
                seen_dst = hist_edge_set_per_node[pos_s]
                if len(seen_dst) >= 1:
                    filtered_all_seen_dst = np.setdiff1d(seen_dst, pos_e_dst_same_src)
                    num_hist_neg_e = min(max_num_hist_neg_e, len(filtered_all_seen_dst))
                    neg_hist_dsts = np.random.choice(
                        filtered_all_seen_dst, num_hist_neg_e, replace=False
                    )

            # sample random edges; current positives and every destination
            # seen in the history are excluded from the random candidates
            if len(seen_dst) >= 1:
                invalid_dst = np.concatenate((np.array(pos_e_dst_same_src), seen_dst))
            else:
                invalid_dst = np.array(pos_e_dst_same_src)
            filtered_all_rnd_dst = np.setdiff1d(all_dst, invalid_dst)

            num_rnd_neg_e = self.num_neg_e - num_hist_neg_e
            # when num_rnd_neg_e is larger than all possible destinations,
            # simply return all possible destinations
            if num_rnd_neg_e > len(filtered_all_rnd_dst):
                neg_rnd_dsts = filtered_all_rnd_dst
            else:
                neg_rnd_dsts = np.random.choice(
                    filtered_all_rnd_dst, num_rnd_neg_e, replace=False
                )
            # concatenate the two sets: historical and random
            neg_dst_arr = np.concatenate((neg_hist_dsts, neg_rnd_dsts))
            evaluation_set[(pos_s, pos_d, pos_t)] = neg_dst_arr

        # save the generated evaluation set to disk
        save_pkl(evaluation_set, filename)

__init__(dataset_name, first_dst_id, last_dst_id, num_neg_e=100, strategy='rnd', rnd_seed=123, hist_ratio=0.5, historical_data=None)

Negative Edge Generator class. This is a class for generating negative samples for a specific dataset. The set of positive samples is provided; the negative samples are generated with specific strategies and are saved for consistent evaluation across different methods. Negative edges are sampled with the 'one_vs_many' strategy. It is assumed that the destination nodes are indexed sequentially, with 'first_dst_id' and 'last_dst_id' being the first and last index, respectively.

Parameters:

Name Type Description Default
dataset_name str

name of the dataset

required
first_dst_id int

identity of the first destination node

required
last_dst_id int

identity of the last destination node

required
num_neg_e int

number of negative edges being generated per each positive edge

100
strategy str

how to generate negative edges; can be 'rnd' or 'hist_rnd'

'rnd'
rnd_seed int

random seed for consistency

123
hist_ratio float

if the strategy is 'hist_rnd', how much of the negatives are historical

0.5
historical_data TemporalData

previous records of the positive edges

None

Returns:

Type Description
None

None

Source code in tgb/linkproppred/negative_generator.py
def __init__(
    self,
    dataset_name: str,
    first_dst_id: int,
    last_dst_id: int,
    num_neg_e: int = 100,  # number of negative edges sampled per positive edges --> make it constant => 1000
    strategy: str = "rnd",
    rnd_seed: int = 123,
    hist_ratio: float = 0.5,
    historical_data: TemporalData = None,
) -> None:
    r"""
    Negative Edge Sampler class
    This class generates negative samples for a specific dataset.
    The set of positive samples is provided; the negative samples are generated with specific strategies
    and are saved for consistent evaluation across different methods.
    Negative edges are sampled with the 'one_vs_many' strategy.
    It is assumed that the destination nodes are indexed sequentially with 'first_dst_id'
    and 'last_dst_id' being the first and last index, respectively.

    Note: constructing an instance seeds NumPy's *global* random state with `rnd_seed`.

    Parameters:
        dataset_name: name of the dataset
        first_dst_id: identity of the first destination node
        last_dst_id: identity of the last destination node
        num_neg_e: number of negative edges being generated per each positive edge
        strategy: how to generate negative edges; can be 'rnd' or 'hist_rnd'
        rnd_seed: random seed for consistency
        hist_ratio: if the strategy is 'hist_rnd', the fraction of the negatives that are historical
        historical_data: previous records of the positive edges (required for 'hist_rnd')

    Returns:
        None

    Raises:
        AssertionError: if `strategy` is not supported, or if `hist_rnd` is selected
            without `historical_data`
    """
    self.rnd_seed = rnd_seed
    np.random.seed(self.rnd_seed)
    self.dataset_name = dataset_name

    self.first_dst_id = first_dst_id
    self.last_dst_id = last_dst_id
    self.num_neg_e = num_neg_e
    assert strategy in [
        "rnd",
        "hist_rnd",
    ], "The supported strategies are `rnd` or `hist_rnd`!"
    self.strategy = strategy
    if self.strategy == "hist_rnd":
        # identity check: `!= None` would dispatch to an overloaded `__ne__`
        # (e.g. element-wise comparison on array-like data) instead of testing for absence
        assert (
            historical_data is not None
        ), "Train data should be passed when `hist_rnd` strategy is selected."
    # always define these attributes so they exist regardless of the selected strategy
    self.hist_ratio = hist_ratio
    self.historical_data = historical_data

generate_historical_edge_set(historical_data)

Generate the set of edges seen during training or validation

ONLY train_data should be passed as historical data; i.e., the HISTORICAL negative edges should be selected from training data only.

Parameters:

Name Type Description Default
historical_data TemporalData

contains the positive edges observed previously

required

Returns:

Name Type Description
historical_edges tuple

distinct historical positive edges

hist_edge_set_per_node tuple

historical edges observed for each node

Source code in tgb/linkproppred/negative_generator.py
def generate_historical_edge_set(self, 
                                 historical_data: TemporalData,
                                 ) -> tuple:
    r"""
    Collect the edges observed in the historical data, both as distinct
    (source, destination) pairs and grouped by source node.

    ONLY `train_data` should be passed as historical data; i.e., the HISTORICAL negative edges should be selected from training data only.

    Parameters:
        historical_data: contains the positive edges observed previously

    Returns:
        historical_edges: dict keyed by each distinct (src, dst) pair, every value being 1
        hist_edge_set_per_node: dict mapping each source node to an array of its distinct destinations
    """
    src_arr = historical_data.src.cpu().numpy()
    dst_arr = historical_data.dst.cpu().numpy()

    historical_edges = {}
    dsts_by_src = {}
    for u, v in zip(src_arr, dst_arr):
        # edge-centric view: remember each distinct pair once
        historical_edges.setdefault((u, v), 1)
        # node-centric view: accumulate every destination seen for this source
        dsts_by_src.setdefault(u, []).append(v)

    # de-duplicate the destinations of each source
    hist_edge_set_per_node = {
        u: np.array(list(set(v_list))) for u, v_list in dsts_by_src.items()
    }

    return historical_edges, hist_edge_set_per_node

generate_negative_samples(data, split_mode, partial_path)

Generate negative samples

Parameters:

Name Type Description Default
data TemporalData

an object containing positive edges information

required
split_mode str

specifies whether to generate negative edges for 'validation' or 'test' splits

required
partial_path str

in which directory save the generated negatives

required
Source code in tgb/linkproppred/negative_generator.py
def generate_negative_samples(self, 
                              data: TemporalData, 
                              split_mode: str, 
                              partial_path: str,
                              ) -> None:
    r"""
    Build the output file name and dispatch to the generator matching `self.strategy`.

    Parameters:
        data: an object containing positive edges information
        split_mode: specifies whether to generate negative edges for 'validation' or 'test' splits
        partial_path: in which directory save the generated negatives

    Raises:
        ValueError: if `self.strategy` is neither 'rnd' nor 'hist_rnd'
    """
    # target file: <partial_path>/<dataset_name>_<split_mode>_ns.pkl
    out_file = partial_path + "/" + self.dataset_name + "_" + split_mode + "_ns.pkl"

    if self.strategy == "rnd":
        self.generate_negative_samples_rnd(data, split_mode, out_file)
        return

    if self.strategy == "hist_rnd":
        self.generate_negative_samples_hist_rnd(
            self.historical_data, data, split_mode, out_file
        )
        return

    raise ValueError("Unsupported negative sample generation strategy!")

generate_negative_samples_hist_rnd(historical_data, data, split_mode, filename)

Generate negative samples based on the HIST-RND strategy: - up to 50% of the negative samples are selected from the set of edges seen during the training with the same source node. - the rest of the negative edges are randomly sampled with the fixed source node.

Parameters:

Name Type Description Default
historical_data TemporalData

contains the history of the observed positive edges including distinct positive edges and edges observed for each positive node

required
data TemporalData

an object containing positive edges information

required
split_mode str

specifies whether to generate negative edges for 'validation' or 'test' splits

required
filename str

name of the file to save generated negative edges

required

Returns:

Type Description
None

None

Source code in tgb/linkproppred/negative_generator.py
def generate_negative_samples_hist_rnd(
    self, 
    historical_data : TemporalData, 
    data: TemporalData, 
    split_mode: str, 
    filename: str,
) -> None:
    r"""
    Generate negative samples based on the `HIST-RND` strategy:
        - up to `hist_ratio * num_neg_e` of the negative samples are selected from the set of
          edges seen in `historical_data` with the same source node.
        - the rest of the negative edges are randomly sampled with the fixed source node.
    Destinations that are positive for the same source at the same timestamp are never used
    as negatives. If fewer candidates than requested exist, all candidates are returned.
    Nothing is generated when `filename` already exists.

    Parameters:
        historical_data: contains the history of the observed positive edges including 
                        distinct positive edges and edges observed for each positive node
        data: an object containing positive edges information
        split_mode: specifies whether to generate negative edges for 'validation' or 'test' splits
        filename: name of the file to save generated negative edges

    Returns:
        None

    Raises:
        AssertionError: if `split_mode` is neither `val` nor `test`
    """
    print(
        f"INFO: Negative Sampling Strategy: {self.strategy}, Data Split: {split_mode}"
    )
    assert split_mode in [
        "val",
        "test",
    ], "Invalid split-mode! It should be `val` or `test`!"

    if os.path.exists(filename):
        print(
            f"INFO: negative samples for '{split_mode}' evaluation are already generated!"
        )
        return

    print(f"INFO: Generating negative samples for '{split_mode}' evaluation!")
    # retrieve the information from the batch
    pos_src, pos_dst, pos_timestamp = (
        data.src.cpu().numpy(),
        data.dst.cpu().numpy(),
        data.t.cpu().numpy(),
    )

    # first pass: index the positive destinations as {ts: {src: [dsts]}}
    pos_ts_edge_dict = {}
    pos_edge_tqdm = tqdm(
        zip(pos_src, pos_dst, pos_timestamp), total=len(pos_src)
    )
    for pos_s, pos_d, pos_t in pos_edge_tqdm:
        pos_ts_edge_dict.setdefault(pos_t, {}).setdefault(pos_s, []).append(pos_d)

    # all possible destinations
    all_dst = np.arange(self.first_dst_id, self.last_dst_id + 1)

    # destinations seen in the history, grouped by source node
    _, hist_edge_set_per_node = self.generate_historical_edge_set(historical_data)

    # upper bound on the number of historical negatives per positive edge
    max_num_hist_neg_e = int(self.num_neg_e * self.hist_ratio)

    evaluation_set = {}
    # second pass: generate a list of negative destinations for each positive edge
    pos_edge_tqdm = tqdm(
        zip(pos_src, pos_dst, pos_timestamp), total=len(pos_src)
    )
    for pos_s, pos_d, pos_t in pos_edge_tqdm:
        pos_e_dst_same_src = np.array(pos_ts_edge_dict[pos_t][pos_s])

        # sample historical edges
        num_hist_neg_e = 0
        # integer-typed empty default: a plain `np.array([])` is float64 and would turn
        # the concatenated negatives into floats for sources without any history
        neg_hist_dsts = np.array([], dtype=all_dst.dtype)
        seen_dst = []
        if pos_s in hist_edge_set_per_node:
            seen_dst = hist_edge_set_per_node[pos_s]
            if len(seen_dst) >= 1:
                # collision check: drop destinations that are positive right now
                filtered_all_seen_dst = np.setdiff1d(seen_dst, pos_e_dst_same_src)
                num_hist_neg_e = min(max_num_hist_neg_e, len(filtered_all_seen_dst))
                neg_hist_dsts = np.random.choice(
                    filtered_all_seen_dst, num_hist_neg_e, replace=False
                )

        # sample random edges: exclude current positives and everything seen historically
        if len(seen_dst) >= 1:
            invalid_dst = np.concatenate((np.array(pos_e_dst_same_src), seen_dst))
        else:
            invalid_dst = np.array(pos_e_dst_same_src)
        filtered_all_rnd_dst = np.setdiff1d(all_dst, invalid_dst)

        num_rnd_neg_e = self.num_neg_e - num_hist_neg_e
        # when more negatives are requested than there are candidates,
        # simply return all possible destinations
        if num_rnd_neg_e > len(filtered_all_rnd_dst):
            neg_rnd_dsts = filtered_all_rnd_dst
        else:
            neg_rnd_dsts = np.random.choice(
                filtered_all_rnd_dst, num_rnd_neg_e, replace=False
            )
        # concatenate the two sets: historical and random
        neg_dst_arr = np.concatenate((neg_hist_dsts, neg_rnd_dsts))
        evaluation_set[(pos_s, pos_d, pos_t)] = neg_dst_arr

    # save the generated evaluation set to disk
    save_pkl(evaluation_set, filename)

generate_negative_samples_rnd(data, split_mode, filename)

Generate negative samples based on the RND strategy: - for each positive edge, sample a batch of negative edges from all possible edges with the same source node - filter actual positive edges

Parameters:

Name Type Description Default
data TemporalData

an object containing positive edges information

required
split_mode str

specifies whether to generate negative edges for 'validation' or 'test' splits

required
filename str

name of the file containing the generated negative edges

required
Source code in tgb/linkproppred/negative_generator.py
def generate_negative_samples_rnd(self, 
                                  data: TemporalData, 
                                  split_mode: str, 
                                  filename: str,
                                  ) -> None:
    r"""
    Generate negative samples based on the `RND` strategy:
        - for each positive edge, sample a batch of negative edges from all possible edges with the same source node
        - filter actual positive edges (same source node at the same timestamp)
    If fewer candidates than `num_neg_e` exist, all candidates are returned.
    Nothing is generated when `filename` already exists.

    Parameters:
        data: an object containing positive edges information
        split_mode: specifies whether to generate negative edges for 'validation' or 'test' splits
        filename: name of the file containing the generated negative edges

    Raises:
        AssertionError: if `split_mode` is neither `val` nor `test`
    """
    print(
        f"INFO: Negative Sampling Strategy: {self.strategy}, Data Split: {split_mode}"
    )
    assert split_mode in [
        "val",
        "test",
    ], "Invalid split-mode! It should be `val` or `test`!"

    if os.path.exists(filename):
        print(
            f"INFO: negative samples for '{split_mode}' evaluation are already generated!"
        )
        return

    print(f"INFO: Generating negative samples for '{split_mode}' evaluation!")
    # retrieve the information from the batch
    pos_src, pos_dst, pos_timestamp = (
        data.src.cpu().numpy(),
        data.dst.cpu().numpy(),
        data.t.cpu().numpy(),
    )

    # all possible destinations
    all_dst = np.arange(self.first_dst_id, self.last_dst_id + 1)

    # Group the positive destinations by (timestamp, source) in a single pass.
    # This replaces two full-array boolean masks per positive edge (quadratic in the
    # number of edges) and yields the same set of destinations to filter out.
    pos_dst_by_ts_src = {}
    for pos_s, pos_d, pos_t in zip(pos_src, pos_dst, pos_timestamp):
        pos_dst_by_ts_src.setdefault((pos_t, pos_s), []).append(pos_d)

    evaluation_set = {}
    # generate a list of negative destinations for each positive edge
    pos_edge_tqdm = tqdm(
        zip(pos_src, pos_dst, pos_timestamp), total=len(pos_src)
    )
    for pos_s, pos_d, pos_t in pos_edge_tqdm:
        pos_e_dst_same_src = np.array(pos_dst_by_ts_src[(pos_t, pos_s)])
        filtered_all_dst = np.setdiff1d(all_dst, pos_e_dst_same_src)

        # when num_neg_e is larger than the number of candidates,
        # simply return all possible destinations
        if self.num_neg_e > len(filtered_all_dst):
            neg_d_arr = filtered_all_dst
        else:
            # never sample the same negative twice
            neg_d_arr = np.random.choice(
                filtered_all_dst, self.num_neg_e, replace=False
            )

        evaluation_set[(pos_s, pos_d, pos_t)] = neg_d_arr

    # save the generated evaluation set to disk
    save_pkl(evaluation_set, filename)

Sample and generate negative edges that are going to be used for the evaluation of a dynamic graph learning model. Negative samples are generated and saved to files ONLY once; other times, they should be loaded from file with instances of the negative_sampler.py.

TKGNegativeEdgeGenerator

Bases: object

Source code in tgb/linkproppred/tkg_negative_generator.py
class TKGNegativeEdgeGenerator(object):
    r"""
    Negative edge generator for Temporal Knowledge Graphs.

    Depending on `strategy`, it writes to disk (once per split) either the conflict sets
    (`time-filtered`), negatives sampled among the destinations observed for the edge type
    (`dst-time-filtered`), or uniformly sampled negatives (`random`).
    """

    def __init__(
        self,
        dataset_name: str,
        first_dst_id: int,
        last_dst_id: int,
        strategy: str = "time-filtered",
        num_neg_e: int = -1,  # -1 means generate all possible negatives
        rnd_seed: int = 1,
        partial_path: str = None,
        edge_data: TemporalData = None,
    ) -> None:
        r"""
        Negative Edge Generator class for Temporal Knowledge Graphs
        constructor for the negative edge generator class

        Note: constructing an instance seeds NumPy's *global* random state with `rnd_seed`.

        Parameters:
            dataset_name: name of the dataset
            first_dst_id: identity of the first destination node
            last_dst_id: identity of the last destination node
            strategy: specifies which strategy should be used for generating the negatives
            num_neg_e: number of negative edges being generated per each positive edge
                (a negative value means all candidates)
            rnd_seed: random seed for reproducibility
            partial_path: directory used to build the name of the dst_dict file;
                required for the `dst-time-filtered` strategy
            edge_data: the positive edges to generate the negatives for, assuming sorted temporally

        Returns:
            None

        Raises:
            AssertionError: if `strategy` is not supported
            ValueError: if `dst-time-filtered` is selected without `partial_path`
        """
        self.rnd_seed = rnd_seed
        np.random.seed(self.rnd_seed)
        self.dataset_name = dataset_name
        self.first_dst_id = first_dst_id
        self.last_dst_id = last_dst_id
        self.num_neg_e = num_neg_e  # -1 means generate all
        assert strategy in [
            "time-filtered",
            "dst-time-filtered",
            "random"
        ], "The supported strategies are `time-filtered`, dst-time-filtered, random"
        self.strategy = strategy
        self.dst_dict = None
        if self.strategy == "dst-time-filtered":
            if partial_path is None:
                raise ValueError(
                    "The partial path to the directory where the dst_dict is stored is required")
            self.dst_dict_name = (
                partial_path
                + "/"
                + self.dataset_name
                + "_"
                + "dst_dict"
                + ".pkl"
            )
            self.dst_dict = self.generate_dst_dict(edge_data=edge_data, dst_name=self.dst_dict_name)
        self.edge_data = edge_data

    def generate_dst_dict(self, edge_data: TemporalData, dst_name: str) -> dict:
        r"""
        Generate a dictionary of destination nodes for each type of edge

        Parameters:
            edge_data: an object containing positive edges information
            dst_name: name of the file associated with the dictionary of destination nodes
                (currently unused: the dictionary is only returned, not written to disk)

        Returns:
            dst_dict: maps each edge type to an array of the distinct destination nodes
                observed with that type, in order of first appearance
        """
        pos_dst = edge_data.dst.cpu().numpy()
        edge_types = edge_data.edge_type.cpu().numpy()

        # {edge_type: {dst_1: 1, dst_2: 1, ..}}; inner dicts act as insertion-ordered sets
        dst_track_dict = {}
        for pos_d, e_type in tqdm(zip(pos_dst, edge_types), total=len(pos_dst)):
            dst_track_dict.setdefault(e_type, {})[pos_d] = 1

        dst_dict = {
            e_type: np.array(list(dsts.keys()))
            for e_type, dsts in dst_track_dict.items()
        }
        print ('destination candidates generated for all edge types ', len(dst_dict))
        return dst_dict

    def generate_negative_samples(self, 
                                  pos_edges: TemporalData,
                                  split_mode: str, 
                                  partial_path: str,
                                  ) -> None:
        r"""
        Generate negative samples with the strategy selected at construction time

        Parameters:
            pos_edges: positive edges to generate the negatives for
            split_mode: specifies whether to generate negative edges for 'validation' or 'test' splits
            partial_path: in which directory save the generated negatives

        Raises:
            ValueError: if `self.strategy` is not a supported strategy
        """
        # file name for saving or loading...
        filename = (
            partial_path
            + "/"
            + self.dataset_name
            + "_"
            + split_mode
            + "_"
            + "ns"
            + ".pkl"
        )

        if self.strategy == "time-filtered":
            self.generate_negative_samples_ftr(pos_edges, split_mode, filename)
        elif self.strategy == "dst-time-filtered":
            self.generate_negative_samples_dst(pos_edges, split_mode, filename)
        elif self.strategy == "random":
            self.generate_negative_samples_random(pos_edges, split_mode, filename)
        else:
            raise ValueError("Unsupported negative sample generation strategy!")

    def _already_generated(self, split_mode: str, filename: str) -> bool:
        r"""Log the run, validate `split_mode`, and report whether `filename` already exists."""
        print(
            f"INFO: Negative Sampling Strategy: {self.strategy}, Data Split: {split_mode}"
        )
        assert split_mode in [
            "val",
            "test",
        ], "Invalid split-mode! It should be `val` or `test`!"

        if os.path.exists(filename):
            print(
                f"INFO: negative samples for '{split_mode}' evaluation are already generated!"
            )
            return True
        return False

    @staticmethod
    def _edge_arrays(data: TemporalData) -> tuple:
        r"""Return the (src, dst, t, edge_type) fields of `data` as NumPy arrays."""
        return (
            data.src.cpu().numpy(),
            data.dst.cpu().numpy(),
            data.t.cpu().numpy(),
            data.edge_type.cpu().numpy(),
        )

    @staticmethod
    def _collect_conflicts(pos_src, pos_dst, pos_timestamp, edge_types) -> dict:
        r"""Map every (t, src, edge_type) to an insertion-ordered dict of its positive destinations."""
        edge_t_dict = {}  # {(t, u, edge_type): {v_1: 1, v_2: 1, ..}}
        pos_edge_tqdm = tqdm(
            zip(pos_src, pos_dst, pos_timestamp, edge_types), total=len(pos_src)
        )
        for pos_s, pos_d, pos_t, e_type in pos_edge_tqdm:
            edge_t_dict.setdefault((pos_t, pos_s, e_type), {})[pos_d] = 1
        return edge_t_dict

    @staticmethod
    def _sample_typed_negatives(dst_set, conflict_set, all_dst, sample_num):
        r"""
        Sample negatives for one positive edge under the `dst-time-filtered` strategy.

        Candidates are the destinations seen with the edge type (`dst_set`) minus the
        positives (`conflict_set`). If there are fewer candidates than `sample_num`, all of
        them are kept and the remainder is drawn from `all_dst`, excluding both the
        candidates already taken and the positives. A negative `sample_num` returns all
        candidates.
        """
        candidates = np.setdiff1d(dst_set, conflict_set)  # sorted, unique
        if sample_num < 0:
            return candidates
        if len(candidates) >= sample_num:
            return np.random.choice(candidates, sample_num, replace=False)

        # Not enough type-compatible candidates: top up with other destinations.
        # The positives must be excluded here as well, otherwise a true edge could be
        # emitted as a negative sample.
        pool = np.setdiff1d(all_dst, np.union1d(dst_set, conflict_set))
        num_fill = min(sample_num - len(candidates), len(pool))
        if num_fill <= 0:
            return candidates
        fill = np.random.choice(pool, num_fill, replace=False)
        return np.concatenate((candidates, fill))

    def generate_negative_samples_ftr(self, 
                                      data: TemporalData, 
                                      split_mode: str, 
                                      filename: str,
                                      ) -> None:
        r"""
        now we consider (s, d, t, edge_type) as a unique edge
        Generate the conflict sets for the `time-filtered` strategy:
            - for each (timestamp, source, edge type), store the destinations of the actual
              positive edges, i.e. the destinations that must be filtered out of the negatives
        Nothing is generated when `filename` already exists.

        Parameters:
            data: an object containing positive edges information
            split_mode: specifies whether to generate negative edges for 'validation' or 'test' splits
            filename: name of the file containing the generated conflict sets
        """
        if self._already_generated(split_mode, filename):
            return

        print(f"INFO: Generating negative samples for '{split_mode}' evaluation!")
        pos_src, pos_dst, pos_timestamp, edge_types = self._edge_arrays(data)

        #! iterate once to put all edges into a dictionary for reference
        edge_t_dict = self._collect_conflicts(pos_src, pos_dst, pos_timestamp, edge_types)

        conflict_dict = {
            key: np.array(list(dsts.keys())) for key, dsts in edge_t_dict.items()
        }

        print ("conflict sets for ns samples for ", len(conflict_dict), " positive edges are generated")
        # save the generated evaluation set to disk
        save_pkl(conflict_dict, filename)

    def generate_negative_samples_dst(self, 
                                      data: TemporalData, 
                                      split_mode: str, 
                                      filename: str,
                                      ) -> None:
        r"""
        now we consider (s, d, t, edge_type) as a unique edge
        Generate negative samples based on the `dst-time-filtered` strategy:
            - for each positive edge, sample negatives among the destinations observed for its edge type
            - filter actual positive edges at the same timestamp with the same source and edge type
            - when too few such destinations exist, top up with other destinations
              (never with actual positives)
        Nothing is generated when `filename` already exists.

        Parameters:
            data: an object containing positive edges information
            split_mode: specifies whether to generate negative edges for 'validation' or 'test' splits
            filename: name of the file containing the generated negative edges

        Raises:
            ValueError: if the destination dictionary was not generated at construction time
        """
        if self._already_generated(split_mode, filename):
            return

        if self.dst_dict is None:
            raise ValueError("The dst_dict is not generated!")

        print(f"INFO: Generating negative samples for '{split_mode}' evaluation!")
        pos_src, pos_dst, pos_timestamp, edge_types = self._edge_arrays(data)

        #! iterate once to put all edges into a dictionary for reference
        edge_t_dict = self._collect_conflicts(pos_src, pos_dst, pos_timestamp, edge_types)

        # loop-invariant: the full destination range spanned by the reference edge data
        min_dst_idx, max_dst_idx = int(self.edge_data.dst.min()), int(self.edge_data.dst.max())
        all_dst = np.arange(min_dst_idx, max_dst_idx + 1)

        out_dict = {}
        new_pos_edge_tqdm = tqdm(
            zip(pos_src, pos_dst, pos_timestamp, edge_types), total=len(pos_src)
        )
        # one draw per positive edge; edges sharing (t, src, edge_type) share one entry
        for pos_s, pos_d, pos_t, e_type in new_pos_edge_tqdm:
            key = (pos_t, pos_s, e_type)
            conflict_set = np.array(list(edge_t_dict[key].keys()))
            out_dict[key] = self._sample_typed_negatives(
                self.dst_dict[e_type], conflict_set, all_dst, self.num_neg_e
            )

        print ("negative samples for ", len(out_dict), " positive edges are generated")
        # save the generated evaluation set to disk
        save_pkl(out_dict, filename)

    def generate_negative_samples_random(self, 
                                      data: TemporalData, 
                                      split_mode: str, 
                                      filename: str,
                                      ) -> None:
        r"""
        generate random negative edges for ablation study
        Destinations that are positive for the same source at the same timestamp are filtered
        out. If `num_neg_e` is negative or exceeds the number of candidates, all candidates
        are returned. Nothing is generated when `filename` already exists.

        Parameters:
            data: an object containing positive edges information
            split_mode: specifies whether to generate negative edges for 'validation' or 'test' splits
            filename: name of the file containing the generated negative edges
        """
        if self._already_generated(split_mode, filename):
            return

        print(f"INFO: Generating negative samples for '{split_mode}' evaluation!")
        pos_src, pos_dst, pos_timestamp, edge_types = self._edge_arrays(data)

        # destination range: taken from the reference edge data when available,
        # otherwise from the ids given to the constructor
        if self.edge_data is not None:
            first_dst_id = int(self.edge_data.dst.min())
            last_dst_id = int(self.edge_data.dst.max())
        else:
            first_dst_id, last_dst_id = self.first_dst_id, self.last_dst_id
        all_dst = np.arange(first_dst_id, last_dst_id + 1)

        # group positive destinations by (timestamp, source) once, instead of building
        # two full-array masks for every positive edge
        pos_dst_by_ts_src = {}
        for pos_s, pos_d, pos_t in zip(pos_src, pos_dst, pos_timestamp):
            pos_dst_by_ts_src.setdefault((pos_t, pos_s), []).append(pos_d)

        evaluation_set = {}
        # generate a list of negative destinations for each positive edge
        pos_edge_tqdm = tqdm(
            zip(pos_src, pos_dst, pos_timestamp, edge_types), total=len(pos_src)
        )
        for pos_s, pos_d, pos_t, e_type in pos_edge_tqdm:
            pos_e_dst_same_src = np.array(pos_dst_by_ts_src[(pos_t, pos_s)])
            filtered_all_dst = np.setdiff1d(all_dst, pos_e_dst_same_src)
            if self.num_neg_e < 0 or self.num_neg_e > len(filtered_all_dst):
                neg_d_arr = filtered_all_dst
            else:
                # never sample the same negative twice
                neg_d_arr = np.random.choice(
                    filtered_all_dst, self.num_neg_e, replace=False
                )
            evaluation_set[(pos_t, pos_s, e_type)] = neg_d_arr
        save_pkl(evaluation_set, filename)

__init__(dataset_name, first_dst_id, last_dst_id, strategy='time-filtered', num_neg_e=-1, rnd_seed=1, partial_path=None, edge_data=None)

Negative Edge Generator class for Temporal Knowledge Graphs constructor for the negative edge generator class

Parameters:

Name Type Description Default
dataset_name str

name of the dataset

required
first_dst_id int

identity of the first destination node

required
last_dst_id int

identity of the last destination node

required
num_neg_e int

number of negative edges being generated per each positive edge

-1
strategy str

specifies which strategy should be used for generating the negatives

'time-filtered'
rnd_seed int

random seed for reproducibility

1
edge_data TemporalData

the positive edges to generate the negatives for, assuming sorted temporally

None

Returns:

Type Description
None

None

Source code in tgb/linkproppred/tkg_negative_generator.py
def __init__(
    self,
    dataset_name: str,
    first_dst_id: int,
    last_dst_id: int,
    strategy: str = "time-filtered",
    num_neg_e: int = -1,  # -1 means generate all possible negatives
    rnd_seed: int = 1,
    partial_path: str = None,
    edge_data: TemporalData = None,
) -> None:
    r"""
    Set up a negative edge generator for Temporal Knowledge Graphs.

    Seeds numpy's global random state with `rnd_seed`, stores the configuration on
    the instance and, for the `dst-time-filtered` strategy only, builds the
    per-edge-type destination dictionary via `generate_dst_dict`.

    Parameters:
        dataset_name: name of the dataset
        first_dst_id: identity of the first destination node
        last_dst_id: identity of the last destination node
        strategy: which strategy should be used for generating the negatives; one of
            `time-filtered`, `dst-time-filtered`, `random`
        num_neg_e: number of negative edges being generated per each positive edge
        rnd_seed: random seed for reproducibility
        partial_path: directory used to compose the dst_dict file name; required
            when `strategy` is `dst-time-filtered`
        edge_data: the positive edges to generate the negatives for, assuming sorted temporally

    Returns:
        None

    Raises:
        AssertionError: if `strategy` is not one of the supported names
        ValueError: if `strategy` is `dst-time-filtered` and `partial_path` is None
    """
    # Seeding comes first so everything sampled afterwards is reproducible.
    self.rnd_seed = rnd_seed
    np.random.seed(rnd_seed)

    self.dataset_name = dataset_name
    self.first_dst_id, self.last_dst_id = first_dst_id, last_dst_id
    self.num_neg_e = num_neg_e  # -1 means generate all

    supported_strategies = ("time-filtered", "dst-time-filtered", "random")
    assert (
        strategy in supported_strategies
    ), "The supported strategies are `time-filtered`, dst-time-filtered, random"
    self.strategy = strategy

    # Only the dst-time-filtered strategy needs the destination dictionary.
    self.dst_dict = None
    if strategy == "dst-time-filtered":
        if partial_path is None:
            raise ValueError(
                "The partial path to the directory where the dst_dict is stored is required")
        # <partial_path>/<dataset_name>_dst_dict.pkl
        self.dst_dict_name = "/".join(
            (partial_path, self.dataset_name + "_dst_dict.pkl")
        )
        self.dst_dict = self.generate_dst_dict(
            edge_data=edge_data, dst_name=self.dst_dict_name
        )

    self.edge_data = edge_data

generate_dst_dict(edge_data, dst_name)

Generate a dictionary of destination nodes for each type of edge

Parameters:

Name Type Description Default
edge_data TemporalData

an object containing positive edges information

required
dst_name str

name of the file to save the generated dictionary of destination nodes

required

Returns:

Name Type Description
dst_dict dict

a dictionary of destination nodes for each type of edge

Source code in tgb/linkproppred/tkg_negative_generator.py
def generate_dst_dict(self, edge_data: TemporalData, dst_name: str) -> dict:
    r"""
    Generate a dictionary of destination nodes for each type of edge

    Parameters:
        edge_data: an object containing positive edges information; only its
            `dst` and `edge_type` tensors are read
        dst_name: name of the file associated with the dictionary of destination
            nodes. NOTE: this method does not write anything to disk; the
            argument is accepted for interface compatibility only.

    Returns:
        dst_dict: a dictionary {edge_type: np.ndarray of the distinct destination
            nodes seen with that edge type}, destinations in first-seen order
    """
    pos_dst = edge_data.dst.cpu().numpy()
    edge_types = edge_data.edge_type.cpu().numpy()

    # {edge_type: {dst_1: 1, dst_2: 1, ..}} -- inner dicts serve as
    # insertion-ordered sets so the output order is deterministic.
    dst_track_dict = {}
    for pos_d, e_type in tqdm(zip(pos_dst, edge_types), total=len(pos_dst)):
        dst_track_dict.setdefault(e_type, {})[pos_d] = 1

    dst_dict = {
        e_type: np.array(list(seen_dst))
        for e_type, seen_dst in dst_track_dict.items()
    }
    print ('destination candidates generated for all edge types ', len(dst_dict))
    return dst_dict

generate_negative_samples(pos_edges, split_mode, partial_path)

Generate negative samples

Parameters:

Name Type Description Default
pos_edges TemporalData

positive edges to generate the negatives for

required
split_mode str

specifies whether to generate negative edges for 'validation' or 'test' splits

required
partial_path str

in which directory save the generated negatives

required
Source code in tgb/linkproppred/tkg_negative_generator.py
def generate_negative_samples(self, 
                              pos_edges: TemporalData,
                              split_mode: str, 
                              partial_path: str,
                              ) -> None:
    r"""
    Build the output file name and delegate to the generator of the configured strategy.

    Parameters:
        pos_edges: positive edges to generate the negatives for
        split_mode: specifies whether to generate negative edges for 'validation' or 'test' splits
        partial_path: in which directory save the generated negatives

    Raises:
        ValueError: if `self.strategy` is not a supported strategy
    """
    # file name for saving or loading: <partial_path>/<dataset_name>_<split_mode>_ns.pkl
    base_name = "_".join((self.dataset_name, split_mode, "ns")) + ".pkl"
    filename = "/".join((partial_path, base_name))

    # strategy name -> name of the method implementing it
    generator_by_strategy = {
        "time-filtered": "generate_negative_samples_ftr",
        "dst-time-filtered": "generate_negative_samples_dst",
        "random": "generate_negative_samples_random",
    }
    method_name = generator_by_strategy.get(self.strategy)
    if method_name is None:
        raise ValueError("Unsupported negative sample generation strategy!")

    getattr(self, method_name)(pos_edges, split_mode, filename)

generate_negative_samples_dst(data, split_mode, filename)

Each (s, d, t, edge_type) tuple is treated as a unique edge. Negative samples are generated as follows: (1) for each positive edge, sample a batch of negative edges from all possible edges with the same source node; (2) filter out actual positive edges at the same timestamp with the same edge type.

Parameters:

Name Type Description Default
data TemporalData

an object containing positive edges information

required
split_mode str

specifies whether to generate negative edges for 'validation' or 'test' splits

required
filename str

name of the file containing the generated negative edges

required
Source code in tgb/linkproppred/tkg_negative_generator.py
def generate_negative_samples_dst(self, 
                                  data: TemporalData, 
                                  split_mode: str, 
                                  filename: str,
                                  ) -> None:
    r"""
    Generate negative destinations per positive edge, preferring destinations
    that were observed with the same edge type.

    (s, d, t, edge_type) is considered a unique edge. For each positive edge:
        - candidates are the destinations in `self.dst_dict[edge_type]`
        - destinations of positive edges with the same (timestamp, source, edge type)
          -- the conflict set -- are removed from the candidates
        - if at least `self.num_neg_e` candidates remain, that many are sampled
          without replacement
        - otherwise all candidates are kept and the remainder is filled with
          nodes sampled from the id range of `self.edge_data.dst`, excluding both
          the candidates and the conflict set, so a true positive is never
          emitted as a negative. If that pool is too small, fewer than
          `self.num_neg_e` negatives are returned for the edge.

    The result, {(t, s, edge_type): np.ndarray of negative destinations}, is saved
    with `save_pkl`. Nothing is generated if `filename` already exists.

    Parameters:
        data: an object containing positive edges information
        split_mode: specifies whether to generate negative edges for 'validation' or 'test' splits
        filename: name of the file containing the generated negative edges

    Raises:
        AssertionError: if `split_mode` is neither `val` nor `test`
        ValueError: if `self.dst_dict` has not been generated
    """
    print(
        f"INFO: Negative Sampling Strategy: {self.strategy}, Data Split: {split_mode}"
    )
    assert split_mode in [
        "val",
        "test",
    ], "Invalid split-mode! It should be `val` or `test`!"

    if os.path.exists(filename):
        print(
            f"INFO: negative samples for '{split_mode}' evaluation are already generated!"
        )
        return

    if self.dst_dict is None:
        raise ValueError("The dst_dict is not generated!")

    print(f"INFO: Generating negative samples for '{split_mode}' evaluation!")
    # retrieve the information from the batch
    pos_src, pos_dst, pos_timestamp, edge_type = (
        data.src.cpu().numpy(),
        data.dst.cpu().numpy(),
        data.t.cpu().numpy(),
        data.edge_type.cpu().numpy(),
    )

    # first pass: {(t, u, edge_type): {v_1: 1, v_2: 1, ..}} for conflict lookup
    edge_t_dict = {}
    for pos_s, pos_d, pos_t, e_type in tqdm(
        zip(pos_src, pos_dst, pos_timestamp, edge_type), total=len(pos_src)
    ):
        edge_t_dict.setdefault((pos_t, pos_s, e_type), {})[pos_d] = 1

    # loop-invariant: the full destination id range of the reference edge data
    min_dst_idx, max_dst_idx = int(self.edge_data.dst.min()), int(self.edge_data.dst.max())
    all_dst = np.arange(min_dst_idx, max_dst_idx + 1)

    # NOTE(review): num_neg_e == -1 ("generate all") is not handled by this
    # strategy; np.random.choice rejects a negative size -- confirm intended use.
    sample_num = self.num_neg_e

    out_dict = {}
    # second pass: sample the negatives for every positive edge
    for pos_s, pos_d, pos_t, e_type in tqdm(
        zip(pos_src, pos_dst, pos_timestamp, edge_type), total=len(pos_src)
    ):
        key = (pos_t, pos_s, e_type)
        conflict_set = np.array(list(edge_t_dict[key].keys()))
        dst_set = self.dst_dict[e_type]  # dst_set contains conflict set
        filtered_dst_set = np.setdiff1d(dst_set, conflict_set)

        if len(filtered_dst_set) < sample_num:
            # Not enough same-type candidates: keep them all and top up from the
            # remaining nodes. The pool excludes the conflict set as well, which
            # is what keeps true positives out of the negatives.
            filler_pool = np.setdiff1d(all_dst, np.union1d(dst_set, conflict_set))
            n_fill = min(sample_num - len(filtered_dst_set), len(filler_pool))
            if n_fill > 0:
                filler = np.random.choice(filler_pool, n_fill, replace=False)
                dst_sampled = np.concatenate((filtered_dst_set, filler))
            else:
                dst_sampled = filtered_dst_set
        else:
            dst_sampled = np.random.choice(filtered_dst_set, sample_num, replace=False)

        out_dict[key] = dst_sampled

    print ("negative samples for ", len(out_dict), " positive edges are generated")
    # save the generated evaluation set to disk
    save_pkl(out_dict, filename)

generate_negative_samples_ftr(data, split_mode, filename)

Each (s, d, t, edge_type) tuple is treated as a unique edge. Negative samples are generated as follows: (1) for each positive edge, sample a batch of negative edges from all possible edges with the same source node; (2) filter out actual positive edges at the same timestamp with the same edge type.

Parameters:

Name Type Description Default
data TemporalData

an object containing positive edges information

required
split_mode str

specifies whether to generate negative edges for 'validation' or 'test' splits

required
filename str

name of the file containing the generated negative edges

required
Source code in tgb/linkproppred/tkg_negative_generator.py
def generate_negative_samples_ftr(self, 
                                  data: TemporalData, 
                                  split_mode: str, 
                                  filename: str,
                                  ) -> None:
    r"""
    Collect and save the conflict sets used by the time-filtered strategy.

    (s, d, t, edge_type) is considered a unique edge. For every
    (timestamp, source, edge type) key the destinations of all positive edges
    sharing that key are gathered into a numpy array; the resulting dictionary is
    saved with `save_pkl`. Nothing is generated if `filename` already exists.

    Parameters:
        data: an object containing positive edges information
        split_mode: specifies whether to generate negative edges for 'validation' or 'test' splits
        filename: name of the file containing the generated negative edges

    Raises:
        AssertionError: if `split_mode` is neither `val` nor `test`
    """
    print(
        f"INFO: Negative Sampling Strategy: {self.strategy}, Data Split: {split_mode}"
    )
    valid_splits = ("val", "test")
    assert split_mode in valid_splits, "Invalid split-mode! It should be `val` or `test`!"

    # the file is written only once; an existing file is left untouched
    if os.path.exists(filename):
        print(
            f"INFO: negative samples for '{split_mode}' evaluation are already generated!"
        )
        return

    print(f"INFO: Generating negative samples for '{split_mode}' evaluation!")
    # pull the edge attributes out of the batch as numpy arrays
    sources = data.src.cpu().numpy()
    destinations = data.dst.cpu().numpy()
    timestamps = data.t.cpu().numpy()
    edge_types = data.edge_type.cpu().numpy()

    progress = tqdm(
        zip(sources, destinations, timestamps, edge_types), total=len(sources)
    )

    # {(t, u, edge_type): {v_1: 1, v_2: 1, ..}} -- inner dicts de-duplicate
    # destinations while keeping first-seen order
    grouped_dst = {}
    for src_node, dst_node, ts, e_type in progress:
        grouped_dst.setdefault((ts, src_node, e_type), {})[dst_node] = 1

    conflict_dict = {
        key: np.array(list(dst_nodes.keys())) for key, dst_nodes in grouped_dst.items()
    }

    print("conflict sets for ns samples for ", len(conflict_dict), " positive edges are generated")
    # save the generated evaluation set to disk
    save_pkl(conflict_dict, filename)

generate_negative_samples_random(data, split_mode, filename)

generate random negative edges for ablation study

Parameters:

Name Type Description Default
data TemporalData

an object containing positive edges information

required
split_mode str

specifies whether to generate negative edges for 'validation' or 'test' splits

required
filename str

name of the file containing the generated negative edges

required
Source code in tgb/linkproppred/tkg_negative_generator.py
def generate_negative_samples_random(self, 
                                  data: TemporalData, 
                                  split_mode: str, 
                                  filename: str,
                                  ) -> None:
    r"""
    generate random negative edges for ablation study

    For each positive edge, the candidate negatives are all node ids in the
    destination id range of `self.edge_data.dst`, minus the destinations of
    positive edges in `data` that share the same timestamp and source (regardless
    of edge type). `self.num_neg_e` of them are sampled without replacement; if
    `self.num_neg_e` is negative (-1 means "all") or exceeds the number of
    candidates, every candidate is kept.

    The result, {(t, s, edge_type): np.ndarray of negative destinations}, is saved
    with `save_pkl`. Nothing is generated if `filename` already exists.

    Parameters:
        data: an object containing positive edges information
        split_mode: specifies whether to generate negative edges for 'validation' or 'test' splits
        filename: name of the file containing the generated negative edges

    Raises:
        AssertionError: if `split_mode` is neither `val` nor `test`
    """
    print(
        f"INFO: Negative Sampling Strategy: {self.strategy}, Data Split: {split_mode}"
    )
    assert split_mode in [
        "val",
        "test",
    ], "Invalid split-mode! It should be `val` or `test`!"

    if os.path.exists(filename):
        print(
            f"INFO: negative samples for '{split_mode}' evaluation are already generated!"
        )
        return

    print(f"INFO: Generating negative samples for '{split_mode}' evaluation!")
    # retrieve the information from the batch
    pos_src, pos_dst, pos_timestamp, edge_type = (
        data.src.cpu().numpy(),
        data.dst.cpu().numpy(),
        data.t.cpu().numpy(),
        data.edge_type.cpu().numpy(),
    )
    first_dst_id = int(self.edge_data.dst.min())
    last_dst_id = int(self.edge_data.dst.max())
    all_dst = np.arange(first_dst_id, last_dst_id + 1)

    # Group the positive destinations by (timestamp, source) in a single pass
    # instead of building two full-length boolean masks for every edge.
    pos_dst_by_key = {}
    for pos_s, pos_d, pos_t in zip(pos_src, pos_dst, pos_timestamp):
        pos_dst_by_key.setdefault((pos_t, pos_s), []).append(pos_d)

    evaluation_set = {}
    # generate a list of negative destinations for each positive edge
    pos_edge_tqdm = tqdm(
        zip(pos_src, pos_dst, pos_timestamp, edge_type), total=len(pos_src)
    )
    for pos_s, pos_d, pos_t, e_type in pos_edge_tqdm:
        filtered_all_dst = np.setdiff1d(all_dst, pos_dst_by_key[(pos_t, pos_s)])
        # a negative num_neg_e (-1) means "use all possible negatives"
        if self.num_neg_e < 0 or self.num_neg_e > len(filtered_all_dst):
            neg_d_arr = filtered_all_dst
        else:
            neg_d_arr = np.random.choice(
                filtered_all_dst, self.num_neg_e, replace=False)  # never replace negatives
        evaluation_set[(pos_t, pos_s, e_type)] = neg_d_arr
    save_pkl(evaluation_set, filename)

Sample negative edges for evaluation of dynamic link prediction Load already generated negative edges from file, batch them based on the positive edge, and return the evaluation set

TKGNegativeEdgeSampler

Bases: object

Source code in tgb/linkproppred/tkg_negative_sampler.py
class TKGNegativeEdgeSampler(object):
    def __init__(
        self,
        dataset_name: str,
        first_dst_id: int,
        last_dst_id: int,
        strategy: str = "time-filtered",
        partial_path: str = PROJ_DIR + "/data/processed",
    ) -> None:
        r"""
        Negative Edge Sampler
            Loads and query the negative batches based on the positive batches provided.
        constructor for the negative edge sampler class

        Parameters:
            dataset_name: name of the dataset
            first_dst_id: identity of the first destination node
            last_dst_id: identity of the last destination node
            strategy: will always load the pre-generated negatives
            partial_path: the path to the directory where the negative edges are stored
                (accepted but not used by this constructor)

        Returns:
            None
        """
        self.dataset_name = dataset_name
        self.eval_set = {}  # {split_mode: loaded evaluation dictionary}
        self.first_dst_id = first_dst_id
        self.last_dst_id = last_dst_id
        self.strategy = strategy
        self.dst_dict = None

    def load_eval_set(
        self,
        fname: str,
        split_mode: str = "val",
    ) -> None:
        r"""
        Load the evaluation set from disk, can be either val or test set ns samples

        Parameters:
            fname: the file name of the evaluation ns on disk
            split_mode: the split mode of the evaluation set, can be either `val` or `test`

        Returns:
            None

        Raises:
            AssertionError: if `split_mode` is neither `val` nor `test`
            FileNotFoundError: if `fname` does not exist
        """
        assert split_mode in [
            "val",
            "test",
        ], "Invalid split-mode! It should be `val`, `test`"
        if not os.path.exists(fname):
            raise FileNotFoundError(f"File not found at {fname}")
        self.eval_set[split_mode] = load_pkl(fname)

    def query_batch(self, 
                    pos_src: Union[Tensor, np.ndarray], 
                    pos_dst: Union[Tensor, np.ndarray], 
                    pos_timestamp: Union[Tensor, np.ndarray], 
                    edge_type: Union[Tensor, np.ndarray],
                    split_mode: str = "test") -> list:
        r"""
        For each positive edge in the `pos_batch`, return a list of negative edges
        `split_mode` specifies whether the validation or test evaluation set should be retrieved.

        With the `time-filtered` strategy the loaded set holds, per
        (timestamp, source, edge type), the destinations to exclude; the negatives
        are every id in [first_dst_id, last_dst_id] that is not in that set.
        With the `dst-time-filtered` strategy the loaded set already holds the
        negatives and they are returned as stored.

        Parameters:
            pos_src: list of positive source nodes
            pos_dst: list of positive destination nodes
            pos_timestamp: list of timestamps of the positive edges
            edge_type: list of edge types of the positive edges
            split_mode: specifies whether to generate negative edges for 'validation' or 'test' splits

        Returns:
            neg_samples: list of numpy array; each array contains the set of negative edges that
                        should be evaluated against each positive edge.

        Raises:
            AssertionError: if `split_mode` is neither `val` nor `test`
            ValueError: if the evaluation set of `split_mode` is not loaded, if the
                strategy is not supported by this sampler, or if a queried edge
                is missing from the evaluation set
            RuntimeError: if an input is neither a numpy ndarray nor a torch tensor
        """
        assert split_mode in [
            "val",
            "test",
        ], "Invalid split-mode! It should be `val`, `test`!"
        # .get(): a split that was never loaded has no key at all in eval_set
        eval_set = self.eval_set.get(split_mode)
        if eval_set is None:
            raise ValueError(
                f"Evaluation set is None! You should load the {split_mode} evaluation set first!"
            )

        # check the argument types...
        if torch is not None and isinstance(pos_src, torch.Tensor):
            pos_src = pos_src.detach().cpu().numpy()
        if torch is not None and isinstance(pos_dst, torch.Tensor):
            pos_dst = pos_dst.detach().cpu().numpy()
        if torch is not None and isinstance(pos_timestamp, torch.Tensor):
            pos_timestamp = pos_timestamp.detach().cpu().numpy()
        if torch is not None and isinstance(edge_type, torch.Tensor):
            edge_type = edge_type.detach().cpu().numpy()

        if not all(
            isinstance(arr, np.ndarray)
            for arr in (pos_src, pos_dst, pos_timestamp, edge_type)
        ):
            raise RuntimeError(
                "pos_src, pos_dst, pos_timestamp, and edge_type need to be either numpy ndarray or torch tensor!"
                )

        if self.strategy not in ("time-filtered", "dst-time-filtered"):
            raise ValueError(
                f"Unsupported negative sampling strategy for query_batch: {self.strategy}"
            )

        time_filtered = self.strategy == "time-filtered"
        if time_filtered:
            # identical for every edge of the batch, so build it once
            all_dst = np.arange(self.first_dst_id, self.last_dst_id + 1)

        neg_samples = []
        for pos_s, pos_d, pos_t, e_type in zip(pos_src, pos_dst, pos_timestamp, edge_type):
            key = (pos_t, pos_s, e_type)
            if key not in eval_set:
                raise ValueError(
                    f"The edge ({pos_s}, {pos_d}, {pos_t}, {e_type}) is not in the '{split_mode}' evaluation set! Please check the implementation."
                )
            stored = eval_set[key]
            if time_filtered:
                #! always using all possible destinations for evaluation
                # Remove the conflicts by VALUE; they are node ids, not positions
                # in all_dst (the two only coincide when first_dst_id == 0).
                neg_samples.append(all_dst[~np.isin(all_dst, stored)])
            else:
                neg_samples.append(stored)
        #? can't convert to numpy array due to different lengths of negative samples
        return neg_samples

__init__(dataset_name, first_dst_id, last_dst_id, strategy='time-filtered', partial_path=PROJ_DIR + '/data/processed')

Negative Edge Sampler Loads and query the negative batches based on the positive batches provided. constructor for the negative edge sampler class

Parameters:

Name Type Description Default
dataset_name str

name of the dataset

required
first_dst_id int

identity of the first destination node

required
last_dst_id int

identity of the last destination node

required
strategy str

will always load the pre-generated negatives

'time-filtered'
partial_path str

the path to the directory where the negative edges are stored

PROJ_DIR + '/data/processed'

Returns:

Type Description
None

None

Source code in tgb/linkproppred/tkg_negative_sampler.py
def __init__(
    self,
    dataset_name: str,
    first_dst_id: int,
    last_dst_id: int,
    strategy: str = "time-filtered",
    partial_path: str = PROJ_DIR + "/data/processed",
) -> None:
    r"""
    Set up a negative edge sampler.
        The sampler loads pre-generated negatives and answers queries for the
        positive batches provided; the constructor only records the configuration.

    Parameters:
        dataset_name: name of the dataset
        first_dst_id: identity of the first destination node
        last_dst_id: identity of the last destination node
        strategy: will always load the pre-generated negatives
        partial_path: the path to the directory where the negative edges are stored
            (accepted but not used by this constructor)

    Returns:
        None
    """
    # containers that start out empty
    self.eval_set = {}  # filled per split by load_eval_set
    self.dst_dict = None

    # plain configuration values
    self.dataset_name = dataset_name
    self.strategy = strategy
    self.first_dst_id, self.last_dst_id = first_dst_id, last_dst_id

load_eval_set(fname, split_mode='val')

Load the evaluation set (negative samples) from disk; it can be either the val or the test set. Parameters: fname is the file name of the evaluation set on disk; split_mode is the split mode of the evaluation set, either val or test.

Returns:

Type Description
None

None

Source code in tgb/linkproppred/tkg_negative_sampler.py
def load_eval_set(
    self,
    fname: str,
    split_mode: str = "val",
) -> None:
    r"""
    Read a pre-generated evaluation set from disk and keep it under its split name.

    Parameters:
        fname: the file name of the evaluation ns on disk
        split_mode: the split mode of the evaluation set, can be either `val` or `test`

    Returns:
        None

    Raises:
        AssertionError: if `split_mode` is neither `val` nor `test`
        FileNotFoundError: if `fname` does not exist
    """
    valid_splits = ("val", "test")
    assert split_mode in valid_splits, "Invalid split-mode! It should be `val`, `test`"

    if os.path.exists(fname):
        # stored per split so val and test sets can be loaded side by side
        self.eval_set[split_mode] = load_pkl(fname)
        return

    raise FileNotFoundError(f"File not found at {fname}")

query_batch(pos_src, pos_dst, pos_timestamp, edge_type, split_mode='test')

For each positive edge in the pos_batch, return a list of negative edges. split_mode specifies whether the validation or test evaluation set should be retrieved. The method now also takes an edge type argument.

Parameters:

Name Type Description Default
pos_src Union[Tensor, ndarray]

list of positive source nodes

required
pos_dst Union[Tensor, ndarray]

list of positive destination nodes

required
pos_timestamp Union[Tensor, ndarray]

list of timestamps of the positive edges

required
split_mode str

specifies whether to generate negative edges for 'validation' or 'test' splits

'test'

Returns:

Name Type Description
neg_samples list

list of numpy array; each array contains the set of negative edges that should be evaluated against each positive edge.

Source code in tgb/linkproppred/tkg_negative_sampler.py
def query_batch(self, 
                pos_src: Union[Tensor, np.ndarray], 
                pos_dst: Union[Tensor, np.ndarray], 
                pos_timestamp: Union[Tensor, np.ndarray], 
                edge_type: Union[Tensor, np.ndarray],
                split_mode: str = "test") -> list:
    r"""
    For each positive edge in the `pos_batch`, return a list of negative edges
    `split_mode` specifies whether the validation or test evaluation set should be retrieved.

    With the `time-filtered` strategy the loaded set holds, per
    (timestamp, source, edge type), the destinations to exclude; the negatives
    are every id in [first_dst_id, last_dst_id] that is not in that set.
    With the `dst-time-filtered` strategy the loaded set already holds the
    negatives and they are returned as stored.

    Parameters:
        pos_src: list of positive source nodes
        pos_dst: list of positive destination nodes
        pos_timestamp: list of timestamps of the positive edges
        edge_type: list of edge types of the positive edges
        split_mode: specifies whether to generate negative edges for 'validation' or 'test' splits

    Returns:
        neg_samples: list of numpy array; each array contains the set of negative edges that
                    should be evaluated against each positive edge.

    Raises:
        AssertionError: if `split_mode` is neither `val` nor `test`
        ValueError: if the evaluation set of `split_mode` is not loaded, if the
            strategy is not supported by this sampler, or if a queried edge
            is missing from the evaluation set
        RuntimeError: if an input is neither a numpy ndarray nor a torch tensor
    """
    assert split_mode in [
        "val",
        "test",
    ], "Invalid split-mode! It should be `val`, `test`!"
    # .get(): a split that was never loaded has no key at all in eval_set
    eval_set = self.eval_set.get(split_mode)
    if eval_set is None:
        raise ValueError(
            f"Evaluation set is None! You should load the {split_mode} evaluation set first!"
        )

    # check the argument types...
    if torch is not None and isinstance(pos_src, torch.Tensor):
        pos_src = pos_src.detach().cpu().numpy()
    if torch is not None and isinstance(pos_dst, torch.Tensor):
        pos_dst = pos_dst.detach().cpu().numpy()
    if torch is not None and isinstance(pos_timestamp, torch.Tensor):
        pos_timestamp = pos_timestamp.detach().cpu().numpy()
    if torch is not None and isinstance(edge_type, torch.Tensor):
        edge_type = edge_type.detach().cpu().numpy()

    if not all(
        isinstance(arr, np.ndarray)
        for arr in (pos_src, pos_dst, pos_timestamp, edge_type)
    ):
        raise RuntimeError(
            "pos_src, pos_dst, pos_timestamp, and edge_type need to be either numpy ndarray or torch tensor!"
            )

    if self.strategy not in ("time-filtered", "dst-time-filtered"):
        raise ValueError(
            f"Unsupported negative sampling strategy for query_batch: {self.strategy}"
        )

    time_filtered = self.strategy == "time-filtered"
    if time_filtered:
        # identical for every edge of the batch, so build it once
        all_dst = np.arange(self.first_dst_id, self.last_dst_id + 1)

    neg_samples = []
    for pos_s, pos_d, pos_t, e_type in zip(pos_src, pos_dst, pos_timestamp, edge_type):
        key = (pos_t, pos_s, e_type)
        if key not in eval_set:
            raise ValueError(
                f"The edge ({pos_s}, {pos_d}, {pos_t}, {e_type}) is not in the '{split_mode}' evaluation set! Please check the implementation."
            )
        stored = eval_set[key]
        if time_filtered:
            #! always using all possible destinations for evaluation
            # Remove the conflicts by VALUE; they are node ids, not positions
            # in all_dst (the two only coincide when first_dst_id == 0).
            neg_samples.append(all_dst[~np.isin(all_dst, stored)])
        else:
            neg_samples.append(stored)
    #? can't convert to numpy array due to different lengths of negative samples
    return neg_samples

Sample and Generate negative edges that are going to be used for evaluation of a dynamic graph learning model Negative samples are generated and saved to files ONLY once; other times, they should be loaded from file with instances of the negative_sampler.py.

THGNegativeEdgeGenerator

Bases: object

Source code in tgb/linkproppred/thg_negative_generator.py
class THGNegativeEdgeGenerator(object):
    r"""
    Generate and save evaluation negatives for Temporal Heterogeneous Graphs.

    Negatives are stored per `(timestamp, source, edge_type)` key and written to
    disk with `save_pkl`, so that every method is evaluated on the same set.
    """

    def __init__(
        self,
        dataset_name: str,
        first_node_id: int,
        last_node_id: int,
        node_type: Union[np.ndarray, torch.Tensor],
        strategy: str = "node-type-filtered",
        num_neg_e: int = -1,  # -1 means generate all possible negatives
        rnd_seed: int = 1,
        edge_data: TemporalData = None,
    ) -> None:
        r"""
        Negative Edge Generator class for Temporal Heterogeneous Graphs.
        The positive samples are provided; the negative samples are generated with a
        specific strategy and saved for consistent evaluation across different methods.

        Parameters:
            dataset_name: name of the dataset
            first_node_id: the first node id
            last_node_id: the last node id
            node_type: the node type of each node; entry `k` describes node `first_node_id + k`
            strategy: the strategy to generate negative samples, `node-type-filtered` or `random`
            num_neg_e: number of negative samples to generate, -1 means all possible negatives
            rnd_seed: random seed; NOTE that it seeds the global numpy random state
            edge_data: the edge data object containing the positive edges,
                       its `dst` field is read by the `random` strategy
        Returns:
            None
        """
        self.rnd_seed = rnd_seed
        np.random.seed(self.rnd_seed)
        self.dataset_name = dataset_name
        self.first_node_id = first_node_id
        self.last_node_id = last_node_id
        if isinstance(node_type, torch.Tensor):
            node_type = node_type.cpu().numpy()
        # validate the array before it is used to build the per-type lookup
        assert isinstance(node_type, np.ndarray), "node_type should be a numpy array"
        self.node_type = node_type
        self.node_type_dict = self.get_destinations_based_on_node_type(
            first_node_id, last_node_id, self.node_type
        )  # {node_type: array of node ids}
        self.num_neg_e = num_neg_e  # -1 means generate all

        assert strategy in [
            "node-type-filtered",
            "random",
        ], "The supported strategies are `node-type-filtered` and `random`"
        self.strategy = strategy
        self.edge_data = edge_data

    def get_destinations_based_on_node_type(self,
                                            first_node_id: int,
                                            last_node_id: int,
                                            node_type: np.ndarray) -> dict:
        r"""
        get the destination node id arrays based on the node type
        Parameters:
            first_node_id: the first node id
            last_node_id: the last node id
            node_type: the node type of each node

        Returns:
            node_type_dict: a dictionary containing the destination node ids for each node type
        """
        node_type_store = {}
        assert first_node_id <= last_node_id, "Invalid destination node ids!"
        assert len(node_type) == (last_node_id - first_node_id + 1), "node type array must match the indices"
        for k in range(len(node_type)):
            nt = int(node_type[k])  # node type must be ints
            nid = k + first_node_id
            # the inner dict is used as an insertion-ordered set of node ids
            node_type_store.setdefault(nt, {})[nid] = 1
        node_type_dict = {}
        for ntype in node_type_store:
            node_type_dict[ntype] = np.array(list(node_type_store[ntype].keys()))
            assert np.all(np.diff(node_type_dict[ntype]) >= 0), "Destination node ids for a given type must be sorted"
            assert np.all(node_type_dict[ntype] <= last_node_id), "Destination node ids must be less than or equal to the last destination id"
        return node_type_dict

    def generate_negative_samples(self,
                                  pos_edges: TemporalData,
                                  split_mode: str,
                                  partial_path: str,
                                  ) -> None:
        r"""
        Generate negative samples

        Parameters:
            pos_edges: positive edges to generate the negatives for
            split_mode: specifies whether to generate negative edges for 'validation' or 'test' splits
            partial_path: in which directory save the generated negatives
        """
        # file name for saving or loading...
        filename = (
            partial_path
            + "/"
            + self.dataset_name
            + "_"
            + split_mode
            + "_"
            + "ns"
            + ".pkl"
        )

        if self.strategy == "node-type-filtered":
            self.generate_negative_samples_nt(pos_edges, split_mode, filename)
        elif self.strategy == "random":
            self.generate_negative_samples_random(pos_edges, split_mode, filename)
        else:
            raise ValueError("Unsupported negative sample generation strategy!")

    def _select_negatives_nt(self,
                             all_dst: np.ndarray,
                             conflict_set: np.ndarray) -> np.ndarray:
        r"""
        Pick the negative destinations for one `(t, src, edge_type)` key.

        Parameters:
            all_dst: every node id that has the destination's node type
            conflict_set: destinations that are true positives for this key

        Returns:
            array of negative destination ids; holds at most `num_neg_e` entries and
            falls back to every valid negative when fewer than `num_neg_e` exist
        """
        if self.num_neg_e == -1 or self.num_neg_e > len(all_dst):
            # all negatives requested, or more requested than nodes of this type exist
            # (sampling without replacement would raise in that case)
            return np.setdiff1d(all_dst, conflict_set)

        #* lazy sampling: draw first, only filter when a positive was hit
        neg_d_arr = np.random.choice(
            all_dst, self.num_neg_e, replace=False)  # never replace negatives
        if len(np.setdiff1d(neg_d_arr, conflict_set)) < self.num_neg_e:
            valid_dst = np.setdiff1d(all_dst, conflict_set)
            if self.num_neg_e > len(valid_dst):
                # not enough valid negatives left to sample from: keep them all
                return valid_dst
            neg_d_arr = np.random.choice(
                valid_dst, self.num_neg_e, replace=False)
        return neg_d_arr

    def generate_negative_samples_nt(self,
                                      data: TemporalData,
                                      split_mode: str,
                                      filename: str,
                                      ) -> None:
        r"""
        Generate negative samples based on the node-type-filtered strategy:
            - positive edges are grouped by (t, src, edge_type); the destinations of a group form its conflict set
            - for each group, retrieve all possible destinations based on the node type of the destination node
            - filter actual positive edges at the same timestamp with the same edge type
        Nothing is generated if `filename` already exists.

        Parameters:
            data: an object containing positive edges information
            split_mode: specifies whether to generate negative edges for 'validation' or 'test' splits
            filename: name of the file containing the generated negative edges
        """
        print(
            f"INFO: Negative Sampling Strategy: {self.strategy}, Data Split: {split_mode}"
        )
        assert split_mode in [
            "val",
            "test",
        ], "Invalid split-mode! It should be `val` or `test`!"

        if os.path.exists(filename):
            print(
                f"INFO: negative samples for '{split_mode}' evaluation are already generated!"
            )
        else:
            print(f"INFO: Generating negative samples for '{split_mode}' evaluation!")
            # retrieve the information from the batch
            pos_src, pos_dst, pos_timestamp, edge_type = (
                data.src.cpu().numpy(),
                data.dst.cpu().numpy(),
                data.t.cpu().numpy(),
                data.edge_type.cpu().numpy(),
            )

            pos_edge_tqdm = tqdm(
                zip(pos_src, pos_dst, pos_timestamp, edge_type), total=len(pos_src)
            )

            edge_t_dict = {}  # {(t, u, edge_type): {v_1: 1, v_2: 1, ..}}
            #! iterate once to put all edges into a dictionary for reference
            for pos_s, pos_d, pos_t, e_type in pos_edge_tqdm:
                edge_t_dict.setdefault((pos_t, pos_s, e_type), {})[pos_d] = 1

            out_dict = {}
            for key in tqdm(edge_t_dict):
                conflict_set = np.array(list(edge_t_dict[key].keys()))
                pos_d = conflict_set[0]
                #* retrieve the node type of the destination node as well
                #! assumption, same edge type = same destination node type
                d_node_type = int(self.node_type[pos_d - self.first_node_id])
                all_dst = self.node_type_dict[d_node_type]
                out_dict[key] = self._select_negatives_nt(all_dst, conflict_set)
            print ("ns samples for ", len(out_dict), " positive edges are generated")
            # save the generated evaluation set to disk
            save_pkl(out_dict, filename)

    def generate_negative_samples_random(self,
                                      data: TemporalData,
                                      split_mode: str,
                                      filename: str,
                                      ) -> None:
        r"""
        generate random negative edges for ablation study
        Candidates are all ids between the smallest and largest destination of `edge_data`;
        destinations sharing the timestamp and source of the positive edge are removed.
        Nothing is generated if `filename` already exists.

        Parameters:
            data: an object containing positive edges information
            split_mode: specifies whether to generate negative edges for 'validation' or 'test' splits
            filename: name of the file containing the generated negative edges

        Raises:
            ValueError: if negatives have to be generated but no `edge_data` was given to the constructor
        """
        print(
            f"INFO: Negative Sampling Strategy: {self.strategy}, Data Split: {split_mode}"
        )
        assert split_mode in [
            "val",
            "test",
        ], "Invalid split-mode! It should be `val` or `test`!"

        if os.path.exists(filename):
            print(
                f"INFO: negative samples for '{split_mode}' evaluation are already generated!"
            )
        else:
            if self.edge_data is None:
                raise ValueError(
                    "The `random` strategy requires `edge_data` to be passed to the constructor!"
                )
            print(f"INFO: Generating negative samples for '{split_mode}' evaluation!")
            # retrieve the information from the batch
            pos_src, pos_dst, pos_timestamp, edge_type = (
                data.src.cpu().numpy(),
                data.dst.cpu().numpy(),
                data.t.cpu().numpy(),
                data.edge_type.cpu().numpy(),
            )
            # plain ints so that np.arange does not depend on the container type of `dst`
            first_dst_id = int(self.edge_data.dst.min())
            last_dst_id = int(self.edge_data.dst.max())
            all_dst = np.arange(first_dst_id, last_dst_id + 1)

            # group the positive destinations once instead of masking all edges per edge
            same_t_src_dst = {}  # {(t, u): [v_1, v_2, ..]}
            for pos_s, pos_d, pos_t in zip(pos_src, pos_dst, pos_timestamp):
                same_t_src_dst.setdefault((pos_t, pos_s), []).append(pos_d)

            evaluation_set = {}
            # generate a list of negative destinations for each positive edge
            pos_edge_tqdm = tqdm(
                zip(pos_src, pos_dst, pos_timestamp, edge_type), total=len(pos_src)
            )

            for pos_s, pos_d, pos_t, e_type in pos_edge_tqdm:
                filtered_all_dst = np.setdiff1d(all_dst, same_t_src_dst[(pos_t, pos_s)])
                # -1 means "all negatives"; also keep everything when too few remain
                if self.num_neg_e == -1 or self.num_neg_e > len(filtered_all_dst):
                    neg_d_arr = filtered_all_dst
                else:
                    neg_d_arr = np.random.choice(
                        filtered_all_dst, self.num_neg_e, replace=False)  # never replace negatives
                evaluation_set[(pos_t, pos_s, e_type)] = neg_d_arr
            save_pkl(evaluation_set, filename)

__init__(dataset_name, first_node_id, last_node_id, node_type, strategy='node-type-filtered', num_neg_e=-1, rnd_seed=1, edge_data=None)

Negative Edge Generator class for Temporal Heterogeneous Graphs. This class generates negative samples for a specific dataset. The set of positive samples is provided; the negative samples are generated with a specific strategy and are saved for consistent evaluation across different methods.

Parameters:

Name Type Description Default
dataset_name str

name of the dataset

required
first_node_id int

the first node id

required
last_node_id int

the last node id

required
node_type Union[ndarray, Tensor]

the node type of each node

required
strategy str

the strategy to generate negative samples

'node-type-filtered'
num_neg_e int

number of negative samples to generate

-1
rnd_seed int

random seed

1
edge_data TemporalData

the edge data object containing the positive edges

None

Returns: None

Source code in tgb/linkproppred/thg_negative_generator.py
def __init__(
    self,
    dataset_name: str,
    first_node_id: int,
    last_node_id: int,
    node_type: Union[np.ndarray, torch.Tensor],
    strategy: str = "node-type-filtered",
    num_neg_e: int = -1,  # -1 means generate all possible negatives
    rnd_seed: int = 1,
    edge_data: TemporalData = None,
) -> None:
    r"""
    Negative Edge Generator class for Temporal Heterogeneous Graphs.
    The positive samples are provided; the negative samples are generated with a
    specific strategy and saved for consistent evaluation across different methods.

    Parameters:
        dataset_name: name of the dataset
        first_node_id: the first node id
        last_node_id: the last node id
        node_type: the node type of each node; a torch tensor is converted to a numpy array
        strategy: the strategy to generate negative samples, `node-type-filtered` or `random`
        num_neg_e: number of negative samples to generate, -1 means all possible negatives
        rnd_seed: random seed; NOTE that it seeds the global numpy random state
        edge_data: the edge data object containing the positive edges
    Returns:
        None
    """
    self.rnd_seed = rnd_seed
    np.random.seed(self.rnd_seed)
    self.dataset_name = dataset_name
    self.first_node_id = first_node_id
    self.last_node_id = last_node_id
    if isinstance(node_type, torch.Tensor):
        node_type = node_type.cpu().numpy()
    # validate the array before it is used to build the per-type lookup
    assert isinstance(node_type, np.ndarray), "node_type should be a numpy array"
    self.node_type = node_type
    self.node_type_dict = self.get_destinations_based_on_node_type(
        first_node_id, last_node_id, self.node_type
    )  # {node_type: array of node ids}
    self.num_neg_e = num_neg_e  # -1 means generate all

    assert strategy in [
        "node-type-filtered",
        "random",
    ], "The supported strategies are `node-type-filtered` and `random`"
    self.strategy = strategy
    self.edge_data = edge_data

generate_negative_samples(pos_edges, split_mode, partial_path)

Generate negative samples

Parameters:

Name Type Description Default
pos_edges TemporalData

positive edges to generate the negatives for

required
split_mode str

specifies whether to generate negative edges for 'validation' or 'test' splits

required
partial_path str

in which directory save the generated negatives

required
Source code in tgb/linkproppred/thg_negative_generator.py
def generate_negative_samples(self, 
                              pos_edges: TemporalData,
                              split_mode: str, 
                              partial_path: str,
                              ) -> None:
    r"""
    Build the output file name and run the generator of the configured strategy.

    Parameters:
        pos_edges: positive edges to generate the negatives for
        split_mode: name of the split the negatives belong to; it becomes part of the file name
        partial_path: directory in which the generated negatives are saved

    Raises:
        ValueError: if `self.strategy` is neither `node-type-filtered` nor `random`
    """
    # target file: <partial_path>/<dataset_name>_<split_mode>_ns.pkl
    stem = "_".join((self.dataset_name, split_mode, "ns"))
    filename = "/".join((partial_path, stem)) + ".pkl"

    chosen = self.strategy
    if chosen == "random":
        self.generate_negative_samples_random(pos_edges, split_mode, filename)
        return
    if chosen == "node-type-filtered":
        self.generate_negative_samples_nt(pos_edges, split_mode, filename)
        return
    raise ValueError("Unsupported negative sample generation strategy!")

generate_negative_samples_nt(data, split_mode, filename)

Now we consider (s, d, t, edge_type) as a unique edge, also adding the node type info for the destination node for convenience, so (s, d, t, edge_type): (conflict_set, d_node_type). Generate negative samples based on the node-type-filtered strategy: - for each positive edge, retrieve all possible destinations based on the node type of the destination node - filter out actual positive edges at the same timestamp with the same edge type

Parameters:

Name Type Description Default
data TemporalData

an object containing positive edges information

required
split_mode str

specifies whether to generate negative edges for 'validation' or 'test' splits

required
filename str

name of the file containing the generated negative edges

required
Source code in tgb/linkproppred/thg_negative_generator.py
def generate_negative_samples_nt(self, 
                                  data: TemporalData, 
                                  split_mode: str, 
                                  filename: str,
                                  ) -> None:
    r"""
    Generate negative samples based on the node-type-filtered strategy:
        - positive edges are grouped by (t, src, edge_type); the destinations of a group form its conflict set
        - for each group, retrieve all possible destinations based on the node type of the destination node
        - filter actual positive edges at the same timestamp with the same edge type
    If fewer valid negatives than `num_neg_e` exist for a group, all of them are kept.
    Nothing is generated if `filename` already exists.

    Parameters:
        data: an object containing positive edges information
        split_mode: specifies whether to generate negative edges for 'validation' or 'test' splits
        filename: name of the file containing the generated negative edges
    """
    print(
        f"INFO: Negative Sampling Strategy: {self.strategy}, Data Split: {split_mode}"
    )
    assert split_mode in [
        "val",
        "test",
    ], "Invalid split-mode! It should be `val` or `test`!"

    if os.path.exists(filename):
        print(
            f"INFO: negative samples for '{split_mode}' evaluation are already generated!"
        )
    else:
        print(f"INFO: Generating negative samples for '{split_mode}' evaluation!")
        # retrieve the information from the batch
        pos_src, pos_dst, pos_timestamp, edge_type = (
            data.src.cpu().numpy(),
            data.dst.cpu().numpy(),
            data.t.cpu().numpy(),
            data.edge_type.cpu().numpy(),
        )

        pos_edge_tqdm = tqdm(
            zip(pos_src, pos_dst, pos_timestamp, edge_type), total=len(pos_src)
        )

        edge_t_dict = {}  # {(t, u, edge_type): {v_1: 1, v_2: 1, ..}}
        #! iterate once to put all edges into a dictionary for reference
        for pos_s, pos_d, pos_t, e_type in pos_edge_tqdm:
            # the inner dict is used as an insertion-ordered set of destinations
            edge_t_dict.setdefault((pos_t, pos_s, e_type), {})[pos_d] = 1

        out_dict = {}
        for key in tqdm(edge_t_dict):
            conflict_set = np.array(list(edge_t_dict[key].keys()))
            pos_d = conflict_set[0]
            #* retrieve the node type of the destination node as well
            #! assumption, same edge type = same destination node type
            d_node_type = int(self.node_type[pos_d - self.first_node_id])
            all_dst = self.node_type_dict[d_node_type]
            if self.num_neg_e == -1 or self.num_neg_e > len(all_dst):
                # all negatives requested, or more requested than nodes of this type exist
                # (sampling without replacement would raise in that case)
                neg_d_arr = np.setdiff1d(all_dst, conflict_set)
            else:
                #* lazy sampling: draw first, only filter when a positive was hit
                neg_d_arr = np.random.choice(
                    all_dst, self.num_neg_e, replace=False)  # never replace negatives
                if len(np.setdiff1d(neg_d_arr, conflict_set)) < self.num_neg_e:
                    valid_dst = np.setdiff1d(all_dst, conflict_set)
                    if self.num_neg_e > len(valid_dst):
                        # not enough valid negatives left to sample from: keep them all
                        neg_d_arr = valid_dst
                    else:
                        neg_d_arr = np.random.choice(
                            valid_dst, self.num_neg_e, replace=False)
            out_dict[key] = neg_d_arr
        print ("ns samples for ", len(out_dict), " positive edges are generated")
        # save the generated evaluation set to disk
        save_pkl(out_dict, filename)

generate_negative_samples_random(data, split_mode, filename)

generate random negative edges for ablation study

Parameters:

Name Type Description Default
data TemporalData

an object containing positive edges information

required
split_mode str

specifies whether to generate negative edges for 'validation' or 'test' splits

required
filename str

name of the file containing the generated negative edges

required
Source code in tgb/linkproppred/thg_negative_generator.py
def generate_negative_samples_random(self, 
                                  data: TemporalData, 
                                  split_mode: str, 
                                  filename: str,
                                  ) -> None:
    r"""
    generate random negative edges for ablation study
    Candidates are all ids between the smallest and largest destination of `self.edge_data`;
    destinations sharing the timestamp and source of the positive edge are removed.
    `num_neg_e == -1` keeps every remaining candidate.
    Nothing is generated if `filename` already exists.

    Parameters:
        data: an object containing positive edges information
        split_mode: specifies whether to generate negative edges for 'validation' or 'test' splits
        filename: name of the file containing the generated negative edges

    Raises:
        ValueError: if negatives have to be generated but `self.edge_data` is None
    """
    print(
        f"INFO: Negative Sampling Strategy: {self.strategy}, Data Split: {split_mode}"
    )
    assert split_mode in [
        "val",
        "test",
    ], "Invalid split-mode! It should be `val` or `test`!"

    if os.path.exists(filename):
        print(
            f"INFO: negative samples for '{split_mode}' evaluation are already generated!"
        )
    else:
        if self.edge_data is None:
            raise ValueError(
                "The `random` strategy requires `edge_data` to be passed to the constructor!"
            )
        print(f"INFO: Generating negative samples for '{split_mode}' evaluation!")
        # retrieve the information from the batch
        pos_src, pos_dst, pos_timestamp, edge_type = (
            data.src.cpu().numpy(),
            data.dst.cpu().numpy(),
            data.t.cpu().numpy(),
            data.edge_type.cpu().numpy(),
        )
        # plain ints so that np.arange does not depend on the container type of `dst`
        first_dst_id = int(self.edge_data.dst.min())
        last_dst_id = int(self.edge_data.dst.max())
        all_dst = np.arange(first_dst_id, last_dst_id + 1)

        # group the positive destinations once instead of masking all edges per edge
        same_t_src_dst = {}  # {(t, u): [v_1, v_2, ..]}
        for pos_s, pos_d, pos_t in zip(pos_src, pos_dst, pos_timestamp):
            same_t_src_dst.setdefault((pos_t, pos_s), []).append(pos_d)

        evaluation_set = {}
        # generate a list of negative destinations for each positive edge
        pos_edge_tqdm = tqdm(
            zip(pos_src, pos_dst, pos_timestamp, edge_type), total=len(pos_src)
        )

        for pos_s, pos_d, pos_t, e_type in pos_edge_tqdm:
            filtered_all_dst = np.setdiff1d(all_dst, same_t_src_dst[(pos_t, pos_s)])
            # -1 means "all negatives"; also keep everything when too few remain
            if self.num_neg_e == -1 or self.num_neg_e > len(filtered_all_dst):
                neg_d_arr = filtered_all_dst
            else:
                neg_d_arr = np.random.choice(
                    filtered_all_dst, self.num_neg_e, replace=False)  # never replace negatives
            evaluation_set[(pos_t, pos_s, e_type)] = neg_d_arr
        save_pkl(evaluation_set, filename)

get_destinations_based_on_node_type(first_node_id, last_node_id, node_type)

Get the destination node id arrays based on the node type. Parameters: first_node_id: the first node id; last_node_id: the last node id; node_type: the node type of each node.

Returns:

Name Type Description
node_type_dict dict

a dictionary containing the destination node ids for each node type

Source code in tgb/linkproppred/thg_negative_generator.py
def get_destinations_based_on_node_type(self, 
                                        first_node_id: int,
                                        last_node_id: int,
                                        node_type: np.ndarray) -> dict:
    r"""
    Group the node ids `first_node_id .. last_node_id` by their node type.

    Parameters:
        first_node_id: id of the node described by `node_type[0]`
        last_node_id: id of the node described by the last entry of `node_type`
        node_type: the node type of each node, one entry per node id in the range

    Returns:
        node_type_dict: maps each node type (as int) to a numpy array holding the
                        ids of the nodes of that type in increasing order
    """
    assert first_node_id <= last_node_id, "Invalid destination node ids!"
    assert len(node_type) == (last_node_id - first_node_id + 1), "node type array must match the indices"

    # collect the ids per type; a type is registered when it is first encountered
    ids_per_type = {}
    for offset, raw_type in enumerate(node_type):
        ids_per_type.setdefault(int(raw_type), []).append(offset + first_node_id)

    node_type_dict = {}
    for type_key, id_list in ids_per_type.items():
        id_arr = np.array(id_list)
        node_type_dict[type_key] = id_arr
        assert np.all(np.diff(id_arr) >= 0), "Destination node ids for a given type must be sorted"
        assert np.all(id_arr <= last_node_id), "Destination node ids must be less than or equal to the last destination id"
    return node_type_dict

Sample negative edges for evaluation of dynamic link prediction. Load already generated negative edges from file, batch them based on the positive edges, and return the evaluation set.

THGNegativeEdgeSampler

Bases: object

Source code in tgb/linkproppred/thg_negative_sampler.py
class THGNegativeEdgeSampler(object):
    r"""
    Serve pre-generated negative edges for Temporal Heterogeneous Graphs.

    The negatives of a split are loaded from disk with `load_eval_set` and are
    looked up per positive edge with `query_batch`.
    """

    def __init__(
        self,
        dataset_name: str,
        first_node_id: int,
        last_node_id: int,
        node_type: np.ndarray,
        strategy: str = "node-type-filtered",
    ) -> None:
        r"""
        Negative Edge Sampler
            Loads and queries the negative batches based on the positive batches provided.
            constructor for the negative edge sampler class

        Parameters:
            dataset_name: name of the dataset
            first_node_id: identity of the first node
            last_node_id: identity of the last destination node
            node_type: the node type of each node
            strategy: not used by the sampler, the pre-generated negatives are always loaded

        Returns:
            None
        """
        self.dataset_name = dataset_name
        self.eval_set = {}  # {split_mode: {(t, src, edge_type): negative destinations}}
        self.first_node_id = first_node_id
        self.last_node_id = last_node_id
        self.node_type = node_type
        assert isinstance(self.node_type, np.ndarray), "node_type should be a numpy array"

    def load_eval_set(
        self,
        fname: str,
        split_mode: str = "val",
    ) -> None:
        r"""
        Load the evaluation set from disk, can be either val or test set ns samples
        Parameters:
            fname: the file name of the evaluation ns on disk
            split_mode: the split mode of the evaluation set, can be either `val` or `test`

        Returns:
            None

        Raises:
            FileNotFoundError: if `fname` does not exist
        """
        assert split_mode in [
            "val",
            "test",
        ], "Invalid split-mode! It should be `val`, `test`"
        if not os.path.exists(fname):
            raise FileNotFoundError(f"File not found at {fname}")
        self.eval_set[split_mode] = load_pkl(fname)

    def query_batch(self,
                    pos_src: Union[Tensor, np.ndarray],
                    pos_dst: Union[Tensor, np.ndarray],
                    pos_timestamp: Union[Tensor, np.ndarray],
                    edge_type: Union[Tensor, np.ndarray],
                    split_mode: str = "test") -> list:
        r"""
        For each positive edge in the batch, return the negative destinations stored for it.
        `split_mode` specifies whether the validation or test evaluation set should be retrieved.

        Parameters:
            pos_src: positive source nodes
            pos_dst: positive destination nodes
            pos_timestamp: timestamps of the positive edges
            edge_type: edge types of the positive edges
            split_mode: which loaded evaluation set to query, `val` or `test`

        Returns:
            neg_samples: list of numpy array; each array contains the set of negative edges that
                        should be evaluated against each positive edge.

        Raises:
            ValueError: if the evaluation set of `split_mode` has not been loaded, or if a
                        positive edge has no entry in it
            RuntimeError: if an input is neither a numpy array nor a torch tensor
        """
        assert split_mode in [
            "val",
            "test",
        ], "Invalid split-mode! It should be `val`, `test`!"
        # `eval_set` starts empty, so an unloaded split is a missing key rather than a None value
        if self.eval_set.get(split_mode) is None:
            raise ValueError(
                f"Evaluation set is None! You should load the {split_mode} evaluation set first!"
            )

        # check the argument types...
        if torch is not None and isinstance(pos_src, torch.Tensor):
            pos_src = pos_src.detach().cpu().numpy()
        if torch is not None and isinstance(pos_dst, torch.Tensor):
            pos_dst = pos_dst.detach().cpu().numpy()
        if torch is not None and isinstance(pos_timestamp, torch.Tensor):
            pos_timestamp = pos_timestamp.detach().cpu().numpy()
        if torch is not None and isinstance(edge_type, torch.Tensor):
            edge_type = edge_type.detach().cpu().numpy()

        for batch_field in (pos_src, pos_dst, pos_timestamp, edge_type):
            if not isinstance(batch_field, np.ndarray):
                raise RuntimeError(
                    "pos_src, pos_dst, pos_timestamp, and edge_type need to be either numpy ndarray or torch tensor!"
                )

        split_negatives = self.eval_set[split_mode]
        neg_samples = []
        for pos_s, pos_d, pos_t, e_type in zip(pos_src, pos_dst, pos_timestamp, edge_type):
            key = (pos_t, pos_s, e_type)
            if key not in split_negatives:
                raise ValueError(
                    f"The edge ({pos_s}, {pos_d}, {pos_t}, {e_type}) is not in the '{split_mode}' evaluation set! Please check the implementation."
                )
            neg_samples.append(split_negatives[key])

        #? can't convert to numpy array due to different lengths of negative samples
        return neg_samples

__init__(dataset_name, first_node_id, last_node_id, node_type, strategy='node-type-filtered')

Negative Edge Sampler: loads and queries the negative batches based on the positive batches provided. This is the constructor for the negative edge sampler class.

Parameters:

Name Type Description Default
dataset_name str

name of the dataset

required
first_node_id int

identity of the first node

required
last_node_id int

identity of the last destination node

required
node_type ndarray

the node type of each node

required
strategy str

will always load the pre-generated negatives

'node-type-filtered'

Returns:

Type Description
None

None

Source code in tgb/linkproppred/thg_negative_sampler.py
def __init__(
    self,
    dataset_name: str,
    first_node_id: int,
    last_node_id: int,
    node_type: np.ndarray,
    strategy: str = "node-type-filtered",
) -> None:
    r"""
    Set up a sampler that serves pre-generated negative edges.
    No evaluation set is loaded here; the per-split store starts empty.

    Parameters:
        dataset_name: name of the dataset
        first_node_id: identity of the first node
        last_node_id: identity of the last destination node
        node_type: the node type of each node, has to be a numpy array
        strategy: accepted but not read by the constructor

    Returns:
        None
    """
    # description of the graph the negatives belong to
    self.first_node_id, self.last_node_id = first_node_id, last_node_id
    self.node_type = node_type
    self.dataset_name = dataset_name
    # evaluation sets, keyed by split mode
    self.eval_set = dict()
    # checked last: the attributes above are already set when this fails
    assert isinstance(self.node_type, np.ndarray), "node_type should be a numpy array"

load_eval_set(fname, split_mode='val')

Load the evaluation set from disk; it can hold either the val or the test negative samples. Parameters: fname: the file name of the evaluation negative samples on disk; split_mode: the split mode of the evaluation set, can be either val or test.

Returns:

Type Description
None

None

Source code in tgb/linkproppred/thg_negative_sampler.py
def load_eval_set(
    self,
    fname: str,
    split_mode: str = "val",
) -> None:
    r"""
    Read an evaluation set of negative samples from disk and register it
    under the given split.

    Parameters:
        fname: path of the evaluation negatives on disk (read via `load_pkl`)
        split_mode: split the file belongs to, either `val` or `test`

    Returns:
        None

    Raises:
        AssertionError: if `split_mode` is neither `val` nor `test`
        FileNotFoundError: if nothing exists at `fname`
    """
    split_is_valid = split_mode in ("val", "test")
    assert split_is_valid, "Invalid split-mode! It should be `val`, `test`"

    if os.path.exists(fname):
        # Replaces whatever was previously stored for this split.
        self.eval_set[split_mode] = load_pkl(fname)
        return

    raise FileNotFoundError(f"File not found at {fname}")

query_batch(pos_src, pos_dst, pos_timestamp, edge_type, split_mode='test')

For each positive edge in the pos_batch, return a list of negative edges. split_mode specifies whether the validation or test evaluation set should be retrieved. Modified to include an edge type argument.

Parameters:

Name Type Description Default
pos_src Union[Tensor, ndarray]

list of positive source nodes

required
pos_dst Union[Tensor, ndarray]

list of positive destination nodes

required
pos_timestamp Union[Tensor, ndarray]

list of timestamps of the positive edges

required
split_mode str

specifies whether to generate negative edges for 'validation' or 'test' splits

'test'

Returns:

Name Type Description
neg_samples list

list of numpy array; each array contains the set of negative edges that should be evaluated against each positive edge.

Source code in tgb/linkproppred/thg_negative_sampler.py
def query_batch(
    self,
    pos_src: Union[Tensor, np.ndarray],
    pos_dst: Union[Tensor, np.ndarray],
    pos_timestamp: Union[Tensor, np.ndarray],
    edge_type: Union[Tensor, np.ndarray],
    split_mode: str = "test",
) -> list:
    r"""
    For each positive edge in the `pos_batch`, return a list of negative edges
    `split_mode` specifies whether the validation or test evaluation set should be retrieved.
    Negatives are looked up by the key `(timestamp, source, edge_type)` of each positive edge.

    Parameters:
        pos_src: list of positive source nodes
        pos_dst: list of positive destination nodes
        pos_timestamp: list of timestamps of the positive edges
        edge_type: list of edge types of the positive edges
        split_mode: specifies whether to retrieve negative edges for the `val` or `test` split

    Returns:
        neg_samples: list with one entry per positive edge; each entry is the stored set of
                    negative edges that should be evaluated against that positive edge.

    Raises:
        AssertionError: if `split_mode` is neither `val` nor `test`
        ValueError: if the requested split has not been loaded, or if a positive edge
                    has no entry in the loaded evaluation set
        RuntimeError: if any of the four edge inputs is neither a numpy ndarray
                    nor a torch tensor
    """
    assert split_mode in [
        "val",
        "test",
    ], "Invalid split-mode! It should be `val`, `test`!"

    # A split that was never loaded is absent from the dict, so use `.get` to raise
    # the intended ValueError rather than a bare KeyError.
    split_negatives = self.eval_set.get(split_mode)
    if split_negatives is None:
        raise ValueError(
            f"Evaluation set is None! You should load the {split_mode} evaluation set first!"
        )

    # Bring any torch tensors over to numpy so all four inputs share one type.
    if torch is not None:
        pos_src, pos_dst, pos_timestamp, edge_type = (
            arr.detach().cpu().numpy() if isinstance(arr, torch.Tensor) else arr
            for arr in (pos_src, pos_dst, pos_timestamp, edge_type)
        )

    # Every input must be an ndarray by now (timestamps and edge types included).
    if not all(
        isinstance(arr, np.ndarray)
        for arr in (pos_src, pos_dst, pos_timestamp, edge_type)
    ):
        raise RuntimeError(
            "pos_src, pos_dst, pos_timestamp, and edge_type need to be either numpy ndarray or torch tensor!"
        )

    neg_samples = []
    for pos_s, pos_d, pos_t, e_type in zip(pos_src, pos_dst, pos_timestamp, edge_type):
        key = (pos_t, pos_s, e_type)
        if key not in split_negatives:
            raise ValueError(
                f"The edge ({pos_s}, {pos_d}, {pos_t}, {e_type}) is not in the '{split_mode}' evaluation set! Please check the implementation."
            )
        neg_samples.append(split_negatives[key])

    #? can't convert to numpy array due to different lengths of negative samples
    return neg_samples