# nirdizati_light.encoding.feature_encoder.binary_features
from collections import Counter
from datetime import timedelta
from functools import reduce

from pandas import DataFrame
from pm4py.objects.log.obj import EventLog, Trace, Event

from nirdizati_light.encoding.constants import PrefixLengthStrategy, get_max_prefix_length, get_prefix_length, TaskGenerationType
from nirdizati_light.labeling.common import add_label_column

PREFIX_ = 'prefix_'


def binary_features(log: EventLog, prefix_length, padding, prefix_length_strategy: str, labeling_type, generation_type, feature_list: list = None, target_event: str = None) -> DataFrame:
    """Encode every trace prefix of *log* as a row of activity-occurrence counts.

    Each row mirrors the column list: trace id, trace-attribute values, one
    occurrence count per activity name seen in the log (plus a '0' column for
    padding events when *padding* is on), and the label from
    ``add_label_column``.

    :param log: event log to encode.
    :param prefix_length: requested prefix length; its interpretation depends
        on *prefix_length_strategy*.
    :param padding: when True, traces shorter than the prefix length are
        padded with dummy '0' events instead of being skipped.
    :param prefix_length_strategy: strategy name (see PrefixLengthStrategy).
    :param labeling_type: labeling scheme forwarded to ``add_label_column``.
    :param generation_type: TaskGenerationType value; ALL_IN_ONE emits one row
        per prefix length from 1 up to the trace's prefix length.
    :param feature_list: pre-computed column list; computed from *log* when
        None.
    :param target_event: optional activity name bounding the prefix.
    :return: DataFrame with columns *feature_list*, one row per encoded prefix.
    """
    # When the caller supplies feature_list we have no trace-attribute info;
    # _trace_to_row/_data_complex treat None as "no trace attributes".
    additional_columns = None
    if feature_list is None:
        max_prefix_length = get_max_prefix_length(log, prefix_length, prefix_length_strategy, target_event)
        feature_list, additional_columns = _compute_columns(log, max_prefix_length, padding)
    encoded_data = []
    for trace in log:
        trace_prefix_length = get_prefix_length(trace, prefix_length, prefix_length_strategy, target_event)
        if len(trace) <= prefix_length - 1 and not padding:
            # trace too short and no zero padding: skip it
            continue
        if generation_type == TaskGenerationType.ALL_IN_ONE.value:
            for event_index in range(1, min(trace_prefix_length + 1, len(trace) + 1)):
                # BUG FIX: additional_columns was never forwarded, so
                # _data_complex crashed on its None default.
                encoded_data.append(_trace_to_row(trace, event_index, feature_list, padding, labeling_type, additional_columns))
        else:
            encoded_data.append(_trace_to_row(trace, trace_prefix_length, feature_list, padding, labeling_type, additional_columns))

    return DataFrame(columns=feature_list, data=encoded_data)


def _data_complex(trace: Trace, prefix_length: int, additional_columns: dict) -> list:
    """Return the trace-attribute values named in *additional_columns*.

    Missing attributes default to 0.  Returns an empty list when
    *additional_columns* is None (columns were supplied by the caller, so no
    trace attributes are encoded).  *prefix_length* is kept for signature
    compatibility with the other encoders.
    """
    if not additional_columns:
        return []
    return [trace.attributes.get(att, 0) for att in additional_columns['trace_attributes']]


def _trace_to_row(trace: Trace, prefix_length: int, columns: list, padding: bool = True, labeling_type: str = None, additional_columns: dict = None) -> list:
    """Build one encoded row for *trace* truncated at *prefix_length*.

    Row layout mirrors *columns*: trace id, trace-attribute values,
    per-activity occurrence counts, label.
    """
    trace_row = [trace.attributes['concept:name']]
    trace_row += _data_complex(trace, prefix_length, additional_columns)

    # Count activity occurrences in the prefix WITHOUT mutating the shared
    # Trace. The previous code appended dummy Events to *trace* in place
    # (corrupting later prefixes of the same trace) and indexed
    # trace[len(trace)], an IndexError.
    occurrences = Counter(event['concept:name'] for event in trace[:prefix_length])
    if padding and len(trace) < prefix_length:
        # Short trace: every missing position counts as a dummy '0' event.
        occurrences['0'] += prefix_length - len(trace)

    # Skip the leading trace_id / trace-attribute columns and the trailing
    # 'label' column; what remains are the activity-count columns.
    n_meta = 1 + (len(additional_columns['trace_attributes']) if additional_columns else 0)
    trace_row += [occurrences[col] for col in columns[n_meta:-1]]
    trace_row += [add_label_column(trace, labeling_type, prefix_length)]
    return trace_row


