Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "simod"
version = "5.1.4"
version = "5.1.5"
authors = [
"Ihar Suvorau <ihar.suvorau@gmail.com>",
"David Chapela <david.chapela@ut.ee>",
Expand Down
79 changes: 78 additions & 1 deletion src/simod/control_flow/discovery.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import uuid
from dataclasses import dataclass
from pathlib import Path
from typing import List, Tuple

from lxml import etree

from simod.cli_formatter import print_step
from simod.control_flow.settings import HyperoptIterationParams
from simod.settings.control_flow_settings import (
Expand Down Expand Up @@ -49,8 +52,82 @@ def discover_process_model(log_path: Path, output_model_path: Path, params: Hype
discover_process_model_with_split_miner_v2(SplitMinerV2Settings(log_path, output_model_path, params.epsilon))
else:
raise ValueError(f"Unknown process model discovery algorithm: {params.mining_algorithm}")

# Assert that model file was created
assert output_model_path.exists(), f"Error trying to discover the process model in '{output_model_path}'."
# Post-process to transform implicit activity self-loops into explicit (modeled through gateways)
print(f"Post-processing discovered process model to explicitly model self-loops through gateways.")
post_process_bpmn_self_loops(output_model_path)


def _generate_node_id():
return f"node_{uuid.uuid4()}"


def post_process_bpmn_self_loops(bpmn_model_path: Path):
    """
    Rewrite implicit activity self-loops in a BPMN model as explicit loops.

    For each task carrying a ``standardLoopCharacteristics`` element, the loop
    marker is removed and replaced by an explicit construct: a converging
    exclusive gateway placed before the task, a diverging exclusive gateway
    placed after it, and a back-edge sequence flow from the diverging gateway
    to the converging one. The model file is overwritten in place.

    Parameters
    ----------
    bpmn_model_path : Path
        Path to the BPMN XML file to post-process (read and written in place).
    """
    tree = etree.parse(bpmn_model_path)
    root = tree.getroot()
    nsmap = root.nsmap

    # Fall back to the standard BPMN namespace if the model declares none
    bpmn_namespace = nsmap.get(None, "http://www.omg.org/spec/BPMN/20100524/MODEL")
    ns = {"bpmn": bpmn_namespace}

    tasks = root.findall(".//bpmn:task", namespaces=ns)
    sequence_flows = root.findall(".//bpmn:sequenceFlow", namespaces=ns)
    process = root.find(".//bpmn:process", namespaces=ns)

    for task in tasks:
        loop_characteristics = task.find("bpmn:standardLoopCharacteristics", namespaces=ns)
        if loop_characteristics is None:
            # Task has no implicit self-loop, nothing to rewrite
            continue
        task_id = task.get("id")
        # Remove the implicit loop marker
        task.remove(loop_characteristics)
        # Generate unique IDs for the two gateways and the three new flows
        gt1_id = _generate_node_id()
        gt2_id = _generate_node_id()
        sf1_id = _generate_node_id()
        sf2_id = _generate_node_id()
        sf3_id = _generate_node_id()
        # Create exclusive gateways surrounding the task
        gt1 = etree.Element("{%s}exclusiveGateway" % bpmn_namespace, id=gt1_id, gatewayDirection="Converging")
        gt2 = etree.Element("{%s}exclusiveGateway" % bpmn_namespace, id=gt2_id, gatewayDirection="Diverging")
        process.append(gt1)
        process.append(gt2)
        # Redirect the task's existing incoming/outgoing flows to the gateways
        incoming_gt1_1, outgoing_gt2_1 = None, None
        for sf in sequence_flows:
            if sf.get("targetRef") == task_id:
                sf.set("targetRef", gt1_id)
                incoming_gt1_1 = etree.Element("{%s}incoming" % bpmn_namespace)
                incoming_gt1_1.text = sf.get("id")
            if sf.get("sourceRef") == task_id:
                sf.set("sourceRef", gt2_id)
                outgoing_gt2_1 = etree.Element("{%s}outgoing" % bpmn_namespace)
                outgoing_gt2_1.text = sf.get("id")
        # Create new flows: gt1 -> task -> gt2, plus the loop back-edge gt2 -> gt1
        sf1 = etree.Element("{%s}sequenceFlow" % bpmn_namespace, id=sf1_id, sourceRef=gt1_id, targetRef=task_id)
        process.append(sf1)
        sf2 = etree.Element("{%s}sequenceFlow" % bpmn_namespace, id=sf2_id, sourceRef=task_id, targetRef=gt2_id)
        process.append(sf2)
        sf3 = etree.Element("{%s}sequenceFlow" % bpmn_namespace, id=sf3_id, sourceRef=gt2_id, targetRef=gt1_id)
        process.append(sf3)
        # Register the incoming/outgoing references on both gateways
        outgoing_gt1_1 = etree.Element("{%s}outgoing" % bpmn_namespace)
        outgoing_gt1_1.text = sf1_id
        incoming_gt1_2 = etree.Element("{%s}incoming" % bpmn_namespace)
        incoming_gt1_2.text = sf3_id
        incoming_gt2_1 = etree.Element("{%s}incoming" % bpmn_namespace)
        incoming_gt2_1.text = sf2_id
        outgoing_gt2_2 = etree.Element("{%s}outgoing" % bpmn_namespace)
        outgoing_gt2_2.text = sf3_id
        # Guard against tasks with no pre-existing incoming/outgoing flow:
        # appending None to an lxml element raises TypeError
        if incoming_gt1_1 is not None:
            gt1.append(incoming_gt1_1)
        gt1.append(incoming_gt1_2)
        gt1.append(outgoing_gt1_1)
        gt2.append(incoming_gt2_1)
        if outgoing_gt2_1 is not None:
            gt2.append(outgoing_gt2_1)
        gt2.append(outgoing_gt2_2)
    # Overwrite the model file with the post-processed tree
    tree.write(bpmn_model_path, xml_declaration=True, encoding="UTF-8", pretty_print=True)


def add_bpmn_diagram_to_model(bpmn_model_path: Path):
Expand Down
11 changes: 8 additions & 3 deletions src/simod/control_flow/optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,10 @@ def __init__(self, event_log: EventLog, bps_model: BPSModel, settings: ControlFl
# Not provided, create path to best discovered model
self._need_to_discover_model = True
# Export training log (XES format) for SplitMiner
self._xes_train_log_path = self.base_directory / (self.event_log.process_name + ".xes")
self.event_log.train_to_xes(self._xes_train_log_path)
self._xes_train_both_timestamps_log_path = self.base_directory / (self.event_log.process_name + ".xes")
self.event_log.train_to_xes(self._xes_train_both_timestamps_log_path)
self._xes_train_only_end_log_path = self.base_directory / (self.event_log.process_name + "_only_end.xes")
self.event_log.train_to_xes(self._xes_train_only_end_log_path, only_complete_events=True)
else:
# Process model provided
self._need_to_discover_model = False
Expand Down Expand Up @@ -360,7 +362,10 @@ def _process_measurements(self, params: HyperoptIterationParams, status, evaluat
def _discover_process_model(self, params: HyperoptIterationParams) -> Path:
    """
    Discover a process model for the training log with the given parameters.

    Split Miner v1 is fed the log containing only activity-completion events,
    while the other algorithms receive the log with both start and end
    timestamps.

    Parameters
    ----------
    params : HyperoptIterationParams
        Parameters of the current optimization iteration (mining algorithm,
        output directory, etc.).

    Returns
    -------
    Path
        Path to the discovered process model file.
    """
    print_step(f"Discovering Process Model with {params.mining_algorithm.value}")
    output_model_path = get_process_model_path(params.output_dir, self.event_log.process_name)
    # Select the training-log variant expected by the discovery algorithm
    if params.mining_algorithm is ProcessModelDiscoveryAlgorithm.SPLIT_MINER_V1:
        discover_process_model(self._xes_train_only_end_log_path, output_model_path, params)
    else:
        discover_process_model(self._xes_train_both_timestamps_log_path, output_model_path, params)
    return output_model_path

def _discover_branch_rules(self, process_model: Path, params: HyperoptIterationParams) -> List[BranchRules]:
Expand Down
71 changes: 46 additions & 25 deletions src/simod/event_log/event_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@
from typing import Optional

import pandas as pd
import pendulum
from openxes_cli.lib import csv_to_xes
from pix_framework.io.event_log import DEFAULT_XES_IDS, EventLogIDs, read_csv_log
from pix_framework.io.event_log import split_log_training_validation_trace_wise as split_log

from .preprocessor import Preprocessor
from .utilities import convert_df_to_xes
from ..settings.preprocessing_settings import PreprocessingSettings
from ..utilities import get_process_name_from_log_path

Expand Down Expand Up @@ -150,79 +151,99 @@ def from_path(
process_name=get_process_name_from_log_path(train_log_path) if process_name is None else process_name,
)

def train_to_xes(self, path: Path, only_complete_events: bool = False):
    """
    Saves the training log to an XES file.

    Parameters
    ----------
    path : :class:`pathlib.Path`
        Destination path for the XES file.
    only_complete_events : bool
        If true, generate XES file containing only events corresponding to
        the end of each activity instance.
    """
    write_xes(self.train_partition, self.log_ids, path, only_complete_events=only_complete_events)

def validation_to_xes(self, path: Path, only_complete_events: bool = False):
    """
    Saves the validation log to an XES file.

    Parameters
    ----------
    path : :class:`pathlib.Path`
        Destination path for the XES file.
    only_complete_events : bool
        If true, generate XES file containing only events corresponding to
        the end of each activity instance.
    """
    write_xes(self.validation_partition, self.log_ids, path, only_complete_events=only_complete_events)

def train_validation_to_xes(self, path: Path, only_complete_events: bool = False):
    """
    Saves the combined training and validation log to an XES file.

    Parameters
    ----------
    path : :class:`pathlib.Path`
        Destination path for the XES file.
    only_complete_events : bool
        If true, generate XES file containing only events corresponding to
        the end of each activity instance.
    """
    write_xes(self.train_validation_partition, self.log_ids, path, only_complete_events=only_complete_events)

def test_to_xes(self, path: Path, only_complete_events: bool = False):
    """
    Saves the test log to an XES file.

    Parameters
    ----------
    path : :class:`pathlib.Path`
        Destination path for the XES file.
    only_complete_events : bool
        If true, generate XES file containing only events corresponding to
        the end of each activity instance.
    """
    write_xes(self.test_partition, self.log_ids, path, only_complete_events=only_complete_events)


def write_xes(
    event_log: pd.DataFrame,
    log_ids: EventLogIDs,
    output_path: Path,
    only_complete_events: bool = False,
):
    """
    Writes the log to a file in XES format.

    Parameters
    ----------
    event_log : pd.DataFrame
        Event log to export. The caller's DataFrame is not modified; a copy
        is transformed instead.
    log_ids : EventLogIDs
        Mapping of the event log's column names.
    output_path : Path
        Destination path for the XES file.
    only_complete_events : bool
        If true, generate XES file containing only events corresponding to
        the end of each activity instance (start timestamps left empty).
    """
    # Copy event log to modify
    df = event_log.copy()
    # Transform timestamps to expected format
    xes_datetime_format = "YYYY-MM-DDTHH:mm:ss.SSSZ"
    # Start time: blank it out when exporting only completion events
    if only_complete_events:
        df[log_ids.start_time] = ""
    else:
        df[log_ids.start_time] = df[log_ids.start_time].apply(
            lambda x: pendulum.parse(x.isoformat()).format(xes_datetime_format)
        )
    # End time
    df[log_ids.end_time] = df[log_ids.end_time].apply(
        lambda x: pendulum.parse(x.isoformat()).format(xes_datetime_format)
    )
    # Rename columns to the XES standard identifiers and keep only those
    df = df.rename(
        columns={
            log_ids.activity: "concept:name",
            log_ids.case: "case:concept:name",
            log_ids.resource: "org:resource",
            log_ids.start_time: "start_timestamp",
            log_ids.end_time: "time:timestamp",
        }
    )[["case:concept:name", "concept:name", "org:resource", "start_timestamp", "time:timestamp"]]
    # Fill null values with a placeholder
    df.fillna("UNDEFINED", inplace=True)
    # Write as CSV and convert the file to XES in place
    df.to_csv(output_path, index=False)
    csv_to_xes(output_path, output_path)
18 changes: 0 additions & 18 deletions src/simod/event_log/utilities.py

This file was deleted.

2 changes: 1 addition & 1 deletion src/simod/settings/common_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def _from_str(cls, value: str) -> "Metric":
return cls.THREE_GRAM_DISTANCE
elif value.lower() in ["circadian_event_distribution", "circadian_emd"]:
return cls.CIRCADIAN_EMD
elif value.lower() in ["circadian_workforce_distribution", "workforce_emd", "workforce_distribution"]:
elif value.lower() in ["circadian_workforce_distribution", "workforce_emd", "circadian_workforce"]:
return cls.CIRCADIAN_WORKFORCE_EMD
elif value.lower() in ["arrival_event_distribution", "arrival_emd"]:
return cls.ARRIVAL_EMD
Expand Down
Loading