Skip to content

Ocel

OCEL

Source code in src/ocelescope/src/ocelescope/ocel/ocel.py
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
class OCEL:
    def __init__(self, ocel: PM4PYOCEL, id: Optional[str] = None):
        self._id = id if id is not None else str(uuid4())

        self.ocel: PM4PYOCEL = ocel
        # Metadata, to be set manually after creating the instance
        self.meta: dict[str, Any] = {}
        self._cache_info = {}

        # Used to distinguish multiple ocels with the same id but one is filtered form
        self.state_id = str(uuid4())

        # extensions
        self._extensions: dict[type[OCELExtension], OCELExtension] = {}

        self._init_cache()

    def _init_cache(self):
        # Instance-level cache object (using cachetools)
        self.cache = LRUCache(maxsize=128)
        self.cache_lock = Lock()

    @property
    def id(self) -> str:
        return self._id

    # ----- Pm4py Aliases ------------------------------------------------------------------------------------------
    # region

    @property
    def events(self):
        return self.ocel.events

    @property
    def objects(self):
        return self.ocel.objects

    @property
    def object_changes(self):
        return self.ocel.object_changes

    @property
    def relations(self):
        return self.ocel.relations

    # endregion
    # ----- BASIC PROPERTIES / STATS ------------------------------------------------------------------------------------------
    # region

    @property
    @instance_lru_cache()
    def activities(self) -> list[str]:
        return list(sorted(self.ocel.events["ocel:activity"].unique().tolist()))

    @property
    @instance_lru_cache()
    def activity_counts(self) -> pd.Series:
        return self.ocel.events["ocel:activity"].value_counts()

    @property
    @instance_lru_cache()
    def object_types(self) -> list[str]:
        return list(sorted(self.ocel.objects["ocel:type"].unique().tolist()))

    @property
    def otypes(self) -> list[str]:
        """Alias for object_types"""
        return self.object_types

    @property
    @instance_lru_cache()
    def otype_counts(self) -> pd.Series:
        return self.ocel.objects["ocel:type"].value_counts()

    @property
    @instance_lru_cache()
    def objects_with_otypes(
        self,
    ) -> pd.Series:
        """pandas Series containing the object type of each object"""
        return self.ocel.objects[["ocel:oid", "ocel:type"]].set_index("ocel:oid")[  # type: ignore
            "ocel:type"
        ]

    @property
    @instance_lru_cache()
    def events_with_activities(self) -> pd.Series:
        """pandas Series containing the activity of each event"""
        return self.ocel.events[["ocel:eid", "ocel:activity"]].set_index("ocel:eid")[  # type: ignore
            "ocel:activity"
        ]

    @property
    def obj_otypes(self) -> pd.Series:
        """Alias for objects_with_otypes"""
        return self.objects_with_otypes

    @property
    def event_activities(self) -> pd.Series:
        """Alias for events_with_activities"""
        return self.events_with_activities

    def has_object_types(self, otypes: Iterable[str]) -> bool:
        return all(ot in self.otypes for ot in otypes)

    def has_activities(self, activities: Iterable[str]) -> bool:
        return all(act in self.activities for act in activities)

    # endregion

    # ----- Filtering ------------------------------------------------------------------------------------------
    # region

    def apply_filter(self, filters: OCELFilter) -> OCEL:
        from .filter import apply_filters

        filtered_ocel = apply_filters(self, filters=filters)
        filtered_ocel.meta = self.meta
        filtered_ocel._extensions = self._extensions

        return filtered_ocel

    # endregion
    # ----- PROCESS DISCOVERY ------------------------------------------------------------------------------------------
    # region

    @instance_lru_cache(make_hashable=True)
    def ocpn(
        self,
        otypes: set[str] | None = None,
        inductive_miner_variant: Literal["im", "imd"] = "im",
        diagnostics_with_tbr: bool = False,
    ) -> dict[str, Any]:
        """
        Discovers an Object-centric Petri Net (OCPN), filtering for a given list of object types.
        Uses a custom cache, able to save multiple OCPNs for different object type sets.

        Wrapper for pm4py's OCPN discovery method (pm4py.discover_oc_petri_net)
        """
        # Complete parameters
        if otypes is None:
            otypes = set(self.otypes)
        sorted_otypes = sorted([ot for ot in otypes if ot in self.otypes])
        if not sorted_otypes:
            raise ValueError("OCPN Discovery received invalid or empty object type set.")

        # Discover OCPN
        # TODO might use own filter function
        filtered_ocel = pm4py.filter_ocel_object_types(self.ocel, sorted_otypes)
        ocpn = pm4py.discover_oc_petri_net(
            filtered_ocel,
            inductive_miner_variant=inductive_miner_variant,
            diagnostics_with_tbr=diagnostics_with_tbr,
        )

        return ocpn

    @instance_lru_cache()
    def flatten(self, otype: str) -> pd.DataFrame:
        if otype not in self.otypes:
            raise ValueError(f"Object type '{otype}' not found")
        return pm4py.ocel.ocel_flattening(ocel=self.ocel, object_type=otype)

    @instance_lru_cache()
    def directly_follows_graph(self, otype: str) -> dict[tuple[str, str], int]:
        dfg, _, _ = pm4py.discovery.discover_directly_follows_graph(self.flatten(otype))
        return dfg

    def dfg(self, otype: str):
        """Alias of directly_follows_graph"""
        return self.directly_follows_graph(otype)

    @instance_lru_cache()
    def eventually_follows_graph(self, otype: str) -> set[tuple[str, str]]:
        """Discovers the eventually-follows graph of the flattened log, without frequencies."""
        dfg = self.directly_follows_graph(otype=otype)
        DFG = nx.DiGraph()
        DFG.add_edges_from(dfg.keys())
        EFG = nx.transitive_closure(DFG)

        # Output graph as edge set
        # efg = {u: set(EFG.successors(u)) for u in EFG.nodes() if EFG.out_degree(u)}
        efg = set(EFG.edges())
        return efg

    def efg(self, otype: str):
        """Alias of eventually_follows_graph"""
        return self.eventually_follows_graph(otype)

    # endregion

    # ----- O2O RELATIONS ------------------------------------------------------------------------------------------
    # region

    @property
    @instance_lru_cache()
    def o2o(self):
        """O2O relationships, with object types"""
        return self.join_otypes(self.ocel.o2o.rename(columns={"ocel:oid": "ocel:oid_1"}))

    @instance_lru_cache()
    def o2o_summary(self, direction: Optional[Literal["source", "target"]] = "source"):
        return summarize_o2o_counts(self.ocel, direction=direction)

    # endregion
    # ----- E2O RELATIONS ------------------------------------------------------------------------------------------
    # region

    @instance_lru_cache()
    def e2o_summary(self, direction: Optional[Literal["source", "target"]] = "source"):
        return summarize_e2o_counts(self.ocel, direction=direction)

    # endregion
    # ----- ATTRIBUTES ------------------------------------------------------------------------------------------
    # region
    @property
    def eattr_names(self) -> list[str]:
        return sorted([col for col in self.ocel.events.columns if not col.startswith("ocel:")])

    @property
    def oattr_names_static(self) -> list[str]:
        return sorted(
            [
                col
                for col in self.ocel.objects.columns[self.ocel.objects.count() > 0]
                if not col.startswith("ocel:")
            ]
        )

    @property
    def oattr_names_dynamic(self) -> list[str]:
        return sorted(
            [
                col
                for col in self.ocel.object_changes.columns[self.ocel.object_changes.count() > 0]
                if not col.startswith("ocel:") and col != "@@cumcount"
            ]
        )

    @property
    def oattr_names(self) -> list[str]:
        return sorted(set(self.oattr_names_static + self.oattr_names_dynamic))

    @property
    @instance_lru_cache()
    def object_attribute_summary(self) -> dict[str, list[AttributeSummary]]:
        return summarize_object_attributes(self.ocel)

    @property
    @instance_lru_cache()
    def event_attribute_summary(self) -> dict[str, list[AttributeSummary]]:
        return summarize_event_attributes(self.ocel)

    # endregion

    # ----- OBJECT LIFECYCLES, ACTIVITY ORDER ------------------------------------------------------------------------------------------
    # region

    @property
    @instance_lru_cache()
    def num_events_per_object(self):
        return self.join_otype(
            self.ocel.relations.groupby("ocel:oid")["ocel:eid"]
            .count()
            .rename("num_events")
            .reset_index()
        )

    @property
    @instance_lru_cache()
    def median_num_events_per_otype(self):
        return self.num_events_per_object.groupby("ocel:type")["num_events"].median()

    @instance_lru_cache()
    def sort_otypes(self) -> list[str]:
        """A sorted list of the object types. Object types are sorted by the median number of events per object."""
        return (
            self.median_num_events_per_otype.reset_index()
            .sort_values(["num_events", "ocel:type"])["ocel:type"]
            .tolist()
        )

    # endregion

    # ----- E2O Relations ------------------------------------------------------------------------------------------
    # region

    @property
    @instance_lru_cache()
    def type_relations(self) -> pd.DataFrame:
        x: pd.Series = self.ocel.relations.groupby(
            ["ocel:activity", "ocel:type", "ocel:qualifier"]
        ).size()  # type: ignore
        return x.reset_index(name="freq")

    @property
    @instance_lru_cache()
    def type_relation_frequencies(self) -> pd.Series:
        return self.type_relations.groupby(["ocel:activity", "ocel:type"])["freq"].sum()

    @property
    @instance_lru_cache()
    def objects_per_event(self) -> pd.DataFrame:
        """Computes the number of objects per event, grouped by activity and object type, aggregated by mean, min, median, max."""
        # TODO nonzero does not work here. Due to the groupby calls, there are no zero entries, leading to nonzero being either 1 or NaN.
        type_relations: pd.DataFrame = (
            self.relations.groupby(["ocel:eid", "ocel:activity", "ocel:type"], as_index=False)
            .size()
            .rename(columns={"size": "num_objects"})  # type: ignore
            .groupby(["ocel:activity", "ocel:type"], as_index=False)["num_objects"]
            .pipe(mmmm, nonzero=False, dtype=int)  # type: ignore
        )
        type_relations["always"] = np.where(
            type_relations["min"] == type_relations["max"],
            type_relations["min"],
            np.nan,
        )
        type_relations["unique"] = type_relations["max"] == 1
        type_relations["always_unique"] = type_relations["always"] == 1
        type_relation_stats = pd.pivot(
            type_relations,
            columns="ocel:type",
            index="ocel:activity",
            values=type_relations.columns[2:],  # type: ignore
        )  # type: ignore

        return type_relation_stats

    @property
    @instance_lru_cache()
    def objects_per_activity(self) -> pd.DataFrame:
        """Counts the number of objects of each type related to events of an activity.
        Returns a DataFrame with min/max number of objects per event and the (relative) number of events that have any object.
        Counts separately for different qualifiers.
        """
        event_otypes = (
            self.relations.groupby(["ocel:eid", "ocel:type", "ocel:qualifier"], as_index=False)
            .agg({"ocel:oid": "size", "ocel:activity": "first"})
            .rename(columns={"ocel:oid": "num_objs"})
        )
        act_otype_counts = (
            event_otypes.groupby(["ocel:activity", "ocel:type", "ocel:qualifier"], as_index=False)[
                "num_objs"
            ]
            .agg(["min", "max", "mean", np.count_nonzero])
            .rename(columns={"count_nonzero": "nonzero_abs"})
        )
        act_otype_counts = act_otype_counts.join(
            self.activity_counts.rename("num_events"), on="ocel:activity"
        )
        act_otype_counts["nonzero_rel"] = (
            act_otype_counts["nonzero_abs"] / act_otype_counts["num_events"]
        )
        return act_otype_counts

    def unique_objects_per_activity(
        self,
        min_rel_freq: float = 0,
    ) -> pd.DataFrame:
        """Get unique objects per type/qualifier for given activity
        Includes the share of events that are related to at least one of the given otype/qualifier (nonzero_rel)
        Filter for max. 1 object of its type/qualifier per event, and minimum relative frequency per event as described above.
        Includes rows with qualifier=None representing otype/activity relations with any qualifier.
        """

        # Unique without qualifier filtering (sum over qualifiers of min/max/mean)
        rel_stats_overall = self.objects_per_activity.groupby(
            ["ocel:activity", "ocel:type"], as_index=False
        )[["min", "max", "nonzero_rel"]].agg("sum")
        rel_stats_overall.insert(2, "ocel:qualifier", None)

        # Unique per qualifier
        rel_stats_qual = self.objects_per_activity[rel_stats_overall.columns.tolist()]

        rel_stats = pd.concat(
            [rel_stats_overall, rel_stats_qual],
            ignore_index=True,
        ).sort_values(["ocel:activity", "ocel:type", "ocel:qualifier"], na_position="first")
        rel_stats = rel_stats[(rel_stats["max"] == 1) & (rel_stats["nonzero_rel"] >= min_rel_freq)]
        return rel_stats

    # endregion

    # ----- E2O Qualifiers ------------------------------------------------------------------------------------------
    # region

    @property
    @instance_lru_cache()
    def qualifier_frequencies(self) -> pd.DataFrame:
        return self.type_relations

    @instance_lru_cache()
    def get_qualifiers(
        self,
        otype: str | None = None,
        activity: str | None = None,
    ) -> set[str]:
        qf = self.qualifier_frequencies
        if otype:
            qf = qf[qf["ocel:type"] == otype]
        if activity:
            qf = qf[qf["ocel:activity"] == activity]
        return set(qf["ocel:qualifier"])

    @instance_lru_cache()
    def are_qualifiers_unique(self) -> bool:
        """Returns true iff e2o qualifiers are uniquely determined by activity and object type."""
        return (self.type_relations.groupby(["ocel:activity", "ocel:type"]).size() == 1).all()  # type: ignore

    # endregion

    # ----- HELPER FUNCTIONS ------------------------------------------------------------------------------------------
    # region
    def join_otype(
        self, df: pd.DataFrame, col_oid: str = "ocel:oid", col_otype: str = "ocel:type"
    ) -> pd.DataFrame:
        """Enriches a DataFrame containing an object ID column with their object types."""
        return df.join(self.obj_otypes.rename(col_otype), on=col_oid)

    def join_otypes(
        self,
        df: pd.DataFrame,
        col_oid_1: str = "ocel:oid_1",
        col_oid_2: str = "ocel:oid_2",
        col_otype_1: str = "ocel:type_1",
        col_otype_2: str = "ocel:type_2",
    ) -> pd.DataFrame:
        """Enriches a DataFrame containing two object ID columns with their object types."""
        df = df.join(self.obj_otypes.rename(col_otype_1), on=col_oid_1)
        df = df.join(self.obj_otypes.rename(col_otype_2), on=col_oid_2)
        return df

    def join_activity(
        self,
        df: pd.DataFrame,
        col_eid: str = "ocel:eid",
        col_activity: str = "ocel:activity",
    ) -> pd.DataFrame:
        """Enriches a DataFrame containing an event ID column with their event types (activities)."""
        return df.join(self.event_activities.rename(col_activity), on=col_eid)

    def join_activities(
        self,
        df: pd.DataFrame,
        col_eid_1: str = "ocel:eid_1",
        col_eid_2: str = "ocel:eid_2",
        col_activity_1: str = "ocel:activity_1",
        col_activity_2: str = "ocel:activity_2",
    ) -> pd.DataFrame:
        """Enriches a DataFrame containing two event ID columns with their event types (activities)."""
        df = df.join(self.event_activities.rename(col_activity_1), on=col_eid_1)
        df = df.join(self.event_activities.rename(col_activity_2), on=col_eid_2)
        return df

    # endregion

    # ----- OCELWrapper CLASS UTILS ------------------------------------------------------------------------------------------
    # region

    def __str__(self):
        return f"OCELWrapper [{len(self.events)} events, {len(self.objects)} objects]"

    def __repr__(self):
        return str(self)

    def __deepcopy__(self, memo: dict[int, Any]):
        # TODO revisit this. Are the underlying DataFrames mutable? If not, might optimize this
        pm4py_ocel = deepcopy(self.ocel, memo)
        ocel = OCEL(ocel=pm4py_ocel, id=str(uuid4()))
        ocel.meta = deepcopy(self.meta, memo)
        return ocel

    @property
    def cache_size(self):
        return {name: cache_info.currsize for name, cache_info in self._cache_info.items()}

    # endregion

    # ----- CONSTRUCTOR-LIKE ----------------------------------------------------------------------------------
    # region

    def event_projections(self, events: list[set[str]]) -> list[OCEL]:
        """
        Given subsets of the event IDs (not necessarily distinct or complete),
        create new OCELs, each containing the given event set.
        The new OCELs contain all objects linked to the given events.
        """
        split = []
        for C in events:
            sublog = pm4py.filter_ocel_events(self.ocel, C)
            split.append(OCEL(sublog))
        return split

    def object_projections(self, objects: list[set[str]]) -> list[OCEL]:
        """
        Given subsets of the object IDs (not necessarily distinct or complete),
        create new OCELs, each containing the given object set.
        The new OCELs contain all events linked to the given objects.
        """
        split = []
        for C in objects:
            sublog = pm4py.filter_ocel_objects(self.ocel, C)
            split.append(OCEL(sublog))
        return split

    # endregion

    # ----- IMPORT WRAPPER FUNCTIONS ------------------------------------------------------------------------------------------
    # region
    @staticmethod
    def read_ocel(
        path: Path,
        original_file_name: str | None = None,
        version_info: bool = False,
        upload_date: datetime | None = None,
    ) -> OCEL:
        report = {}

        if version_info:
            report["pythonVersion"] = platform.python_version()
            report["pm4pyVersion"] = pm4py.__version__

        with warnings.catch_warnings(record=True):
            match path.suffix:
                case ".sqlite":
                    pm4py_ocel = pm4py.read.read_ocel2_sqlite(str(path))
                case ".xmlocel":
                    pm4py_ocel = pm4py.read.read_ocel2_xml(str(path))
                case ".jsonocel":
                    pm4py_ocel = pm4py.read.read_ocel2_json(str(path))
                case _:
                    raise ValueError(f"Unsupported extension: {path.suffix}")

        ocel = OCEL(pm4py_ocel)

        report["ocelStrPm4py"] = str(pm4py_ocel)
        report["ocelStr"] = str(ocel)

        ocel.meta = {
            "path": str(path),
            "fileName": original_file_name or str(path.name),
            "importReport": report,
            "uploadDate": upload_date.isoformat() if upload_date else datetime.now().isoformat(),
        }

        return ocel

    def write_ocel(
        self,
        file_path: Path,
        ext: OCELFileExtensions,
    ):
        match ext:
            case ".xmlocel":
                pm4py.write_ocel2_xml(self.ocel, str(file_path))
            case ".jsonocel":
                pm4py.write_ocel2_json(self.ocel, str(file_path))
            case _:
                pm4py.write_ocel2_sqlite(self.ocel, str(file_path))

        for extension in self.get_extensions_list():
            if ext in extension.supported_extensions:
                try:
                    extension.export_extension(file_path)
                except Exception:
                    print("failed to write extension")

    # endregion
    #
    def rename(self, new_name: str):
        self.meta["fileName"] = new_name

    # ----- EXTENTIONS ------------------------------------------------------------------------------------------
    # region
    def load_extension(self, extensions: list[type[OCELExtension]]):
        path = self.meta.get("path")

        if not path:
            return

        path = Path(path)

        for ext_cls in extensions:
            try:
                if path.suffix in ext_cls.supported_extensions and ext_cls.has_extension(path):
                    self._extensions[ext_cls] = ext_cls.import_extension(ocel=self, path=path)
            except Exception:
                print("failed to load extension")

    def get_extension(self, extension: type[T]) -> Optional[T]:
        return cast(Optional[T], self._extensions.get(extension))

    def get_extensions_list(self) -> list[OCELExtension]:
        """Returns a list of all loaded extensions."""
        return list(self._extensions.values())

