nirdizati_light.encoding.feature_encoder.loreley_complex_features

 1from functools import reduce
 2
 3from pandas import DataFrame
 4from pm4py.objects.log.obj import Trace, EventLog
 5
 6from nirdizati_light.encoding.constants import get_max_prefix_length, get_prefix_length, TaskGenerationType, PrefixLengthStrategy
 7from nirdizati_light.labeling.common import add_label_column
 8
 9ATTRIBUTE_CLASSIFIER = None
10
11PREFIX_ = 'prefix_'
12PREFIX = 'prefix'
13single_prefix = True
14
15
def loreley_complex_features(log: EventLog, prefix_length, padding, prefix_length_strategy: str, labeling_type, generation_type, feature_list: list = None, target_event: str = None) -> DataFrame:
    """Encode ``log`` into a complex-features DataFrame (Loreley encoding).

    Each row contains the trace id, the shared trace-level attributes, one
    column per shared event attribute per prefix position, a single ``prefix``
    column holding the list of activity names, and finally the label.

    :param log: event log to encode.
    :param prefix_length: requested prefix length; interpretation depends on
        ``prefix_length_strategy`` (may be a fraction for the percentage strategy).
    :param padding: when True, short traces are kept and zero-padded instead of dropped.
    :param prefix_length_strategy: strategy name used to resolve per-trace prefix lengths.
    :param labeling_type: forwarded to ``add_label_column`` to compute each row's label.
    :param generation_type: with ``TaskGenerationType.ALL_IN_ONE`` every prefix
        length from 1 up to the trace's prefix length yields a row; otherwise one row per trace.
    :param feature_list: optional expected column list, validated inside ``_columns_complex``.
    :param target_event: optional event name that caps the prefix length.
    :return: DataFrame with one encoded row per (trace, prefix) pair.
    """
    max_prefix_length = get_max_prefix_length(log, prefix_length, prefix_length_strategy, target_event)
    columns, additional_columns = _columns_complex(log, max_prefix_length, feature_list)
    encoded_data = []
    for trace in log:
        trace_prefix_length = get_prefix_length(trace, prefix_length, prefix_length_strategy, target_event)
        if len(trace) <= prefix_length - 1 and not padding:
            # Trace is shorter than the requested prefix and zero padding is
            # disabled: it cannot be encoded, so skip it.
            continue
        if generation_type == TaskGenerationType.ALL_IN_ONE.value:
            # One row for every prefix length from 1 up to the trace's resolved
            # prefix length (never beyond the trace's actual length).
            for event_index in range(1, min(trace_prefix_length + 1, len(trace) + 1)):
                encoded_data.append(_trace_to_row(trace, event_index, additional_columns, prefix_length_strategy, padding, columns, labeling_type))
        else:
            encoded_data.append(_trace_to_row(trace, trace_prefix_length, additional_columns, prefix_length_strategy, padding, columns, labeling_type))
    # TODO: change prefix_i columns to a single prefix column and update the feature list.
    return DataFrame(columns=columns, data=encoded_data)
33
34
def _get_global_trace_attributes(log: EventLog):
    """Return the sorted trace-level attribute names shared by every trace.

    Activity name, timestamp and label attributes are excluded from the result.
    """
    key_sets = [set(trace._get_attributes().keys()) for trace in log]
    shared = reduce(set.intersection, key_sets)
    excluded = ["concept:name", "time:timestamp", "label"]
    return sorted(name for name in shared if name not in excluded)
40
41
42def _get_global_event_attributes(log):
43    """Get log event attributes that are not name or time
44    """
45    # retrieves all events in the log and returns their intersection
46    attributes = list(reduce(set.intersection, [set(event._dict.keys()) for trace in log for event in trace]))
47    event_attributes = [attr for attr in attributes if attr not in ["concept:name"]]
48    return sorted(event_attributes)
49
50
def _compute_additional_columns(log) -> dict:
    """Map each attribute category to the log's shared, sorted attribute names."""
    trace_attrs = _get_global_trace_attributes(log)
    event_attrs = _get_global_event_attributes(log)
    return {
        'trace_attributes': trace_attrs,
        'event_attributes': event_attrs,
    }
