Some checks failed
Deployment Verification / deploy-and-test (push) Failing after 29s
133 lines
3.6 KiB
Python
133 lines
3.6 KiB
Python
#!/usr/bin/env python3
|
|
#
|
|
# IRIS Source Code
|
|
# Copyright (C) 2021 - Airbus CyberSecurity (SAS)
|
|
# ir@cyberactionlab.net
|
|
#
|
|
# This program is free software; you can redistribute it and/or
|
|
# modify it under the terms of the GNU Lesser General Public
|
|
# License as published by the Free Software Foundation; either
|
|
# version 3 of the License, or (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
# Lesser General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Lesser General Public License
|
|
# along with this program; if not, write to the Free Software Foundation,
|
|
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
|
|
|
from datetime import datetime
|
|
from flask_login import current_user
|
|
from sqlalchemy import and_
|
|
|
|
from app import db
|
|
from app.models import ObjectState
|
|
|
|
|
|
def update_object_state(object_name, caseid, userid=None) -> ObjectState:
|
|
"""
|
|
Expects a db commit soon after
|
|
|
|
Args:
|
|
object_name: name of the object to update
|
|
caseid: case id
|
|
userid: user id
|
|
|
|
Returns:
|
|
ObjectState object
|
|
"""
|
|
if not userid:
|
|
userid = current_user.id
|
|
|
|
os = ObjectState.query.filter(and_(
|
|
ObjectState.object_name == object_name,
|
|
ObjectState.object_case_id == caseid
|
|
)).first()
|
|
if os:
|
|
os.object_last_update = datetime.utcnow()
|
|
os.object_state = os.object_state + 1
|
|
os.object_updated_by_id = userid
|
|
os.object_case_id = caseid
|
|
|
|
else:
|
|
os = ObjectState()
|
|
os.object_name = object_name
|
|
os.object_state = 0
|
|
os.object_last_update = datetime.utcnow()
|
|
os.object_updated_by_id = userid
|
|
os.object_case_id = caseid
|
|
|
|
db.session.add(os)
|
|
|
|
return os
|
|
|
|
|
|
def get_object_state(object_name, caseid):
|
|
os = ObjectState.query.with_entities(
|
|
ObjectState.object_state,
|
|
ObjectState.object_last_update
|
|
).filter(and_(
|
|
ObjectState.object_name == object_name,
|
|
ObjectState.object_case_id == caseid
|
|
)).first()
|
|
|
|
if os:
|
|
return os._asdict()
|
|
else:
|
|
return None
|
|
|
|
|
|
def delete_case_states(caseid):
|
|
ObjectState.query.filter(
|
|
ObjectState.object_case_id == caseid
|
|
).delete()
|
|
|
|
|
|
def update_timeline_state(caseid, userid=None):
|
|
return update_object_state('timeline', caseid=caseid, userid=userid)
|
|
|
|
|
|
def get_timeline_state(caseid):
|
|
return get_object_state('timeline', caseid=caseid)
|
|
|
|
|
|
def update_tasks_state(caseid, userid=None):
|
|
return update_object_state('tasks', caseid=caseid, userid=userid)
|
|
|
|
|
|
def get_tasks_state(caseid):
|
|
return get_object_state('tasks', caseid=caseid)
|
|
|
|
|
|
def update_evidences_state(caseid, userid=None):
|
|
return update_object_state('evidences', caseid=caseid, userid=userid)
|
|
|
|
|
|
def get_evidences_state(caseid):
|
|
return get_object_state('evidences', caseid=caseid)
|
|
|
|
|
|
def update_ioc_state(caseid, userid=None):
|
|
return update_object_state('ioc', caseid=caseid, userid=userid)
|
|
|
|
|
|
def get_ioc_state(caseid):
|
|
return get_object_state('ioc', caseid=caseid)
|
|
|
|
|
|
def update_assets_state(caseid, userid=None):
|
|
return update_object_state('assets', caseid=caseid, userid=userid)
|
|
|
|
|
|
def get_assets_state(caseid):
|
|
return get_object_state('assets', caseid=caseid)
|
|
|
|
|
|
def update_notes_state(caseid, userid=None):
|
|
return update_object_state('notes', caseid=caseid, userid=userid)
|
|
|
|
|
|
def get_notes_state(caseid):
|
|
return get_object_state('notes', caseid=caseid) |