otypes property

otypes

Alias for object_types

objects_with_otypes property

objects_with_otypes

pandas Series containing the object type of each object

events_with_activities property

events_with_activities

pandas Series containing the activity of each event

obj_otypes property

obj_otypes

Alias for objects_with_otypes

event_activities property

event_activities

Alias for events_with_activities

o2o property

o2o

O2O relationships, with object types

objects_per_event property

objects_per_event

Computes the number of objects per event, grouped by activity and object type, aggregated by mean, min, median, max.

objects_per_activity property

objects_per_activity

Counts the number of objects of each type related to events of an activity. Returns a DataFrame with min/max number of objects per event and the (relative) number of events that have any object. Counts separately for different qualifiers.

ocpn

ocpn(otypes=None, inductive_miner_variant='im', diagnostics_with_tbr=False)

Discovers an Object-centric Petri Net (OCPN), filtering for a given list of object types. Uses a custom cache, able to save multiple OCPNs for different object type sets.

Wrapper for pm4py's OCPN discovery method (pm4py.discover_oc_petri_net)

Source code in src/ocelescope/src/ocelescope/ocel/ocel.py
@instance_lru_cache(make_hashable=True)
def ocpn(
    self,
    otypes: set[str] | None = None,
    inductive_miner_variant: Literal["im", "imd"] = "im",
    diagnostics_with_tbr: bool = False,
) -> dict[str, Any]:
    """
    Discover an Object-centric Petri Net (OCPN) restricted to the given object types.

    ``otypes=None`` selects every object type of the log; requested types that do not
    occur in the log are dropped. Thin wrapper around pm4py.discover_oc_petri_net,
    with the remaining arguments passed through unchanged.

    Raises ValueError when no requested object type exists in the log.
    """
    known_otypes = self.otypes
    requested = set(known_otypes) if otypes is None else otypes

    # Keep only types present in the log, in a deterministic (sorted) order
    selected_otypes = sorted(ot for ot in requested if ot in known_otypes)
    if len(selected_otypes) == 0:
        raise ValueError("OCPN Discovery received invalid or empty object type set.")

    # TODO might use own filter function
    return pm4py.discover_oc_petri_net(
        pm4py.filter_ocel_object_types(self.ocel, selected_otypes),
        inductive_miner_variant=inductive_miner_variant,
        diagnostics_with_tbr=diagnostics_with_tbr,
    )