54
55
def _columns_complex(log, prefix_length: int, feature_list: list = None) -> tuple:
    """Build the encoded DataFrame's column names and the shared attributes.

    Column layout: trace_id, trace attributes, one ``attr_i`` column per shared
    event attribute per prefix position 1..prefix_length, the ``prefix``
    column, then the label.
    """
    additional_columns = _compute_additional_columns(log)
    columns = ['trace_id'] + additional_columns['trace_attributes']
    for position in range(1, prefix_length + 1):
        columns.extend(attr + "_" + str(position) for attr in additional_columns['event_attributes'])
    columns.append(PREFIX)
    columns.append('label')
    if feature_list is not None:
        # Sanity check: a caller-supplied feature list must match exactly.
        assert(list(feature_list) == columns)
    return columns, additional_columns
69
70
def _data_complex(trace: Trace, prefix_length: int, additional_columns: dict) -> list:
    """Collect one trace's feature values for the first ``prefix_length`` events.

    Returns the trace-level attribute values, followed by each event's
    attribute values in order, and finally the list of activity names as a
    single trailing element. Missing trace attributes default to 0; missing
    event attributes default to the string '0'.
    """
    values = [trace.attributes.get(attr, 0) for attr in additional_columns['trace_attributes']]
    activities = []
    for position, event in enumerate(trace):
        if position == prefix_length:
            break
        activities.append(event["concept:name"])
        values.extend(event.get(attr, '0') for attr in additional_columns['event_attributes'])
    values.append(activities)
    return values
88
89
def _trace_to_row(trace: Trace, prefix_length: int, additional_columns, prefix_length_strategy: str, padding, columns: list, labeling_type) -> list:
    """Build a single encoded row for ``trace`` cut at ``prefix_length`` events."""
    row = [trace.attributes["concept:name"]]
    row.extend(_data_complex(trace, prefix_length, additional_columns))
    needs_padding = padding or prefix_length_strategy == PrefixLengthStrategy.PERCENTAGE.value
    if needs_padding:
        # Zero-fill every remaining feature column (all but the trailing label).
        row.extend(0 for _ in range(len(row), len(columns) - 1))
    row.append(add_label_column(trace, labeling_type, prefix_length))
    return row
ATTRIBUTE_CLASSIFIER = None

PREFIX_ = 'prefix_'
PREFIX = 'prefix'
single_prefix = True


# NOTE(review): this region duplicated the module constants and the
# loreley_complex_features definition, and contained a dangling second `def`
# header referencing unimported `pm4py.objects.log.obj` / `pandas.core.frame`
# names — a syntax error as written. Collapsed to one valid definition.
def loreley_complex_features(log: EventLog, prefix_length, padding, prefix_length_strategy: str, labeling_type, generation_type, feature_list: list = None, target_event: str = None) -> DataFrame:
    """Encode ``log`` into a complex-features DataFrame (Loreley encoding).

    Each row contains the trace id, the shared trace-level attributes, one
    column per shared event attribute per prefix position, a single ``prefix``
    column holding the list of activity names, and finally the label.
    """
    max_prefix_length = get_max_prefix_length(log, prefix_length, prefix_length_strategy, target_event)
    columns, additional_columns = _columns_complex(log, max_prefix_length, feature_list)
    encoded_data = []
    for trace in log:
        trace_prefix_length = get_prefix_length(trace, prefix_length, prefix_length_strategy, target_event)
        if len(trace) <= prefix_length - 1 and not padding:
            # Trace too short and no zero padding: skip it.
            continue
        if generation_type == TaskGenerationType.ALL_IN_ONE.value:
            # One row for every prefix length from 1 up to the trace's resolved prefix length.
            for event_index in range(1, min(trace_prefix_length + 1, len(trace) + 1)):
                encoded_data.append(_trace_to_row(trace, event_index, additional_columns, prefix_length_strategy, padding, columns, labeling_type))
        else:
            encoded_data.append(_trace_to_row(trace, trace_prefix_length, additional_columns, prefix_length_strategy, padding, columns, labeling_type))
    # TODO: change prefix_i columns to a single prefix column and update the feature list.
    return DataFrame(columns=columns, data=encoded_data)