Skip to content
v0.2.1

Filters

class ObjectTypeFilter(BaseFilter):
Source
class ObjectTypeFilter(BaseFilter):
    """Filter objects by whether their type is listed in ``object_types``.

    With ``mode="include"`` (the default) the resulting object mask is True
    for rows of ``ocel.objects.df`` whose ``OTYPE_COL`` value appears in
    ``object_types``. With ``mode="exclude"`` that mask is inverted.
    """

    object_types: Annotated[list[str], Field(json_schema_extra={"fieldType": "object_type"})]
    mode: Literal["exclude", "include"] = "include"

    def filter(self, ocel):
        """Return a ``FilterResult`` that carries only an object mask."""
        type_column = ocel.objects.df[OTYPE_COL]
        listed = cast(pd.Series, type_column.isin(self.object_types))
        return FilterResult(objects=~listed if self.mode == "exclude" else listed)
class EventTypeFilter(BaseFilter):
Source
class EventTypeFilter(BaseFilter):
    """Filter events by whether their activity is listed in ``event_types``.

    With ``mode="include"`` (the default) the resulting event mask is True
    for rows of ``ocel.events.df`` whose ``ACTIVITY_COL`` value appears in
    ``event_types``. With ``mode="exclude"`` that mask is inverted.
    """

    event_types: Annotated[list[str], Field(json_schema_extra={"fieldType": "event_type"})]
    mode: Literal["exclude", "include"] = "include"

    def filter(self, ocel):
        """Return a ``FilterResult`` that carries only an event mask."""
        activity_column = ocel.events.df[ACTIVITY_COL]
        listed = cast(pd.Series, activity_column.isin(self.event_types))
        return FilterResult(events=~listed if self.mode == "exclude" else listed)
class ObjectTypeFrequencyFilter(BaseFilter, FrequencyFilterConfig):
Source
class ObjectTypeFrequencyFilter(BaseFilter, FrequencyFilterConfig):
    """Filter objects by the relative frequency of their object type.

    An object type qualifies when its share of all rows in
    ``ocel.objects.df`` is at least ``threshold_percentage`` percent.
    With ``mode="exclude"`` the resulting mask is inverted.
    """

    def filter(self, ocel):
        """Return a ``FilterResult`` that carries only an object mask."""
        objects_df = ocel.objects.df
        total_objects = len(objects_df)
        value_counts = objects_df[OTYPE_COL].value_counts()
        # Compare the shares directly instead of deriving an integer minimum
        # count: int(total * (pct / 100)) truncates and is exposed to float
        # error (int(100 * (29 / 100)) == 28), which lets types below the
        # requested percentage slip through.
        meets_threshold = value_counts * 100 >= self.threshold_percentage * total_objects
        qualifying = value_counts[meets_threshold].index
        mask = cast(Series, objects_df[OTYPE_COL].isin(qualifying))
        if self.mode == "exclude":
            mask = ~mask
        return FilterResult(objects=mask)
class EventTypeFrequencyFilter(BaseFilter, FrequencyFilterConfig):
Source
class EventTypeFrequencyFilter(BaseFilter, FrequencyFilterConfig):
    """Filter events by the relative frequency of their activity.

    An activity qualifies when its share of all rows in ``ocel.events.df``
    is at least ``threshold_percentage`` percent. With ``mode="exclude"``
    the resulting mask is inverted.
    """

    def filter(self, ocel):
        """Return a ``FilterResult`` that carries only an event mask."""
        events_df = ocel.events.df
        total_events = len(events_df)
        value_counts = events_df[ACTIVITY_COL].value_counts()
        # Compare the shares directly instead of deriving an integer minimum
        # count: int(total * (pct / 100)) truncates and is exposed to float
        # error (int(100 * (29 / 100)) == 28), which lets activities below
        # the requested percentage slip through.
        meets_threshold = value_counts * 100 >= self.threshold_percentage * total_events
        qualifying = value_counts[meets_threshold].index
        mask = cast(Series, events_df[ACTIVITY_COL].isin(qualifying))
        if self.mode == "exclude":
            mask = ~mask
        return FilterResult(events=mask)