dfg

dfg(otype)

Alias of directly_follows_graph

Source code in src/ocelescope/src/ocelescope/ocel/ocel.py
def dfg(self, otype: str):
    """Shorthand that forwards to directly_follows_graph for the given object type."""
    graph = self.directly_follows_graph(otype)
    return graph

eventually_follows_graph

eventually_follows_graph(otype)

Discovers the eventually-follows graph of the flattened log, without frequencies.

Source code in src/ocelescope/src/ocelescope/ocel/ocel.py
@instance_lru_cache()
def eventually_follows_graph(self, otype: str) -> set[tuple[str, str]]:
    """Return the eventually-follows relation of the flattened log as a set of edges (no frequencies)."""
    direct_edges = self.directly_follows_graph(otype=otype).keys()

    graph = nx.DiGraph()
    graph.add_edges_from(direct_edges)

    # Eventually-follows is the transitive closure of directly-follows
    closure = nx.transitive_closure(graph)
    return set(closure.edges())

efg

efg(otype)

Alias of eventually_follows_graph

Source code in src/ocelescope/src/ocelescope/ocel/ocel.py
def efg(self, otype: str):
    """Shorthand that forwards to eventually_follows_graph for the given object type."""
    edges = self.eventually_follows_graph(otype)
    return edges

