hcornet 506716e703
Some checks failed
Deployment Verification / deploy-and-test (push) Failing after 29s
first sync
2025-03-04 07:59:21 +01:00

382 lines
12 KiB
Python

#!/usr/bin/env python3
#
# IRIS Source Code
# Copyright (C) 2021 - Airbus CyberSecurity (SAS)
# ir@cyberactionlab.net
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 3 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import marshmallow
# IMPORTS ------------------------------------------------
from datetime import datetime
from datetime import timedelta
from flask import Blueprint
from flask import redirect
from flask import render_template
from flask import request
from flask import session
from flask import url_for
from flask_login import current_user
from flask_login import logout_user
from flask_wtf import FlaskForm
from sqlalchemy import distinct
from app import app
from app import db
from app.datamgmt.dashboard.dashboard_db import get_global_task, list_user_cases, list_user_reviews
from app.datamgmt.dashboard.dashboard_db import get_tasks_status
from app.datamgmt.dashboard.dashboard_db import list_global_tasks
from app.datamgmt.dashboard.dashboard_db import list_user_tasks
from app.forms import CaseGlobalTaskForm
from app.iris_engine.module_handler.module_handler import call_modules_hook
from app.iris_engine.utils.tracker import track_activity
from app.models.authorization import User
from app.models.cases import Cases
from app.models.models import CaseTasks
from app.models.models import GlobalTasks
from app.models.models import TaskStatus
from app.models.models import UserActivity
from app.schema.marshables import CaseTaskSchema, CaseSchema, CaseDetailsSchema
from app.schema.marshables import GlobalTasksSchema
from app.util import ac_api_requires
from app.util import ac_requires
from app.util import not_authenticated_redirection_url
from app.util import response_error
from app.util import response_success
# CONTENT ------------------------------------------------
# Blueprint carrying every route declared in this module. It is registered
# under the name 'index', hence endpoints such as 'index.index' used below.
dashboard_blueprint = Blueprint('index', __name__, template_folder='templates')
# Logout user
@dashboard_blueprint.route('/logout')
def logout():
    """
    Log the current user out and redirect to the unauthenticated landing URL.

    When the session holds a current case, its ID and name are stored on the
    user (``ctx_case`` / ``ctx_human_case``) and committed before logging out.

    :return: Redirect response
    """
    # session.get() instead of session[...]: a session without a
    # 'current_case' entry must not turn the logout into a KeyError.
    current_case = session.get('current_case')
    if current_case:
        current_user.ctx_case = current_case['case_id']
        current_user.ctx_human_case = current_case['case_name']
        db.session.commit()

    track_activity("user '{}' has been logged-out".format(current_user.user), ctx_less=True, display_in_ui=False)
    logout_user()

    return redirect(not_authenticated_redirection_url(request_url='/'))
@dashboard_blueprint.route('/dashboard/case_charts', methods=['GET'])
@ac_api_requires()
def get_cases_charts(caseid):
    """
    Count the cases opened during the last 365 days, grouped by opening day.

    :param caseid: Not used by this view (presumably supplied by the
                   access-control decorator - confirm)
    :return: JSON response whose data is ``[labels, counts]``; each label is
             formatted as "day/month/year" and labels follow the ascending
             order of ``Cases.open_date``
    """
    opened_rows = Cases.query.with_entities(
        Cases.open_date
    ).filter(
        Cases.open_date > (datetime.utcnow() - timedelta(days=365))
    ).order_by(
        Cases.open_date
    ).all()

    # Insertion order of the dict keeps the labels sorted like the query
    per_day = {}
    for row in opened_rows:
        opened_on = row.open_date
        label = "{}/{}/{}".format(opened_on.day, opened_on.month, opened_on.year)
        per_day[label] = per_day.get(label, 0) + 1

    return response_success("", [list(per_day.keys()), list(per_day.values())])
@dashboard_blueprint.route('/')
def root():
    """
    Redirect the site root to the demo landing page when the
    ``DEMO_MODE_ENABLED`` setting equals the string 'True', otherwise to the
    dashboard.

    :return: Redirect response
    """
    demo_enabled = app.config['DEMO_MODE_ENABLED'] == 'True'
    endpoint = 'demo-landing.demo_landing' if demo_enabled else 'index.index'

    return redirect(url_for(endpoint))
