hcornet 506716e703
Some checks failed
Deployment Verification / deploy-and-test (push) Failing after 29s
first sync
2025-03-04 07:59:21 +01:00

224 lines
7.9 KiB
Python

#!/usr/bin/env python3
#
# IRIS Source Code
# Copyright (C) 2021 - Airbus CyberSecurity (SAS)
# ir@cyberactionlab.net
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 3 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import uuid
from datetime import datetime
from flask_login import current_user
# IMPORTS ------------------------------------------------
from sqlalchemy import BigInteger, Table
from sqlalchemy import Boolean
from sqlalchemy import Column
from sqlalchemy import Date
from sqlalchemy import DateTime
from sqlalchemy import ForeignKey
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy import Text
from sqlalchemy import UniqueConstraint
from sqlalchemy import text
from sqlalchemy.dialects.postgresql import JSON
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import relationship
from app import db
from app.datamgmt.states import update_assets_state
from app.datamgmt.states import update_evidences_state
from app.datamgmt.states import update_ioc_state
from app.datamgmt.states import update_notes_state
from app.datamgmt.states import update_tasks_state
from app.datamgmt.states import update_timeline_state
from app.models.models import Client, Base
class Cases(db.Model):
__tablename__ = 'cases'
case_id = Column(BigInteger, primary_key=True)
soc_id = Column(String(256))
client_id = Column(ForeignKey('client.client_id'), nullable=False)
name = Column(String(256))
description = Column(Text)
open_date = Column(Date)
close_date = Column(Date)
initial_date = Column(DateTime, nullable=False, server_default=text("now()"))
closing_note = Column(Text)
user_id = Column(ForeignKey('user.id'))
owner_id = Column(ForeignKey('user.id'))
status_id = Column(Integer, nullable=False, server_default=text("0"))
state_id = Column(ForeignKey('case_state.state_id'), nullable=True)
custom_attributes = Column(JSON)
case_uuid = Column(UUID(as_uuid=True), default=uuid.uuid4, server_default=text("gen_random_uuid()"),
nullable=False)
classification_id = Column(ForeignKey('case_classification.id'))
reviewer_id = Column(ForeignKey('user.id'), nullable=True)
review_status_id = Column(ForeignKey('review_status.id'), nullable=True)
modification_history = Column(JSON)
client = relationship('Client')
user = relationship('User', foreign_keys=[user_id])
owner = relationship('User', foreign_keys=[owner_id])
classification = relationship('CaseClassification')
reviewer = relationship('User', foreign_keys=[reviewer_id])
alerts = relationship('Alert', secondary="alert_case_association", back_populates='cases', viewonly=True)
tags = relationship('Tags', secondary="case_tags", back_populates='cases')
state = relationship('CaseState', back_populates='cases')
review_status = relationship('ReviewStatus')
def __init__(self,
name=None,
soc_id=None,
client_id=None,
description=None,
user=None,
custom_attributes=None,
classification_id=None,
state_id=None
):
self.name = name[:200] if name else None,
self.soc_id = soc_id,
self.client_id = client_id,
self.description = description,
self.user_id = current_user.id if current_user else user.id
self.owner_id = self.user_id
self.author = current_user.user if current_user else user.user
self.description = description
self.open_date = datetime.utcnow()
self.close_date = None
self.initial_date = datetime.utcnow()
self.custom_attributes = custom_attributes
self.case_uuid = uuid.uuid4()
self.status_id = 0
self.classification_id = classification_id
self.state_id = state_id
def save(self):
"""
Save the current case in database
:return:
"""
# Inject self into db
db.session.add(self)
# Commit the changes
db.session.commit()
# Rename case with the ID
self.name = "#{id} - {name}".format(id=self.case_id, name=self.name)
# Create the states
update_timeline_state(caseid=self.case_id, userid=self.user_id)
update_tasks_state(caseid=self.case_id, userid=self.user_id)
update_evidences_state(caseid=self.case_id, userid=self.user_id)
update_ioc_state(caseid=self.case_id, userid=self.user_id)
update_assets_state(caseid=self.case_id, userid=self.user_id)
update_notes_state(caseid=self.case_id, userid=self.user_id)
db.session.commit()
return self
def validate_on_build(self):
"""
Execute an autocheck of the case metadata and validate it
:return: True if valid else false; Tuple with errors
"""
# TODO : Check if case name already exists
res = Client.query\
.with_entities(Client.client_id)\
.filter(Client.client_id == self.client_id)\
.first()
self.client_id = res[0]
return True, []
class CaseTags(db.Model):
__tablename__ = 'case_tags'
__table_args__ = (
UniqueConstraint('case_id', 'tag_id', name='case_tags_case_id_tag_id_key'),
)
case_id = Column(ForeignKey('cases.case_id'), primary_key=True, nullable=False)
tag_id = Column(ForeignKey('tags.id'), primary_key=True, nullable=False, index=True)
class CasesEvent(db.Model):
__tablename__ = "cases_events"
event_id = Column(BigInteger, primary_key=True)
event_uuid = Column(UUID(as_uuid=True), default=uuid.uuid4, server_default=text("gen_random_uuid()"),
nullable=False)
case_id = Column(ForeignKey('cases.case_id'))
event_title = Column(Text)
event_source = Column(Text)
event_content = Column(Text)
event_raw = Column(Text)
event_date = Column(DateTime)
event_added = Column(DateTime)
event_in_graph = Column(Boolean)
event_in_summary = Column(Boolean)
user_id = Column(ForeignKey('user.id'))
modification_history = Column(JSONB)
event_color = Column(Text)
event_tags = Column(Text)
event_tz = Column(Text)
event_date_wtz = Column(DateTime)
event_is_flagged = Column(Boolean, default=False)
custom_attributes = Column(JSONB)
case = relationship('Cases')
user = relationship('User')
category = relationship('EventCategory', secondary="case_events_category",
primaryjoin="CasesEvent.event_id==CaseEventCategory.event_id",
secondaryjoin="CaseEventCategory.category_id==EventCategory.id",
viewonly=True)
class CaseState(db.Model):
__tablename__ = 'case_state'
state_id = Column(Integer, primary_key=True)
state_name = Column(Text, nullable=False)
state_description = Column(Text)
protected = Column(Boolean, default=False)
cases = relationship('Cases', back_populates='state')
class CaseProtagonist(db.Model):
__tablename__ = "case_protagonist"
id = Column(Integer, primary_key=True)
case_id = Column(ForeignKey('cases.case_id'))
user_id = Column(ForeignKey('user.id'))
name = Column(Text)
contact = Column(Text)
role = Column(Text)
case = relationship('Cases')
user = relationship('User')