sort_otypes

sort_otypes()

A sorted list of the object types. Object types are sorted by the median number of events per object.

Source code in src/ocelescope/src/ocelescope/ocel/ocel.py
@instance_lru_cache()
def sort_otypes(self) -> list[str]:
    """Object types ordered by their median number of events per object (ties broken by type name)."""
    medians = self.median_num_events_per_otype.reset_index()
    ordered = medians.sort_values(["num_events", "ocel:type"])
    return ordered["ocel:type"].tolist()

unique_objects_per_activity

unique_objects_per_activity(min_rel_freq=0)

Get unique objects per type/qualifier for a given activity. Includes the share of events that are related to at least one object of the given otype/qualifier (nonzero_rel). Filters for at most one object of its type/qualifier per event, and for a minimum relative frequency per event as described above. Includes rows with qualifier=None representing otype/activity relations with any qualifier.

Source code in src/ocelescope/src/ocelescope/ocel/ocel.py
def unique_objects_per_activity(
    self,
    min_rel_freq: float = 0,
) -> pd.DataFrame:
    """Get unique objects per type/qualifier for given activity
    Includes the share of events that are related to at least one of the given otype/qualifier (nonzero_rel)
    Filter for max. 1 object of its type/qualifier per event, and minimum relative frequency per event as described above.
    Includes rows with qualifier=None representing otype/activity relations with any qualifier.

    Only rows with ``max == 1`` and ``nonzero_rel >= min_rel_freq`` are returned.
    """

    # Unique without qualifier filtering (sum over qualifiers of min/max/nonzero_rel)
    rel_stats_overall = self.objects_per_activity.groupby(
        ["ocel:activity", "ocel:type"], as_index=False
    )[["min", "max", "nonzero_rel"]].agg("sum")
    # Qualifier column is inserted at position 2 so the column order matches the selection below
    rel_stats_overall.insert(2, "ocel:qualifier", None)

    # Unique per qualifier
    rel_stats_qual = self.objects_per_activity[rel_stats_overall.columns.tolist()]

    # Overall rows (qualifier=None) sort before the per-qualifier rows of the same activity/type
    rel_stats = pd.concat(
        [rel_stats_overall, rel_stats_qual],
        ignore_index=True,
    ).sort_values(["ocel:activity", "ocel:type", "ocel:qualifier"], na_position="first")
    rel_stats = rel_stats[(rel_stats["max"] == 1) & (rel_stats["nonzero_rel"] >= min_rel_freq)]
    return rel_stats