class ObjectAttributeFilter(BaseFilter, AttributeFilterConfig):
Source
class ObjectAttributeFilter(BaseFilter, AttributeFilterConfig):
    """Filter objects through an attribute condition on their enriched rows.

    The condition is evaluated by ``filter_by_attribute`` on the frame
    returned by ``get_objects_with_object_changes``. Every object id that
    has at least one selected row in that frame is kept.
    """

    def filter(self, ocel):
        """Return a ``FilterResult`` that carries only an object mask."""
        log = ocel.ocel
        id_column = log.object_id_column
        enriched = get_objects_with_object_changes(log)
        row_mask = filter_by_attribute(
            enriched,
            log.object_type_column,
            config=AttributeFilterConfig(**self.model_dump()),
        )
        # One object id may occur on several enriched rows, so reduce the
        # selected rows to their distinct ids before building the mask.
        valid_ids = enriched[row_mask][id_column].unique()  # type:ignore
        object_mask = log.objects[id_column].isin(valid_ids)  # type:ignore
        return FilterResult(objects=object_mask)
class EventAttributeFilter(BaseFilter, AttributeFilterConfig):
Source
class EventAttributeFilter(BaseFilter, AttributeFilterConfig):
    """Filter events through an attribute condition on ``ocel.events.df``.

    The condition itself is evaluated by ``filter_by_attribute``, configured
    from this model's own fields.
    """

    def filter(self, ocel):
        """Return a ``FilterResult`` that carries only an event mask."""
        attribute_config = AttributeFilterConfig(**self.model_dump())
        event_mask = filter_by_attribute(
            ocel.events.df,
            ocel.ocel.event_activity,
            config=attribute_config,
        )
        return FilterResult(events=event_mask)
class O2OCountFilter(BaseFilter, RelationCountFilterConfig):
Source
class O2OCountFilter(BaseFilter, RelationCountFilterConfig):
    """Filter objects by their number of object-to-object relations.

    ``direction`` selects which side of the relation is counted: the source
    objects (default) or the target objects. The counting itself is
    delegated to ``filter_by_relation_counts``, bounded by ``range[0]`` and
    ``range[1]``.
    """

    direction: Literal["source", "target"] = "source"

    def filter(self, ocel):
        """Return a ``FilterResult`` that carries only an object mask."""
        if self.direction == "source":
            counted_id, counted_type = O2O_SOURCE_ID, self.source
        else:
            counted_id, counted_type = O2O_TARGET_ID, self.target

        object_mask = filter_by_relation_counts(
            entity_df=ocel.objects.df,
            relation_df=ocel.o2o.typed_df,
            source=self.source,
            target=self.target,
            qualifier=self.qualifier,
            source_type_name=O2O_SOURCE_TYPE,
            target_type_name=O2O_TARGET_TYPE,
            qualifier_field_name=O2O_QUALIFIER,
            counted_id_name=counted_id,
            counted_type_value=counted_type,
            id_name=OID_COL,
            type_name=OTYPE_COL,
            min_count=self.range[0],
            max_count=self.range[1],
        )
        return FilterResult(objects=object_mask)
