Some checks failed
Deployment Verification / deploy-and-test (push) Failing after 29s
376 lines
12 KiB
Python
376 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
#
|
|
# IRIS Source Code
|
|
# Copyright (C) 2021 - Airbus CyberSecurity (SAS)
|
|
# ir@cyberactionlab.net
|
|
#
|
|
# This program is free software; you can redistribute it and/or
|
|
# modify it under the terms of the GNU Lesser General Public
|
|
# License as published by the Free Software Foundation; either
|
|
# version 3 of the License, or (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
# Lesser General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Lesser General Public License
|
|
# along with this program; if not, write to the Free Software Foundation,
|
|
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
|
from pathlib import Path
|
|
|
|
from datetime import datetime
|
|
from sqlalchemy import and_
|
|
from sqlalchemy.orm import aliased, contains_eager, subqueryload
|
|
|
|
from app import db
|
|
from app.datamgmt.alerts.alerts_db import search_alert_resolution_by_name
|
|
from app.datamgmt.case.case_db import get_case_tags
|
|
from app.datamgmt.manage.manage_case_classifications_db import get_case_classification_by_id
|
|
from app.datamgmt.manage.manage_case_state_db import get_case_state_by_name
|
|
from app.datamgmt.states import delete_case_states
|
|
from app.models import CaseAssets, CaseClassification, alert_assets_association, CaseStatus, TaskAssignee, TaskComments
|
|
from app.models import CaseEventCategory
|
|
from app.models import CaseEventsAssets
|
|
from app.models import CaseEventsIoc
|
|
from app.models import CaseReceivedFile
|
|
from app.models import CaseTasks
|
|
from app.models import Cases
|
|
from app.models import CasesEvent
|
|
from app.models import Client
|
|
from app.models import DataStoreFile
|
|
from app.models import DataStorePath
|
|
from app.models import IocAssetLink
|
|
from app.models import IocLink
|
|
from app.models import Notes
|
|
from app.models import NotesGroup
|
|
from app.models import NotesGroupLink
|
|
from app.models.alerts import Alert, AlertCaseAssociation
|
|
from app.models.authorization import CaseAccessLevel
|
|
from app.models.authorization import GroupCaseAccess
|
|
from app.models.authorization import OrganisationCaseAccess
|
|
from app.models.authorization import User
|
|
from app.models import UserActivity
|
|
from app.models.authorization import UserCaseAccess
|
|
from app.models.authorization import UserCaseEffectiveAccess
|
|
from app.models.cases import CaseProtagonist, CaseTags, CaseState
|
|
from app.schema.marshables import CaseDetailsSchema
|
|
|
|
|
|
def list_cases_id():
|
|
res = Cases.query.with_entities(
|
|
Cases.case_id
|
|
).all()
|
|
|
|
return [r.case_id for r in res]
|
|
|
|
|
|
def list_cases_dict_unrestricted():
|
|
|
|
owner_alias = aliased(User)
|
|
user_alias = aliased(User)
|
|
|
|
res = Cases.query.with_entities(
|
|
Cases.name.label('case_name'),
|
|
Cases.description.label('case_description'),
|
|
Client.name.label('client_name'),
|
|
Cases.open_date.label('case_open_date'),
|
|
Cases.close_date.label('case_close_date'),
|
|
Cases.soc_id.label('case_soc_id'),
|
|
Cases.user_id.label('opened_by_user_id'),
|
|
user_alias.user.label('opened_by'),
|
|
Cases.owner_id,
|
|
owner_alias.name.label('owner'),
|
|
Cases.case_id
|
|
).join(
|
|
Cases.client
|
|
).join(
|
|
user_alias, and_(Cases.user_id == user_alias.id)
|
|
).join(
|
|
owner_alias, and_(Cases.owner_id == owner_alias.id)
|
|
).order_by(
|
|
Cases.open_date
|
|
).all()
|
|
|
|
data = []
|
|
for row in res:
|
|
row = row._asdict()
|
|
row['case_open_date'] = row['case_open_date'].strftime("%m/%d/%Y")
|
|
row['case_close_date'] = row['case_close_date'].strftime("%m/%d/%Y") if row["case_close_date"] else ""
|
|
data.append(row)
|
|
|
|
return data
|
|
|
|
|
|
def list_cases_dict(user_id):
|
|
owner_alias = aliased(User)
|
|
user_alias = aliased(User)
|
|
|
|
res = UserCaseEffectiveAccess.query.with_entities(
|
|
Cases.name.label('case_name'),
|
|
Cases.description.label('case_description'),
|
|
Client.name.label('client_name'),
|
|
Cases.open_date.label('case_open_date'),
|
|
Cases.close_date.label('case_close_date'),
|
|
Cases.soc_id.label('case_soc_id'),
|
|
Cases.user_id.label('opened_by_user_id'),
|
|
user_alias.user.label('opened_by'),
|
|
Cases.owner_id,
|
|
owner_alias.name.label('owner'),
|
|
Cases.case_id,
|
|
Cases.case_uuid,
|
|
Cases.classification_id,
|
|
CaseClassification.name.label('classification'),
|
|
Cases.state_id,
|
|
CaseState.state_name,
|
|
UserCaseEffectiveAccess.access_level
|
|
).join(
|
|
UserCaseEffectiveAccess.case,
|
|
Cases.client,
|
|
Cases.user
|
|
).outerjoin(
|
|
Cases.classification,
|
|
Cases.state
|
|
).join(
|
|
user_alias, and_(Cases.user_id == user_alias.id)
|
|
).join(
|
|
owner_alias, and_(Cases.owner_id == owner_alias.id)
|
|
).filter(
|
|
UserCaseEffectiveAccess.user_id == user_id
|
|
).order_by(
|
|
Cases.open_date
|
|
).all()
|
|
|
|
data = []
|
|
for row in res:
|
|
if row.access_level & CaseAccessLevel.deny_all.value == CaseAccessLevel.deny_all.value:
|
|
continue
|
|
|
|
row = row._asdict()
|
|
row['case_open_date'] = row['case_open_date'].strftime("%m/%d/%Y")
|
|
row['case_close_date'] = row['case_close_date'].strftime("%m/%d/%Y") if row["case_close_date"] else ""
|
|
data.append(row)
|
|
|
|
return data
|
|
|
|
|
|
def user_list_cases_view(user_id):
|
|
res = UserCaseEffectiveAccess.query.with_entities(
|
|
UserCaseEffectiveAccess.case_id
|
|
).filter(and_(
|
|
UserCaseEffectiveAccess.user_id == user_id,
|
|
UserCaseEffectiveAccess.access_level != CaseAccessLevel.deny_all.value
|
|
)).all()
|
|
|
|
return [r.case_id for r in res]
|
|
|
|
|
|
def close_case(case_id):
|
|
res = Cases.query.filter(
|
|
Cases.case_id == case_id
|
|
).first()
|
|
|
|
if res:
|
|
res.close_date = datetime.utcnow()
|
|
|
|
res.state_id = get_case_state_by_name('Closed').state_id
|
|
|
|
db.session.commit()
|
|
return res
|
|
|
|
return None
|
|
|
|
|
|
def map_alert_resolution_to_case_status(case_status_id):
|
|
|
|
if case_status_id == CaseStatus.false_positive.value:
|
|
ares = search_alert_resolution_by_name('False Positive', exact_match=True)
|
|
|
|
elif case_status_id == CaseStatus.true_positive_with_impact.value:
|
|
ares = search_alert_resolution_by_name('True Positive With Impact', exact_match=True)
|
|
|
|
elif case_status_id == CaseStatus.true_positive_without_impact.value:
|
|
ares = search_alert_resolution_by_name('True Positive Without Impact', exact_match=True)
|
|
|
|
else:
|
|
ares = search_alert_resolution_by_name('Not Applicable', exact_match=True)
|
|
|
|
if ares:
|
|
return ares.resolution_status_id
|
|
|
|
return None
|
|
|
|
|
|
def reopen_case(case_id):
|
|
res = Cases.query.filter(
|
|
Cases.case_id == case_id
|
|
).first()
|
|
|
|
if res:
|
|
res.close_date = None
|
|
|
|
res.state_id = get_case_state_by_name('Open').state_id
|
|
|
|
db.session.commit()
|
|
return res
|
|
|
|
return None
|
|
|
|
|
|
def get_case_protagonists(case_id):
|
|
protagonists = CaseProtagonist.query.with_entities(
|
|
CaseProtagonist.role,
|
|
CaseProtagonist.name,
|
|
CaseProtagonist.contact,
|
|
User.name.label('user_name'),
|
|
User.user.label('user_login')
|
|
).filter(
|
|
CaseProtagonist.case_id == case_id
|
|
).outerjoin(
|
|
CaseProtagonist.user
|
|
).all()
|
|
|
|
return protagonists
|
|
|
|
|
|
def get_case_details_rt(case_id):
|
|
case = Cases.query.filter(Cases.case_id == case_id).first()
|
|
if case:
|
|
owner_alias = aliased(User)
|
|
user_alias = aliased(User)
|
|
review_alias = aliased(User)
|
|
|
|
res = db.session.query(Cases, Client, user_alias, owner_alias).with_entities(
|
|
Cases.name.label('case_name'),
|
|
Cases.description.label('case_description'),
|
|
Cases.open_date, Cases.close_date,
|
|
Cases.soc_id.label('case_soc_id'),
|
|
Cases.case_id,
|
|
Cases.case_uuid,
|
|
Client.name.label('customer_name'),
|
|
Cases.client_id.label('customer_id'),
|
|
Cases.user_id.label('open_by_user_id'),
|
|
user_alias.user.label('open_by_user'),
|
|
Cases.owner_id,
|
|
owner_alias.name.label('owner'),
|
|
Cases.status_id,
|
|
Cases.state_id,
|
|
CaseState.state_name,
|
|
Cases.custom_attributes,
|
|
Cases.modification_history,
|
|
Cases.initial_date,
|
|
Cases.classification_id,
|
|
CaseClassification.name.label('classification'),
|
|
Cases.reviewer_id,
|
|
review_alias.name.label('reviewer'),
|
|
).filter(and_(
|
|
Cases.case_id == case_id
|
|
)).join(
|
|
user_alias, and_(Cases.user_id == user_alias.id)
|
|
).outerjoin(
|
|
owner_alias, and_(Cases.owner_id == owner_alias.id)
|
|
).outerjoin(
|
|
review_alias, and_(Cases.reviewer_id == review_alias.id)
|
|
).join(
|
|
Cases.client,
|
|
).outerjoin(
|
|
Cases.classification,
|
|
Cases.state
|
|
).first()
|
|
|
|
if res is None:
|
|
return None
|
|
|
|
res = res._asdict()
|
|
|
|
res['case_tags'] = ",".join(get_case_tags(case_id))
|
|
res['status_name'] = CaseStatus(res['status_id']).name.replace("_", " ").title()
|
|
|
|
res['protagonists'] = [r._asdict() for r in get_case_protagonists(case_id)]
|
|
|
|
else:
|
|
res = None
|
|
|
|
return res
|
|
|
|
|
|
def delete_case(case_id):
|
|
if not Cases.query.filter(Cases.case_id == case_id).first():
|
|
return False
|
|
|
|
delete_case_states(caseid=case_id)
|
|
UserActivity.query.filter(UserActivity.case_id == case_id).delete()
|
|
CaseReceivedFile.query.filter(CaseReceivedFile.case_id == case_id).delete()
|
|
IocLink.query.filter(IocLink.case_id == case_id).delete()
|
|
|
|
CaseTags.query.filter(CaseTags.case_id == case_id).delete()
|
|
CaseProtagonist.query.filter(CaseProtagonist.case_id == case_id).delete()
|
|
AlertCaseAssociation.query.filter(AlertCaseAssociation.case_id == case_id).delete()
|
|
|
|
dsf_list = DataStoreFile.query.filter(DataStoreFile.file_case_id == case_id).all()
|
|
|
|
for dsf_list_item in dsf_list:
|
|
|
|
fln = Path(dsf_list_item.file_local_name)
|
|
if fln.is_file():
|
|
fln.unlink(missing_ok=True)
|
|
|
|
db.session.delete(dsf_list_item)
|
|
db.session.commit()
|
|
|
|
DataStorePath.query.filter(DataStorePath.path_case_id == case_id).delete()
|
|
|
|
da = CaseAssets.query.with_entities(CaseAssets.asset_id).filter(CaseAssets.case_id == case_id).all()
|
|
for asset in da:
|
|
IocAssetLink.query.filter(asset.asset_id == asset.asset_id).delete()
|
|
|
|
CaseEventsAssets.query.filter(CaseEventsAssets.case_id == case_id).delete()
|
|
CaseEventsIoc.query.filter(CaseEventsIoc.case_id == case_id).delete()
|
|
|
|
CaseAssetsAlias = aliased(CaseAssets)
|
|
|
|
# Query for CaseAssets that are not referenced in alerts and match the case_id
|
|
assets_to_delete = db.session.query(CaseAssets).filter(
|
|
and_(
|
|
CaseAssets.case_id == case_id,
|
|
~db.session.query(alert_assets_association).filter(
|
|
alert_assets_association.c.asset_id == CaseAssetsAlias.asset_id
|
|
).exists()
|
|
)
|
|
)
|
|
|
|
# Delete the assets
|
|
assets_to_delete.delete(synchronize_session='fetch')
|
|
|
|
# Get all alerts associated with assets in the case
|
|
alerts_to_update = db.session.query(CaseAssets).filter(CaseAssets.case_id == case_id)
|
|
|
|
# Update case_id for the alerts
|
|
alerts_to_update.update({CaseAssets.case_id: None}, synchronize_session='fetch')
|
|
db.session.commit()
|
|
|
|
NotesGroupLink.query.filter(NotesGroupLink.case_id == case_id).delete()
|
|
NotesGroup.query.filter(NotesGroup.group_case_id == case_id).delete()
|
|
Notes.query.filter(Notes.note_case_id == case_id).delete()
|
|
|
|
tasks = CaseTasks.query.filter(CaseTasks.task_case_id == case_id).all()
|
|
for task in tasks:
|
|
TaskAssignee.query.filter(TaskAssignee.task_id == task.id).delete()
|
|
CaseTasks.query.filter(CaseTasks.id == task.id).delete()
|
|
|
|
da = CasesEvent.query.with_entities(CasesEvent.event_id).filter(CasesEvent.case_id == case_id).all()
|
|
for event in da:
|
|
CaseEventCategory.query.filter(CaseEventCategory.event_id == event.event_id).delete()
|
|
|
|
CasesEvent.query.filter(CasesEvent.case_id == case_id).delete()
|
|
|
|
UserCaseAccess.query.filter(UserCaseAccess.case_id == case_id).delete()
|
|
UserCaseEffectiveAccess.query.filter(UserCaseEffectiveAccess.case_id == case_id).delete()
|
|
GroupCaseAccess.query.filter(GroupCaseAccess.case_id == case_id).delete()
|
|
OrganisationCaseAccess.query.filter(OrganisationCaseAccess.case_id == case_id).delete()
|
|
|
|
Cases.query.filter(Cases.case_id == case_id).delete()
|
|
db.session.commit()
|
|
|
|
return True
|