are_qualifiers_unique

are_qualifiers_unique()

Returns true iff e2o qualifiers are uniquely determined by activity and object type.

Source code in src/ocelescope/src/ocelescope/ocel/ocel.py
@instance_lru_cache()
def are_qualifiers_unique(self) -> bool:
    """Return True iff every (activity, object type) pair occurs with exactly one e2o qualifier."""
    qualifiers_per_pair = self.type_relations.groupby(["ocel:activity", "ocel:type"]).size()
    return (qualifiers_per_pair == 1).all()  # type: ignore

join_otype

join_otype(df, col_oid='ocel:oid', col_otype='ocel:type')

Enriches a DataFrame containing an object ID column with their object types.

Source code in src/ocelescope/src/ocelescope/ocel/ocel.py
def join_otype(
    self, df: pd.DataFrame, col_oid: str = "ocel:oid", col_otype: str = "ocel:type"
) -> pd.DataFrame:
    """Add a column ``col_otype`` holding the object type of the object IDs in ``col_oid``."""
    otype_lookup = self.obj_otypes.rename(col_otype)
    enriched = df.join(otype_lookup, on=col_oid)
    return enriched

join_otypes

join_otypes(df, col_oid_1='ocel:oid_1', col_oid_2='ocel:oid_2', col_otype_1='ocel:type_1', col_otype_2='ocel:type_2')