@dashboard_blueprint.route('/dashboard')
@ac_requires()
def index(caseid, url_redir):
    """
    Render the dashboard page with the case counters.

    :param caseid: Case ID used when building the redirection URL
    :param url_redir: When truthy, the view redirects to itself with an
                      explicit ``cid`` (falling back to case 1)
    :return: Page or redirect response
    """
    if url_redir:
        target_cid = 1 if caseid is None else caseid
        return redirect(url_for('index.index', cid=target_cid, redirect=True))

    # Only counts are fetched here, so the queries stay cheap.
    own_open_cases = Cases.query.filter(
        Cases.owner_id == current_user.id,
        Cases.close_date.is_(None)
    ).count()
    all_open_cases = Cases.query.filter(Cases.close_date.is_(None)).count()
    all_cases = Cases.query.with_entities(distinct(Cases.case_id)).count()

    data = {
        "user_open_count": own_open_cases,
        "cases_open_count": all_open_cases,
        "cases_count": all_cases,
    }

    # A bare form is handed to the template along with an empty message
    return render_template('index.html', data=data, form=FlaskForm(), msg=None)
@dashboard_blueprint.route('/global/tasks/list', methods=['GET'])
@ac_api_requires()
def get_gtasks(caseid):
    """
    List the global tasks together with the available task statuses.

    :param caseid: Not used by this view
    :return: JSON response with the keys ``tasks_status`` and ``tasks``
    """
    rows = list_global_tasks()
    # A falsy result yields an empty task list
    tasks = [row._asdict() for row in (rows or ())]

    payload = {
        "tasks_status": get_tasks_status(),
        "tasks": tasks
    }

    return response_success("", data=payload)
@dashboard_blueprint.route('/user/cases/list', methods=['GET'])
@ac_api_requires()
def list_own_cases(caseid):
    """
    List the cases returned by ``list_user_cases``.

    The ``show_closed`` query argument (case-insensitive 'true', defaulting to
    'false') is forwarded to ``list_user_cases`` as a boolean.

    :param caseid: Not used by this view
    :return: JSON response holding the cases dumped with CaseDetailsSchema
    """
    raw_flag = request.args.get('show_closed', 'false', type=str)
    include_closed = raw_flag.lower() == 'true'

    user_cases = list_user_cases(include_closed)
    serialized = CaseDetailsSchema(many=True).dump(user_cases)

    return response_success("", data=serialized)
@dashboard_blueprint.route('/global/tasks/<int:cur_id>', methods=['GET'])
@ac_api_requires()
def view_gtask(cur_id, caseid):
    """
    Return a single global task.

    :param cur_id: ID of the global task, taken from the URL
    :param caseid: Not used by this view
    :return: JSON response with the task as a dict, or an error response when
             no task matches ``cur_id``
    """
    gtask = get_global_task(task_id=cur_id)

    if gtask:
        return response_success("", data=gtask._asdict())

    return response_error(f'Global task ID {cur_id} not found')
@dashboard_blueprint.route('/user/tasks/list', methods=['GET'])
@ac_api_requires()
def get_utasks(caseid):
    """
    List the tasks returned by ``list_user_tasks`` together with the available
    task statuses.

    :param caseid: Not used by this view
    :return: JSON response with the keys ``tasks_status`` and ``tasks``
    """
    rows = list_user_tasks()
    # A falsy result yields an empty task list
    tasks = [row._asdict() for row in (rows or ())]

    payload = {
        "tasks_status": get_tasks_status(),
        "tasks": tasks
    }

    return response_success("", data=payload)
@dashboard_blueprint.route('/user/reviews/list', methods=['GET'])
@ac_api_requires()
def get_reviews(caseid):
    """
    List the reviews returned by ``list_user_reviews``.

    :param caseid: Not used by this view
    :return: JSON response with a list of dicts (empty when there is none)
    """
    rows = list_user_reviews()
    reviews = [row._asdict() for row in (rows or ())]

    return response_success("", data=reviews)
