# project/tasks/tasks.py

import json
import logging
import shutil
import uuid
import os.path
import os
import redis as redis_lib
import time
from celery import Celery
from analysis import log_analyzer as la
from analysis.analyzers import KMLRender, ActivityMapperRender
from clients.webclients import CLIENTS
# Redis logical database holding the per-user job-status JSON records
# (the name suggests it is shared with the Flask app — confirm).
FLASK_DB = 1
REDIS_HOST = "redis"
# Root directory; each finished analysis gets its own uuid-named subfolder.
DATA_PATH = "/app/data/results/"
# Renderer classes keyed by the first entry of the analysis settings' ``render``.
RENDERERS = {  # TODO
    "KMLRender": KMLRender,
    "ActivityMapper": ActivityMapperRender,
}

# Derive the Celery broker/result-backend URL from REDIS_HOST so the task
# queue and the status store cannot silently point at different hosts.
_REDIS_URL = f"redis://{REDIS_HOST}"
app = Celery('tasks', backend=_REDIS_URL, broker=_REDIS_URL)
redis = redis_lib.StrictRedis(host=REDIS_HOST, db=FLASK_DB)
log: logging.Logger = logging.getLogger(__name__)
def update_status(username, name, state, **kwargs):
    """Record a state transition for one job in a user's JSON status record in Redis.

    The Redis key ``username`` holds a JSON object keyed by job name. This
    stamps the current local time (``time.strftime("%c")``) under
    ``state[0]``, sets the job's ``'status'`` to ``state[1]``, then copies any
    extra keyword fields into the job entry, and writes the record back.

    :param username: Redis key of the user's status record.
    :param name: job entry inside that record to update.
    :param state: ``(timestamp_key, status_label)`` pair, e.g. ``('load', 'LOADING')``.
    :param kwargs: extra fields stored verbatim on the job entry; must be
        JSON-serialisable. They are applied last, so they win over the fields above.
    """
    raw = redis.get(username)
    # Tolerate a missing record or job entry (as status_update does) instead of
    # raising TypeError/KeyError -- this function is also called from the error
    # path of analyze(), where a crash would mask the original failure.
    status = json.loads(raw) if raw else {}
    entry = status.setdefault(name, {})
    entry[state[0]] = time.strftime("%c")
    entry['status'] = state[1]
    entry.update(kwargs)
    redis.set(username, json.dumps(status))
def _store_results(files):
    """Move rendered files into a fresh uuid-named folder under DATA_PATH; return their new paths.

    Files that no longer exist are logged and skipped so the remaining ones
    are still published.
    """
    target_dir = os.path.join(DATA_PATH, str(uuid.uuid4()))
    # makedirs also creates DATA_PATH itself if it is missing; the uuid
    # subfolder is new, so an existing directory still raises as before.
    os.makedirs(target_dir)
    results = []
    for file in files:
        target = os.path.join(target_dir, os.path.basename(file))
        try:
            shutil.move(file, target)
        except FileNotFoundError as e:
            log.exception(e)
            continue
        results.append(target)
    return results


@app.task
def analyze(config, log_ids, **kwargs):
    """Celery task: download the selected logs, analyze them and publish the rendered files.

    Progress is reported via update_status under ``kwargs['username']`` /
    ``kwargs['name']``: LOADING, then RUNNING, then FINISHED (with
    ``results``, the list of published file paths). If any step after the
    initial LOADING update raises, the exception is logged and the job is
    marked ERROR (with ``exception``) instead of being re-raised.

    :param config: analysis configuration, parsed with ``la.parse_settings``.
    :param log_ids: ids of the logs to analyze; matched against ``str(x['@id'])``
        of the entries returned by the client's ``list()``.
    :param kwargs: must provide ``username``, ``name``, ``clientName`` (key into
        CLIENTS), ``host`` and ``cookies`` (a mapping forwarded as keyword
        arguments to the client constructor).
    """
    username, name = kwargs['username'], kwargs['name']
    update_status(username, name, ('load', 'LOADING'))
    tmpdir = None
    try:
        log.info("start analysis")
        client = CLIENTS[kwargs['clientName']](host=kwargs['host'], **kwargs['cookies'])
        id_urls = {str(entry['@id']): entry['file_url'] for entry in client.list()}
        urls = [id_urls[log_id] for log_id in log_ids]
        tmpdir = client.download_files(urls)
        paths = [entry.path for entry in os.scandir(tmpdir.name)]
        # Use proper %-placeholders: passing the listing as a bare extra
        # argument made logging fail to format the record.
        log.info("downloaded %d file(s) to %s: %s", len(paths), tmpdir.name, paths)
        update_status(username, name, ('start', 'RUNNING'))
        settings = la.parse_settings(config)
        store = la.run_analysis(paths, settings, la.LOADERS)
        render = RENDERERS[settings.render[0]]()  # TODO
        files = render.render(store.get_all())
        log.info("rendered files: %s", files)
        results = _store_results(files)
        update_status(username, name, ('done', 'FINISHED'), results=results)
    except Exception as e:
        # Task boundary: record the failure for the user rather than re-raising.
        log.exception(e)
        update_status(username, name, ('abort', 'ERROR'), exception=str(e))
    finally:
        # Remove the download directory on every path, not only on success.
        if tmpdir is not None:
            try:
                tmpdir.cleanup()
            except OSError:
                log.exception("could not remove download directory %s", tmpdir.name)
def status_update(key, status_key, status):
    """Set ``status_key`` to ``status`` in the JSON object stored at Redis ``key``.

    A missing or empty record is treated as an empty object. The updated
    object is written back with SET, then a Redis SAVE is issued.
    """
    existing = redis.get(key)
    data = json.loads(existing) if existing else {}
    data[status_key] = status
    redis.set(key, json.dumps(data))
    # NOTE(review): SAVE snapshots synchronously on the Redis server on every
    # call; consider BGSAVE or Redis' own persistence settings -- confirm intent.
    redis.save()