Enriches a DataFrame containing two object ID columns with their object types.

Source code in src/ocelescope/src/ocelescope/ocel/ocel.py
def join_otypes(
    self,
    df: pd.DataFrame,
    col_oid_1: str = "ocel:oid_1",
    col_oid_2: str = "ocel:oid_2",
    col_otype_1: str = "ocel:type_1",
    col_otype_2: str = "ocel:type_2",
) -> pd.DataFrame:
    """Add two object type columns, one for each of the two object ID columns of ``df``."""
    column_pairs = ((col_oid_1, col_otype_1), (col_oid_2, col_otype_2))
    # Join the first pair, then the second, so the new columns are appended in that order
    for oid_col, otype_col in column_pairs:
        df = df.join(self.obj_otypes.rename(otype_col), on=oid_col)
    return df

join_activity

join_activity(df, col_eid='ocel:eid', col_activity='ocel:activity')

Enriches a DataFrame containing an event ID column with their event types (activities).

Source code in src/ocelescope/src/ocelescope/ocel/ocel.py
def join_activity(
    self,
    df: pd.DataFrame,
    col_eid: str = "ocel:eid",
    col_activity: str = "ocel:activity",
) -> pd.DataFrame:
    """Add an activity column to a DataFrame that has an event ID column.

    Args:
        df: DataFrame holding event IDs in ``col_eid``.
        col_eid: Name of the column in ``df`` used as the join key.
        col_activity: Name given to the added activity column.

    Returns:
        ``df`` joined with ``self.event_activities`` (renamed to ``col_activity``)
        on ``col_eid``.
    """
    activity_lookup = self.event_activities.rename(col_activity)
    return df.join(activity_lookup, on=col_eid)

