import csv
import json
import logging
from collections import defaultdict, OrderedDict
from datetime import datetime

from analysis.util import json_path
from . import Result, LogSettings, Analyzer, ResultStore


class LocationAnalyzer(Analyzer):
    """
    store spatial log entries
    """
    __name__ = "Location"
    # Inside the class body ``__name__`` resolves to the attribute assigned
    # just above, so the logger is named "Location", not after the module.
    log = logging.getLogger(__name__)

    def __init__(self, settings: LogSettings):
        super().__init__(settings)
        self.entries = []

    def result(self, store: ResultStore, **kwargs) -> None:
        """Add a copy of the collected entries to *store*.

        Requires a ``name`` keyword argument; raises ``KeyError`` without it.
        """
        self.log.debug(len(self.entries))
        store.add(Result(type(self), list(self.entries), name=kwargs['name']))

    def process(self, entry: dict) -> bool:
        """Keep *entry* if its type is listed in ``settings.spatials``.

        Always returns False.
        """
        if entry[self.settings.type_field] in self.settings.spatials:
            self.entries.append(entry)
            # self.log.debug(len(self.entries))
        return False


class LogEntryCountAnalyzer(Analyzer):
    # TODO: make more flexible, e.g. min/max lat/long
    """
    count occurrences of log entry types
    """
    __name__ = "LogEntryCount"

    def result(self, store: ResultStore) -> None:
        """Add the counts, converted to a plain dict, to *store*."""
        store.add(Result(type(self), dict(self.store)))

    def process(self, entry: dict) -> bool:
        """Increment the counter for the entry's type. Always returns False."""
        self.store[entry[self.settings.type_field]] += 1
        return False

    def __init__(self, settings: LogSettings):
        super().__init__(settings)
        # Missing types start at 0.
        self.store = defaultdict(int)


class LogEntrySequenceAnalyzer(Analyzer):
    """
    store sequence of all log entry types
    """
    __name__ = "LogEntrySequence"

    def result(self, store: ResultStore) -> None:
        """Add a copy of the recorded type sequence to *store*."""
        store.add(Result(type(self), list(self.store)))

    def process(self, entry: dict) -> bool:
        """Append the entry's type to the sequence. Always returns False."""
        entry_type = entry[self.settings.type_field]
        self.store.append(entry_type)
        return False

    def __init__(self, settings: LogSettings):
        super().__init__(settings)
        self.store = []


class ActionSequenceAnalyzer(LogEntrySequenceAnalyzer):
    """
    find sequence of non-spatial log entry types
    """
    __name__ = "ActionSequenceAnalyzer"

    def process(self, entry: dict) -> bool:
        """Append the entry's type unless it is listed in ``settings.spatials``.

        Always returns False.
        """
        entry_type = entry[self.settings.type_field]
        if entry_type in self.settings.spatials:
            return False
        self.store.append(entry_type)
        return False


class CategorizerStub(Analyzer):
    """
    generate a new Category in a ResultStore
    """

    def process(self, entry: dict) -> bool:
        """Not implemented here; subclasses must override."""
        raise NotImplementedError()

    __name__ = "Categorizer"

    def result(self, store: ResultStore, name=None) -> None:
        """Open a new category in *store*, named *name* or, if falsy, ``self.key``."""
        store.new_category(name if name else self.key)

    def __init__(self, settings: LogSettings):
        super().__init__(settings)
        self.key = "default"


class SimpleCategorizer(CategorizerStub):
    """Categorizer whose ``process`` ignores every entry."""

    def process(self, entry):
        return False


class Store(Analyzer):
    """
    Store the entire log
    """
    __name__ = "Store"

    def result(self, store: ResultStore) -> None:
        """Add a copy of all stored entries to *store*."""
        store.add(Result(type(self), list(self.store)))

    def process(self, entry: dict) -> bool:
        """Append *entry* unconditionally. Always returns False."""
        self.store.append(entry)
        return False

    def __init__(self, settings: LogSettings):
        super().__init__(settings)
        self.store: list = []


