# nirdizati_light/encoding/feature_encoder/loreley_features.py

from functools import reduce

from pandas import DataFrame
from pm4py.objects.log.obj import Trace, EventLog

from nirdizati_light.encoding.constants import get_max_prefix_length, get_prefix_length, TaskGenerationType, PrefixLengthStrategy
from nirdizati_light.labeling.common import add_label_column

  9ATTRIBUTE_CLASSIFIER = None
 10
 11PREFIX_ = 'prefix_'
 12PREFIX = 'prefix'
 13single_prefix = True
 14
 15
 16def loreley_features(log: EventLog, prefix_length, padding, prefix_length_strategy: str, labeling_type, generation_type, feature_list: list = None, target_event: str = None) -> DataFrame:
 17    max_prefix_length = get_max_prefix_length(log, prefix_length, prefix_length_strategy, target_event)
 18    columns, additional_columns = _columns_complex(log, max_prefix_length, feature_list)
 19    columns_number = len(columns)
 20    encoded_data = []
 21    for trace in log:
 22        trace_prefix_length = get_prefix_length(trace, prefix_length, prefix_length_strategy, target_event)
 23        if len(trace) <= prefix_length - 1 and not padding:
 24            # trace too short and no zero padding
 25            continue
 26        if generation_type == TaskGenerationType.ALL_IN_ONE.value:
 27            for event_index in range(1, min(trace_prefix_length + 1, len(trace) + 1)):
 28                encoded_data.append(_trace_to_row(trace, event_index, additional_columns, prefix_length_strategy, padding, columns, labeling_type))
 29        else:
 30            encoded_data.append(_trace_to_row(trace, trace_prefix_length, additional_columns, prefix_length_strategy, padding, columns, labeling_type))
 31    #change prefiz_i to prefix and update feature lsit
 32    return DataFrame(columns=columns, data=encoded_data)
 33
 34
 35def _get_global_trace_attributes(log: EventLog):
 36    # retrieves all traces in the log and returns their intersection
 37    attributes = list(reduce(set.intersection, [set(trace._get_attributes().keys()) for trace in log]))
 38    trace_attributes = [attr for attr in attributes if attr not in ["concept:name", "time:timestamp", "label"]]
 39    return sorted(trace_attributes)
 40
 41
 42def _get_global_event_attributes(log):
 43    """Get log event attributes that are not name or time
 44    """
 45    # retrieves all events in the log and returns their intersection
 46    attributes = list(reduce(set.intersection, [set(event._dict.keys()) for trace in log for event in trace]))
 47    event_attributes = [attr for attr in attributes if attr not in ["concept:name"]]
 48    return sorted(event_attributes)
 49
 50
 51def _compute_additional_columns(log) -> dict:
 52    return {'trace_attributes': _get_global_trace_attributes(log)}
 53
 54
 55def _columns_complex(log, prefix_length: int, feature_list: list = None) -> tuple:
 56    additional_columns = _compute_additional_columns(log)
 57    columns = ['trace_id']
 58    columns += additional_columns['trace_attributes']
 59    if single_prefix == True:
 60       # for i in range(1, prefix_length + 1):
 61            #for additional_column in additional_columns['event_attributes']:
 62            #    columns.append(additional_column + "_" + str(i))
 63        columns.insert(len(columns), PREFIX)
 64    else:
 65        for i in range(1, prefix_length + 1):
 66            columns.append(PREFIX_ + str(i))
 67            #for additional_column in additional_columns['event_attributes']:
 68            #    columns.append(additional_column + "_" + str(i))
 69
 70    columns += ['label']
 71    if feature_list is not None:
 72        assert(list(feature_list) == columns)
 73    return columns, additional_columns
 74
 75
 76def _data_complex(trace: Trace, prefix_length: int, additional_columns: dict) -> list:
 77    """Creates list in form [1, value1, value2, 2, ...]
 78
 79    Appends values in additional_columns
 80    """
 81    event_list = []
 82    data = [trace.attributes.get(att, 0) for att in additional_columns['trace_attributes']]
 83    for idx, event in enumerate(trace):
 84        if idx == prefix_length:
 85            break
 86        event_name = event["concept:name"]
 87        if single_prefix == True:
 88            event_list.append(event_name)
 89        else:
 90            data.append(event_name)
 91
 92#        for att in additional_columns['event_attributes']:
 93#            data.append(event.get(att, '0'))
 94    if single_prefix == True:
 95        data.append(event_list)
 96    return data
 97
 98
 99def _trace_to_row(trace: Trace, prefix_length: int, additional_columns, prefix_length_strategy: str, padding, columns: list, labeling_type) -> list:
100    trace_row = [trace.attributes["concept:name"]]
101    trace_row += _data_complex(trace, prefix_length, additional_columns)
102    if padding or prefix_length_strategy == PrefixLengthStrategy.PERCENTAGE.value:
103        trace_row += [0 for _ in range(len(trace_row), len(columns) - 1)]
104    trace_row += [add_label_column(trace, labeling_type, prefix_length)]
105    return trace_row
# NOTE(review): removed a garbled duplicate paste of the module constants and
# loreley_features that had been appended here. It re-stated identical
# definitions already present above, and its signature line was syntactically
# invalid (a `def` header with no body, using unqualified
# `pm4py.objects.log.obj.EventLog` / `pandas.core.frame.DataFrame` names),
# which made the module fail to import.