def _get_global_trace_attributes(log: EventLog) -> list:
    """Sorted trace attributes shared by every trace, minus reserved names."""
    # Intersection over all traces: keep only attributes present everywhere.
    attributes = list(reduce(set.intersection, [set(trace._get_attributes().keys()) for trace in log]))
    trace_attributes = [attr for attr in attributes if attr not in ["concept:name", "time:timestamp", "label"]]
    return sorted(trace_attributes)


def _compute_columns(log: EventLog, prefix_length: int, padding: bool) -> tuple:
    """Column layout: trace_id, trace attributes, activity names, ('0',) label.

    BUG FIX: the previous version built the ``['trace_id'] + trace_attributes``
    head and then discarded it, leaving the DataFrame header shifted left of
    the rows produced by ``_trace_to_row``; the head is now kept.
    """
    additional_columns = _compute_additional_columns(log)
    columns = ['trace_id']
    columns += additional_columns['trace_attributes']
    columns += sorted({
        event['concept:name']
        for trace in log
        for event in trace[:prefix_length]
    })
    if padding:
        columns += ['0']
    columns += ['label']
    return columns, additional_columns


def _compute_additional_columns(log) -> dict:
    """Extra (non-event) columns to encode; currently only trace attributes."""
    return {'trace_attributes': _get_global_trace_attributes(log)}
PREFIX_ = 'prefix_'


def binary_features(log: EventLog, prefix_length, padding, prefix_length_strategy: str, labeling_type, generation_type, feature_list: list = None, target_event: str = None) -> DataFrame:
    """Encode every trace prefix of *log* as a row of activity-occurrence counts.

    One row per trace (or per prefix length from 1 upward when
    *generation_type* is ALL_IN_ONE); traces shorter than the requested
    prefix are skipped unless *padding* is enabled.

    :param log: event log to encode.
    :param prefix_length: requested prefix length (meaning depends on
        *prefix_length_strategy*).
    :param padding: pad short traces with dummy '0' events instead of
        skipping them.
    :param prefix_length_strategy: strategy name (see PrefixLengthStrategy).
    :param labeling_type: labeling scheme forwarded to ``add_label_column``.
    :param generation_type: TaskGenerationType value.
    :param feature_list: pre-computed column list; computed from *log* when
        None.
    :param target_event: optional activity name bounding the prefix.
    :return: DataFrame with columns *feature_list*, one row per encoded prefix.
    """
    # None when the caller supplies feature_list (matches _trace_to_row's
    # default for additional_columns).
    additional_columns = None
    if feature_list is None:
        max_prefix_length = get_max_prefix_length(log, prefix_length, prefix_length_strategy, target_event)
        feature_list, additional_columns = _compute_columns(log, max_prefix_length, padding)
    encoded_data = []
    for trace in log:
        trace_prefix_length = get_prefix_length(trace, prefix_length, prefix_length_strategy, target_event)
        if len(trace) <= prefix_length - 1 and not padding:
            # trace too short and no zero padding: skip it
            continue
        if generation_type == TaskGenerationType.ALL_IN_ONE.value:
            for event_index in range(1, min(trace_prefix_length + 1, len(trace) + 1)):
                encoded_data.append(_trace_to_row(trace, event_index, feature_list, padding, labeling_type, additional_columns))
        else:
            encoded_data.append(_trace_to_row(trace, trace_prefix_length, feature_list, padding, labeling_type, additional_columns))

    return DataFrame(columns=feature_list, data=encoded_data)