join_activities

join_activities(df, col_eid_1='ocel:eid_1', col_eid_2='ocel:eid_2', col_activity_1='ocel:activity_1', col_activity_2='ocel:activity_2')

Enriches a DataFrame containing two event ID columns with the corresponding event types (activities).

Source code in src/ocelescope/src/ocelescope/ocel/ocel.py
def join_activities(
    self,
    df: pd.DataFrame,
    col_eid_1: str = "ocel:eid_1",
    col_eid_2: str = "ocel:eid_2",
    col_activity_1: str = "ocel:activity_1",
    col_activity_2: str = "ocel:activity_2",
) -> pd.DataFrame:
    """Add one activity column for each of two event ID columns.

    Args:
        df: DataFrame holding event IDs in ``col_eid_1`` and ``col_eid_2``.
        col_eid_1: Join-key column for the first event.
        col_eid_2: Join-key column for the second event.
        col_activity_1: Name of the activity column added for ``col_eid_1``.
        col_activity_2: Name of the activity column added for ``col_eid_2``.

    Returns:
        ``df`` joined twice with ``self.event_activities``, first on ``col_eid_1``,
        then on ``col_eid_2``.
    """
    column_pairs = ((col_eid_1, col_activity_1), (col_eid_2, col_activity_2))
    for eid_column, activity_column in column_pairs:
        df = df.join(self.event_activities.rename(activity_column), on=eid_column)
    return df

event_projections

event_projections(events)