@dashboard_blueprint.route('/user/tasks/status/update', methods=['POST'])
@ac_api_requires()
def utask_statusupdate(caseid):
    """
    Update the status of a case task.

    The JSON body must be an object providing ``task_id`` and
    ``task_status_id``. An optional ``case_id`` takes precedence over the
    ``caseid`` argument when looking the task up.

    :param caseid: Fallback case ID used when the body carries no ``case_id``
    :return: JSON response with the updated task dumped through
             CaseTaskSchema, or an error response
    """
    jsdata = request.get_json()
    # Non-object JSON (e.g. a list) has no .get() and is rejected up front
    if not jsdata or not isinstance(jsdata, dict):
        return response_error("Invalid request")

    # NOTE(review): case_id comes straight from the request body; confirm the
    # caller's access to that case is enforced somewhere.
    case_id = jsdata.get('case_id') or caseid
    task_id = jsdata.get('task_id')

    task = CaseTasks.query.filter(CaseTasks.id == task_id, CaseTasks.task_case_id == case_id).first()
    if not task:
        return response_error(f"Invalid case task ID {task_id} for case {case_id}")

    status_id = jsdata.get('task_status_id')
    status = TaskStatus.query.filter(TaskStatus.id == status_id).first()
    if not status:
        return response_error(f"Invalid task status ID {status_id}")

    task.task_status_id = status_id
    try:
        db.session.commit()
    except Exception as e:
        # Roll back so the session stays usable after the failed commit
        db.session.rollback()
        return response_error(f"Unable to update task. Error {e}")

    return response_success("Updated", data=CaseTaskSchema().dump(task))
@dashboard_blueprint.route('/global/tasks/add/modal', methods=['GET'])
@ac_api_requires()
def add_gtask_modal(caseid):
    """
    Render the modal used to create a global task.

    The assignee choices are the active users ordered by name, and the status
    choices come from ``get_tasks_status``.

    :param caseid: Not used by this view
    :return: Rendered modal template
    """
    blank_task = GlobalTasks()
    form = CaseGlobalTaskForm()

    active_users = User.query.filter(User.active == True).order_by(User.name).all()
    form.task_assignee_id.choices = [(usr.id, usr.name) for usr in active_users]
    form.task_status_id.choices = [(status.id, status.status_name) for status in get_tasks_status()]

    return render_template(
        "modal_add_global_task.html",
        form=form,
        task=blank_task,
        uid=current_user.id,
        user_name=None
    )
@dashboard_blueprint.route('/global/tasks/add', methods=['POST'])
@ac_api_requires()
def add_gtask(caseid):
    """
    Create a global task from the JSON body of the request.

    The body goes through the 'on_preload_global_task_create' module hook
    before being loaded with GlobalTasksSchema. The stored task goes through
    the 'on_postload_global_task_create' hook before being returned.

    :param caseid: Case ID forwarded to the module hooks and activity tracker
    :return: JSON response with the created task, or an error response with
             status 400 on validation or database failure
    """
    gtask_schema = GlobalTasksSchema()
    try:
        request_data = call_modules_hook('on_preload_global_task_create', data=request.get_json(), caseid=caseid)
        gtask = gtask_schema.load(request_data)
    except marshmallow.exceptions.ValidationError as e:
        return response_error(msg="Data error", data=e.messages, status=400)

    # One timestamp so the open date and last update of a new task match
    now = datetime.utcnow()
    gtask.task_userid_update = current_user.id
    gtask.task_open_date = now
    gtask.task_last_update = now

    try:
        db.session.add(gtask)
        db.session.commit()
    except Exception as e:
        # Roll back so the session stays usable after the failed commit
        db.session.rollback()
        return response_error(msg="Data error", data=str(e), status=400)

    gtask = call_modules_hook('on_postload_global_task_create', data=gtask, caseid=caseid)
    track_activity("created new global task '{}'".format(gtask.task_title), caseid=caseid)

    return response_success('Task added', data=gtask_schema.dump(gtask))
