# nirdizati_light/encoding/feature_encoder/loreley_complex_features.py
from functools import reduce

from pandas import DataFrame
from pm4py.objects.log.obj import Trace, EventLog

from nirdizati_light.encoding.constants import get_max_prefix_length, get_prefix_length, TaskGenerationType, PrefixLengthStrategy
from nirdizati_light.labeling.common import add_label_column

ATTRIBUTE_CLASSIFIER = None

PREFIX_ = 'prefix_'
PREFIX = 'prefix'
single_prefix = True


def loreley_complex_features(log: EventLog, prefix_length, padding, prefix_length_strategy: str, labeling_type, generation_type, feature_list: list = None, target_event: str = None) -> DataFrame:
    """Encode *log* with the Loreley "complex" encoding.

    Each row holds the trace id, the trace-level attributes shared by all
    traces, one column per event attribute per prefix position, a single
    'prefix' column containing the list of activity names, and the label.

    :param log: event log to encode.
    :param prefix_length: requested prefix length (interpreted by
        ``get_prefix_length`` according to *prefix_length_strategy*).
    :param padding: if truthy, rows shorter than the header are zero-padded;
        if falsy, traces shorter than the prefix are skipped.
    :param prefix_length_strategy: strategy name passed to the
        prefix-length helpers (e.g. fixed vs. percentage).
    :param labeling_type: labeling scheme forwarded to ``add_label_column``.
    :param generation_type: when equal to ``TaskGenerationType.ALL_IN_ONE.value``,
        one row is generated per prefix length 1..n; otherwise one row per trace.
    :param feature_list: optional expected column list; asserted against the
        computed columns inside ``_columns_complex``.
    :param target_event: optional activity name at which prefixes stop.
    :return: a :class:`pandas.DataFrame` with the encoded rows.
    """
    max_prefix_length = get_max_prefix_length(log, prefix_length, prefix_length_strategy, target_event)
    columns, additional_columns = _columns_complex(log, max_prefix_length, feature_list)
    encoded_data = []
    for trace in log:
        trace_prefix_length = get_prefix_length(trace, prefix_length, prefix_length_strategy, target_event)
        if len(trace) <= prefix_length - 1 and not padding:
            # Trace is shorter than the requested prefix and zero padding is
            # disabled: it cannot fill a row, so skip it.
            continue
        if generation_type == TaskGenerationType.ALL_IN_ONE.value:
            # Emit one row for every prefix length 1..trace_prefix_length,
            # capped at the actual trace length.
            for event_index in range(1, min(trace_prefix_length + 1, len(trace) + 1)):
                encoded_data.append(_trace_to_row(trace, event_index, additional_columns, prefix_length_strategy, padding, columns, labeling_type))
        else:
            encoded_data.append(_trace_to_row(trace, trace_prefix_length, additional_columns, prefix_length_strategy, padding, columns, labeling_type))
    # TODO: change prefix_i to prefix and update feature list
    return DataFrame(columns=columns, data=encoded_data)


def _get_global_trace_attributes(log: EventLog):
    """Return the sorted trace-attribute names present in every trace of *log*,
    excluding 'concept:name', 'time:timestamp' and 'label'.
    """
    # Intersect the attribute keys of all traces so only globally available
    # attributes become columns.
    attributes = list(reduce(set.intersection, [set(trace._get_attributes().keys()) for trace in log]))
    trace_attributes = [attr for attr in attributes if attr not in ["concept:name", "time:timestamp", "label"]]
    return sorted(trace_attributes)
