hcornet 506716e703
Some checks failed
Deployment Verification / deploy-and-test (push) Failing after 29s
first sync
2025-03-04 07:59:21 +01:00

133 lines
3.6 KiB
Python

#!/usr/bin/env python3
#
# IRIS Source Code
# Copyright (C) 2021 - Airbus CyberSecurity (SAS)
# ir@cyberactionlab.net
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 3 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from datetime import datetime
from flask_login import current_user
from sqlalchemy import and_
from app import db
from app.models import ObjectState
def update_object_state(object_name, caseid, userid=None) -> ObjectState:
"""
Expects a db commit soon after
Args:
object_name: name of the object to update
caseid: case id
userid: user id
Returns:
ObjectState object
"""
if not userid:
userid = current_user.id
os = ObjectState.query.filter(and_(
ObjectState.object_name == object_name,
ObjectState.object_case_id == caseid
)).first()
if os:
os.object_last_update = datetime.utcnow()
os.object_state = os.object_state + 1
os.object_updated_by_id = userid
os.object_case_id = caseid
else:
os = ObjectState()
os.object_name = object_name
os.object_state = 0
os.object_last_update = datetime.utcnow()
os.object_updated_by_id = userid
os.object_case_id = caseid
db.session.add(os)
return os
def get_object_state(object_name, caseid):
os = ObjectState.query.with_entities(
ObjectState.object_state,
ObjectState.object_last_update
).filter(and_(
ObjectState.object_name == object_name,
ObjectState.object_case_id == caseid
)).first()
if os:
return os._asdict()
else:
return None
def delete_case_states(caseid):
ObjectState.query.filter(
ObjectState.object_case_id == caseid
).delete()
def update_timeline_state(caseid, userid=None):
return update_object_state('timeline', caseid=caseid, userid=userid)
def get_timeline_state(caseid):
return get_object_state('timeline', caseid=caseid)
def update_tasks_state(caseid, userid=None):
return update_object_state('tasks', caseid=caseid, userid=userid)
def get_tasks_state(caseid):
return get_object_state('tasks', caseid=caseid)
def update_evidences_state(caseid, userid=None):
return update_object_state('evidences', caseid=caseid, userid=userid)
def get_evidences_state(caseid):
return get_object_state('evidences', caseid=caseid)
def update_ioc_state(caseid, userid=None):
return update_object_state('ioc', caseid=caseid, userid=userid)
def get_ioc_state(caseid):
return get_object_state('ioc', caseid=caseid)
def update_assets_state(caseid, userid=None):
return update_object_state('assets', caseid=caseid, userid=userid)
def get_assets_state(caseid):
return get_object_state('assets', caseid=caseid)
def update_notes_state(caseid, userid=None):
return update_object_state('notes', caseid=caseid, userid=userid)
def get_notes_state(caseid):
return get_object_state('notes', caseid=caseid)