@dashboard_blueprint.route('/global/tasks/update/<int:cur_id>/modal', methods=['GET'])
@ac_api_requires()
def edit_gtask_modal(cur_id, caseid):
    """
    Render the modal used to edit an existing global task.

    :param cur_id: ID of the global task, taken from the URL
    :param caseid: Not used by this view
    :return: Rendered modal template, or an error response with status 400
             when no task matches ``cur_id``
    """
    task = GlobalTasks.query.filter(GlobalTasks.id == cur_id).first()
    # An unknown ID would otherwise raise AttributeError on task.task_title
    if not task:
        return response_error(msg="Data error", data="Invalid task ID", status=400)

    form = CaseGlobalTaskForm()
    form.task_assignee_id.choices = [(user.id, user.name) for user in
                                     User.query.filter(User.active == True).order_by(User.name).all()]
    form.task_status_id.choices = [(a.id, a.status_name) for a in get_tasks_status()]

    # Pre-fill the form with the current values of the task
    form.task_title.render_kw = {'value': task.task_title}
    form.task_description.data = task.task_description

    # first() returns None when the last updater no longer exists; unpacking
    # it directly would raise TypeError, so fall back to no name.
    updater_row = User.query.with_entities(User.name).filter(User.id == task.task_userid_update).first()
    user_name = updater_row[0] if updater_row else None

    return render_template("modal_add_global_task.html", form=form, task=task,
                           uid=task.task_assignee_id, user_name=user_name)
@dashboard_blueprint.route('/global/tasks/update/<int:cur_id>', methods=['POST'])
@ac_api_requires()
def edit_gtask(cur_id, caseid):
    """
    Update an existing global task from the JSON body of the request.

    The body goes through the 'on_preload_global_task_update' module hook
    before being loaded onto the existing task with GlobalTasksSchema. The
    committed task goes through the 'on_postload_global_task_update' hook
    before being returned.

    :param cur_id: ID of the global task, taken from the URL
    :param caseid: Case ID forwarded to the module hooks and activity tracker
    :return: JSON response with the updated task, or an error response with
             status 400 on unknown ID or validation failure
    """
    # The task is validated first. The form previously built here was never
    # used and cost two extra queries per request.
    task = GlobalTasks.query.filter(GlobalTasks.id == cur_id).first()
    if not task:
        return response_error(msg="Data error", data="Invalid task ID", status=400)

    gtask_schema = GlobalTasksSchema()
    try:
        request_data = call_modules_hook('on_preload_global_task_update', data=request.get_json(),
                                         caseid=caseid)

        gtask = gtask_schema.load(request_data, instance=task)
        gtask.task_userid_update = current_user.id
        gtask.task_last_update = datetime.utcnow()

        db.session.commit()

        gtask = call_modules_hook('on_postload_global_task_update', data=gtask, caseid=caseid)

    except marshmallow.exceptions.ValidationError as e:
        return response_error(msg="Data error", data=e.messages, status=400)

    track_activity("updated global task {} (status {})".format(task.task_title, task.task_status_id), caseid=caseid)

    return response_success('Task updated', data=gtask_schema.dump(gtask))
@dashboard_blueprint.route('/global/tasks/delete/<int:cur_id>', methods=['POST'])
@ac_api_requires()
def gtask_delete(cur_id, caseid):
    """
    Delete a global task.

    The 'on_preload_global_task_delete' module hook receives the task ID
    before the deletion. The 'on_postload_global_task_delete' hook receives
    the JSON body of the request (or None when there is none) afterwards.

    :param cur_id: ID of the global task, taken from the URL
    :param caseid: Case ID forwarded to the module hooks and activity tracker
    :return: JSON success response, or an error response when the ID is
             missing or unknown
    """
    call_modules_hook('on_preload_global_task_delete', data=cur_id, caseid=caseid)

    if not cur_id:
        return response_error("Missing parameter")

    data = GlobalTasks.query.filter(GlobalTasks.id == cur_id).first()
    if not data:
        return response_error("Invalid global task ID")

    GlobalTasks.query.filter(GlobalTasks.id == cur_id).delete()
    db.session.commit()

    # silent=True: the deletion is already committed, so a missing or
    # non-JSON body must not abort the request at this point.
    call_modules_hook('on_postload_global_task_delete', data=request.get_json(silent=True), caseid=caseid)
    track_activity("deleted global task ID {}".format(cur_id), caseid=caseid)

    return response_success("Task deleted")