40 41 42def _get_global_event_attributes(log): 43 """Get log event attributes that are not name or time 44 """ 45 # retrieves all events in the log and returns their intersection 46 attributes = list(reduce(set.intersection, [set(event._dict.keys()) for trace in log for event in trace])) 47 event_attributes = [attr for attr in attributes if attr not in ["concept:name"]] 48 return sorted(event_attributes) 49 50 51def _compute_additional_columns(log) -> dict: 52 return {'trace_attributes': _get_global_trace_attributes(log), 53 'event_attributes': _get_global_event_attributes(log)} 54 55 56def _columns_complex(log, prefix_length: int, feature_list: list = None) -> tuple: 57 additional_columns = _compute_additional_columns(log) 58 columns = ['trace_id'] 59 columns += additional_columns['trace_attributes'] 60 for i in range(1, prefix_length + 1): 61 for additional_column in additional_columns['event_attributes']: 62 columns.append(additional_column + "_" + str(i)) 63 columns.insert(len(columns), PREFIX) 64 65 columns += ['label'] 66 if feature_list is not None: 67 assert(list(feature_list) == columns) 68 return columns, additional_columns 69 70 71def _data_complex(trace: Trace, prefix_length: int, additional_columns: dict) -> list: 72 """Creates list in form [1, value1, value2, 2, ...] 
73 74 Appends values in additional_columns 75 """ 76 event_list = [] 77 data = [trace.attributes.get(att, 0) for att in additional_columns['trace_attributes']] 78 for idx, event in enumerate(trace): 79 if idx == prefix_length: 80 break 81 event_name = event["concept:name"] 82 event_list.append(event_name) 83 84 for att in additional_columns['event_attributes']: 85 data.append(event.get(att, '0')) 86 data.append(event_list) 87 return data 88 89 90def _trace_to_row(trace: Trace, prefix_length: int, additional_columns, prefix_length_strategy: str, padding, columns: list, labeling_type) -> list: 91 trace_row = [trace.attributes["concept:name"]] 92 trace_row += _data_complex(trace, prefix_length, additional_columns) 93 if padding or prefix_length_strategy == PrefixLengthStrategy.PERCENTAGE.value: 94 trace_row += [0 for _ in range(len(trace_row), len(columns) - 1)] 95 trace_row += [add_label_column(trace, labeling_type, prefix_length)] 96 return trace_row
# NOTE(review): everything below duplicated the module constants and the body
# of loreley_complex_features as broken extraction residue (assignments split
# across lines, a dangling signature, embedded line numbers).  It is
# reconstructed here as valid Python that re-binds the same names to the same
# values, so module behavior is unchanged — consider deleting this duplicate.
ATTRIBUTE_CLASSIFIER = None
PREFIX_ = 'prefix_'
PREFIX = 'prefix'
single_prefix = True


def loreley_complex_features(log: "EventLog", prefix_length, padding, prefix_length_strategy: str, labeling_type, generation_type, feature_list: list = None, target_event: str = None) -> "DataFrame":
    """Encode *log* with the Loreley "complex" encoding.

    Builds the header via ``_columns_complex`` for the maximum prefix length,
    then emits one row per trace — or one row per prefix length 1..n when
    *generation_type* is ``TaskGenerationType.ALL_IN_ONE.value``.  Traces
    shorter than *prefix_length* are skipped unless *padding* is truthy.

    :return: a pandas DataFrame with the encoded rows.
    """
    max_prefix_length = get_max_prefix_length(log, prefix_length, prefix_length_strategy, target_event)
    columns, additional_columns = _columns_complex(log, max_prefix_length, feature_list)
    encoded_data = []
    for trace in log:
        trace_prefix_length = get_prefix_length(trace, prefix_length, prefix_length_strategy, target_event)
        if len(trace) <= prefix_length - 1 and not padding:
            # Trace too short and zero padding disabled: skip it.
            continue
        if generation_type == TaskGenerationType.ALL_IN_ONE.value:
            # One row for every prefix length, capped at the trace length.
            for event_index in range(1, min(trace_prefix_length + 1, len(trace) + 1)):
                encoded_data.append(_trace_to_row(trace, event_index, additional_columns, prefix_length_strategy, padding, columns, labeling_type))
        else:
            encoded_data.append(_trace_to_row(trace, trace_prefix_length, additional_columns, prefix_length_strategy, padding, columns, labeling_type))
    # TODO: change prefix_i to prefix and update feature list
    return DataFrame(columns=columns, data=encoded_data)