class E2OCountFilter(BaseFilter, RelationCountFilterConfig):
Source
class E2OCountFilter(BaseFilter, RelationCountFilterConfig):
    """Filter events or objects by their number of event-to-object relations.

    With ``direction="source"`` (the default) events are counted and an
    event mask is returned. With ``direction="target"`` objects are counted
    and an object mask is returned. The counting itself is delegated to
    ``filter_by_relation_counts``, bounded by ``range[0]`` and ``range[1]``.
    """

    direction: Literal["source", "target"] = "source"

    def filter(self, ocel):
        """Return a ``FilterResult`` with exactly one of the two masks set."""
        counts_events = self.direction == "source"
        if counts_events:
            entities = ocel.events.df
            counted_id, counted_type = E2O_EVENT_ID, self.source
            id_column, type_column = EID_COL, ACTIVITY_COL
        else:
            entities = ocel.objects.df
            counted_id, counted_type = E2O_OBJECT_ID, self.target
            id_column, type_column = OID_COL, OTYPE_COL

        mask = filter_by_relation_counts(
            entity_df=entities,
            relation_df=ocel.e2o.df,
            source=self.source,
            target=self.target,
            qualifier=self.qualifier,
            source_type_name=E2O_ACTIVITY,
            target_type_name=E2O_OBJECT_TYPE,
            qualifier_field_name=E2O_QUALIFIER,
            counted_id_name=counted_id,
            counted_type_value=counted_type,
            id_name=id_column,
            type_name=type_column,
            min_count=self.range[0],
            max_count=self.range[1],
        )
        # The side that was not counted is passed as an explicit None.
        if counts_events:
            return FilterResult(events=mask, objects=None)
        return FilterResult(events=None, objects=mask)
class TimeFrameFilter(BaseFilter):
Source
class TimeFrameFilter(BaseFilter):
    """Filter events by an inclusive timestamp window.

    ``time_range`` is a ``(start, end)`` pair of timestamp strings; either
    bound may be ``None`` to leave that side open. Bounds are turned into
    ``pd.Timestamp`` values with ``tz="UTC"`` and compared against the
    ``"ocel:timestamp"`` column of ``ocel.events.df``. With
    ``mode="exclude"`` the resulting mask is inverted.
    """

    time_range: tuple[Optional[str], Optional[str]]
    mode: Literal["exclude", "include"] = "include"

    def filter(self, ocel):
        """Return a ``FilterResult`` that carries only an event mask."""
        start_time, end_time = self.time_range
        if start_time is not None:
            start_time = pd.Timestamp(start_time, tz="UTC")
        if end_time is not None:
            end_time = pd.Timestamp(end_time, tz="UTC")

        events_df = ocel.events.df
        # Broadcast a scalar with an explicit bool dtype. Building the mask
        # from a Python list leaves the dtype to inference when the event
        # log is empty, which makes the later `&=` and `~` unreliable.
        mask = pd.Series(True, index=events_df.index, dtype=bool)
        if start_time is not None:
            mask &= events_df["ocel:timestamp"] >= start_time
        if end_time is not None:
            mask &= events_df["ocel:timestamp"] <= end_time
        if self.mode == "exclude":
            mask = ~mask
        return FilterResult(events=mask)
class BaseFilter(ABC, BaseModel):
Source
class BaseFilter(ABC, BaseModel):
    """Abstract base for filters; subclasses must implement ``filter``."""

    @abstractmethod
    def filter(self, ocel: "OCEL") -> FilterResult:
        """Compute the ``FilterResult`` of this filter for ``ocel``."""
        ...
class FilterResult:
Source
@dataclass()
class FilterResult:
    """Optional boolean masks produced by a filter.

    ``events`` and ``objects`` each hold a mask, or ``None`` when the filter
    produced no mask for that side.
    """

    events: Optional[Series] = None
    objects: Optional[Series] = None

    def and_merge(self, other: "FilterResult") -> "FilterResult":
        """Combine two results, AND-ing masks that are present on both sides.

        A mask present on only one side is carried over unchanged; a side
        that is ``None`` in both results stays ``None``.
        """

        def _combine(left, right):
            # A missing mask is neutral, so the other operand wins.
            if left is None:
                return right
            if right is None:
                return left
            return left & right

        merged_events = _combine(self.events, other.events)
        merged_objects = _combine(self.objects, other.objects)
        return FilterResult(events=merged_events, objects=merged_objects)