Given subsets of the event IDs (not necessarily distinct or complete), create new OCELs, each containing the given event set. The new OCELs contain all objects linked to the given events.

Source code in src/ocelescope/src/ocelescope/ocel/ocel.py
def event_projections(self, events: list[set[str]]) -> list[OCEL]:
    """Create one new OCEL per given set of event IDs.

    The event sets are not required to be distinct or to cover all events.
    Each set is passed to ``pm4py.filter_ocel_events`` together with ``self.ocel``,
    and the filtered log is wrapped in a new ``OCEL``. The new OCELs contain all
    objects linked to the given events.

    Args:
        events: Event ID sets, one per OCEL to create.

    Returns:
        The new OCELs, in the same order as ``events``.
    """
    return [OCEL(pm4py.filter_ocel_events(self.ocel, event_ids)) for event_ids in events]

object_projections

object_projections(objects)

Given subsets of the object IDs (not necessarily distinct or complete), create new OCELs, each containing the given object set. The new OCELs contain all events linked to the given objects.

Source code in src/ocelescope/src/ocelescope/ocel/ocel.py
def object_projections(self, objects: list[set[str]]) -> list[OCEL]:
    """Create one new OCEL per given set of object IDs.

    The object sets are not required to be distinct or to cover all objects.
    Each set is passed to ``pm4py.filter_ocel_objects`` together with ``self.ocel``,
    and the filtered log is wrapped in a new ``OCEL``. The new OCELs contain all
    events linked to the given objects.

    Args:
        objects: Object ID sets, one per OCEL to create.

    Returns:
        The new OCELs, in the same order as ``objects``.
    """
    return [OCEL(pm4py.filter_ocel_objects(self.ocel, object_ids)) for object_ids in objects]

get_extensions_list

get_extensions_list()

Returns a list of all loaded extensions.

Source code in src/ocelescope/src/ocelescope/ocel/ocel.py
def get_extensions_list(self) -> list[OCELExtension]:
    """Return the loaded extensions as a new list.

    Returns:
        The values of ``self._extensions``, copied into a fresh list.
    """
    return [*self._extensions.values()]

OCELExtension

Bases: ABC

Abstract base class for OCEL extensions that can be imported/exported from a file path.

Source code in src/ocelescope/src/ocelescope/ocel/extension.py
class OCELExtension(ABC):
    """Abstract interface for OCEL extensions that are read from and written to a file path.

    Concrete subclasses must implement ``has_extension``, ``import_extension``
    and ``export_extension``.
    """

    # Annotation-only attribute declarations; no default values are assigned here.
    name: str
    description: str
    version: str
    supported_extensions: list[OCELFileExtensions]

    @staticmethod
    @abstractmethod
    def has_extension(path: Path) -> bool:
        """Report whether the extension's data is present at ``path``."""

    @classmethod
    @abstractmethod
    def import_extension(cls: type[T], ocel: "OCEL", path: Path) -> T:
        """Build an extension instance from the data stored at ``path``.

        Args:
            ocel: The OCEL handed to the extension; how it is used is up to
                the concrete implementation.
            path: Location to read the extension data from.

        Returns:
            An instance of the class the method is called on.
        """

    @abstractmethod
    def export_extension(self, path: Path) -> None:
        """Persist this extension's data to ``path``."""

has_extension abstractmethod staticmethod

has_extension(path)

Check if the extension data exists at the given path.

Source code in src/ocelescope/src/ocelescope/ocel/extension.py
@staticmethod
@abstractmethod
def has_extension(path: Path) -> bool:
    """Report whether the extension's data is present at ``path``.

    Abstract: this declaration has no implementation of its own and returns ``None``.
    """

import_extension abstractmethod classmethod

import_extension(ocel, path)

Create the extension by reading from the given path.

Source code in src/ocelescope/src/ocelescope/ocel/extension.py
@classmethod
@abstractmethod
def import_extension(cls: type[T], ocel: "OCEL", path: Path) -> T:
    """Build an extension instance from the data stored at ``path``.

    Abstract: this declaration has no implementation of its own and returns ``None``.

    Args:
        ocel: The OCEL handed to the extension; how it is used is up to the
            concrete implementation.
        path: Location to read the extension data from.
    """

export_extension abstractmethod

export_extension(path)

Write the extension data to the given path.

Source code in src/ocelescope/src/ocelescope/ocel/extension.py
@abstractmethod
def export_extension(self, path: Path) -> None:
    """Persist this extension's data to ``path``.

    Abstract: this declaration has no implementation of its own.
    """