class ProgressAnalyzer(Analyzer):
    """track spatial and ingame progress"""
    __name__ = "ProgressAnalyzer"

    def __init__(self, settings: LogSettings) -> None:
        super().__init__(settings)
        # Both mappings are keyed by the entry's "timestamp" value.
        self.spatial = OrderedDict()
        self.board = OrderedDict()

    def result(self, store: ResultStore) -> None:
        """Add both mappings to *store* under "spatials" and "boards"."""
        store.add(Result(type(self), {"spatials": self.spatial, "boards": self.board}))

    def process(self, entry: dict) -> bool:
        """Record spatial and board entries by timestamp. Always returns False.

        Spatial entries are reduced to timestamp, coordinates and accuracy;
        board entries are stored whole. The two checks are independent, so an
        entry whose type is in both lists is recorded twice.
        """
        entry_type = entry[self.settings.type_field]
        if entry_type in self.settings.spatials:
            self.spatial[entry["timestamp"]] = {
                'timestamp': entry['timestamp'],
                'coordinates': json_path(entry, "location.coordinates"),
                'accuracy': entry['accuracy']
            }
        if entry_type in self.settings.boards:
            self.board[entry["timestamp"]] = entry
        return False


class MetaDataAnalyzer(Analyzer):
    """collect metadata"""
    __name__ = "MetaDataAnalyzer"

    def result(self, store: ResultStore, name=None) -> None:
        """Add a copy of the collected metadata to *store* (*name* is unused)."""
        store.add(Result(type(self), dict(self.store)))

    def process(self, entry: dict) -> bool:
        """Copy configured metadata values from *entry* into ``self.store``.

        Does nothing when ``settings.custom`` has no "metadata" item.
        Always returns False.
        """
        if "metadata" not in self.settings.custom:
            return False
        metadata = self.settings.custom["metadata"]
        for mdata in metadata:
            # The previous code bound ``key`` to the whole ``metadata``
            # container, so the membership test below could never succeed.
            # NOTE(review): assumes ``metadata`` maps result name -> entry key;
            # a plain sequence of keys is also accepted. Confirm against the
            # settings schema.
            key = metadata[mdata] if isinstance(metadata, dict) else mdata
            if key in entry:
                self.store[mdata] = json_path(entry, key)
        return False

    def __init__(self, settings: LogSettings) -> None:
        super().__init__(settings)
        self.store = {}


def write_logentry_count_csv(LogEntryCountCSV, store, render, analyzers):
    """Render the log entry counts per category and write 'logentrycount.csv'.

    ``LogEntryCountCSV.summary`` is reset to None, then ``render`` is called
    once per category of *store*. If ``summary`` is truthy afterwards it is
    read as a mapping name -> {header: value} and written as one row per name.
    Nothing is written otherwise.
    """
    LogEntryCountCSV.summary = None
    for cat in store.get_categories():
        data = store.get_category(cat)
        render(analyzers.LogEntryCountAnalyzer, data, name=cat)
    summary = LogEntryCountCSV.summary
    if not summary:
        return

    # First pass: collect every header in first-seen order, so that all rows
    # get the same number of columns. Building rows while the header list was
    # still growing left earlier rows shorter than the header row.
    headers = list(dict.fromkeys(head for name in summary for head in summary[name]))

    # Second pass: one row per name, 0 for headers that name does not have.
    lines = [
        [name] + [summary[name][head] if head in summary[name] else 0 for head in headers]
        for name in summary
    ]

    with open('logentrycount.csv', 'w', newline='') as csvfile:
        # QUOTE_NONE with no escapechar: the writer raises csv.Error if a
        # field contains the delimiter or quote character.
        writer = csv.writer(csvfile, quoting=csv.QUOTE_NONE)
        # Only the last dot-separated part of each header is written.
        writer.writerow(["name"] + [h.split(".")[-1] for h in headers])
        writer.writerows(lines)


def write_simulation_flag_csv(store):
    """Dump *store* to 'simus.json' and write a flat overview to 'simus.csv'.

    The CSV has one row per store key, one per result name and one per item
    of each result, indented by leading empty columns.
    """
    # Context manager so the JSON file is flushed and closed deterministically.
    with open("simus.json", "w") as jsonfile:
        json.dump(store.serializable(), jsonfile, indent=2)
    with open("simus.csv", "w") as csvfile:
        csvfile.write("instanceconfig,log,simu,answered,universe_state,selected_actions,timestamp,time\n")
        for key in store.get_store():
            csvfile.write("{}\n".format(key))
            for result in store.store[key]:
                csvfile.write(",{}\n".format(result.name))
                for item in result.get():
                    answers = item['answers']
                    csvfile.write(",,{},{},{},{},{},{}\n".format(
                        answers['@id'],
                        answers['answered'],
                        len(answers['universe_state']) if answers['universe_state'] else 0,
                        len(item['selected_actions']) if item['selected_actions'] else 0,
                        item['timestamp'],
                        # timestamp is divided by 1000 before conversion
                        # (presumably milliseconds; rendered in local time)
                        str(datetime.fromtimestamp(item['timestamp'] / 1000))
                    ))