Some checks failed
Deployment Verification / deploy-and-test (push) Failing after 29s
175 lines
5.1 KiB
Python
175 lines
5.1 KiB
Python
#!/usr/bin/env python3
|
|
#
|
|
# IRIS Source Code
|
|
# Copyright (C) 2021 - Airbus CyberSecurity (SAS)
|
|
# ir@cyberactionlab.net
|
|
#
|
|
# This program is free software; you can redistribute it and/or
|
|
# modify it under the terms of the GNU Lesser General Public
|
|
# License as published by the Free Software Foundation; either
|
|
# version 3 of the License, or (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
# Lesser General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Lesser General Public License
|
|
# along with this program; if not, write to the Free Software Foundation,
|
|
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
|
|
|
import datetime
|
|
from flask_login import current_user
|
|
from sqlalchemy import and_
|
|
from sqlalchemy import desc
|
|
|
|
from app import db
|
|
from app.datamgmt.manage.manage_attribute_db import get_default_custom_attributes
|
|
from app.datamgmt.states import update_evidences_state
|
|
from app.models import CaseReceivedFile
|
|
from app.models import Comments
|
|
from app.models import EvidencesComments
|
|
from app.models.authorization import User
|
|
|
|
|
|
def get_rfiles(caseid):
|
|
crf = CaseReceivedFile.query.with_entities(
|
|
CaseReceivedFile.id,
|
|
CaseReceivedFile.file_uuid,
|
|
CaseReceivedFile.filename,
|
|
CaseReceivedFile.date_added,
|
|
CaseReceivedFile.file_hash,
|
|
CaseReceivedFile.file_description,
|
|
CaseReceivedFile.file_size,
|
|
User.name.label('username')
|
|
).filter(
|
|
CaseReceivedFile.case_id == caseid
|
|
).join(CaseReceivedFile.user).order_by(desc(CaseReceivedFile.date_added)).all()
|
|
|
|
return crf
|
|
|
|
|
|
def add_rfile(evidence, caseid, user_id):
|
|
|
|
evidence.date_added = datetime.datetime.now()
|
|
evidence.case_id = caseid
|
|
evidence.user_id = user_id
|
|
|
|
evidence.custom_attributes = get_default_custom_attributes('evidence')
|
|
|
|
db.session.add(evidence)
|
|
|
|
update_evidences_state(caseid=caseid, userid=user_id)
|
|
|
|
db.session.commit()
|
|
|
|
return evidence
|
|
|
|
|
|
def get_rfile(rfile_id, caseid):
|
|
return CaseReceivedFile.query.filter(
|
|
CaseReceivedFile.id == rfile_id,
|
|
CaseReceivedFile.case_id == caseid
|
|
).first()
|
|
|
|
|
|
def update_rfile(evidence, user_id, caseid):
|
|
|
|
evidence.user_id = user_id
|
|
|
|
update_evidences_state(caseid=caseid, userid=user_id)
|
|
db.session.commit()
|
|
return evidence
|
|
|
|
|
|
def delete_rfile(rfile_id, caseid):
|
|
with db.session.begin_nested():
|
|
com_ids = EvidencesComments.query.with_entities(
|
|
EvidencesComments.comment_id
|
|
).filter(
|
|
EvidencesComments.comment_evidence_id == rfile_id
|
|
).all()
|
|
|
|
com_ids = [c.comment_id for c in com_ids]
|
|
EvidencesComments.query.filter(EvidencesComments.comment_id.in_(com_ids)).delete()
|
|
|
|
Comments.query.filter(Comments.comment_id.in_(com_ids)).delete()
|
|
|
|
CaseReceivedFile.query.filter(and_(
|
|
CaseReceivedFile.id == rfile_id,
|
|
CaseReceivedFile.case_id == caseid,
|
|
)).delete()
|
|
|
|
update_evidences_state(caseid=caseid)
|
|
|
|
db.session.commit()
|
|
|
|
|
|
def get_case_evidence_comments(evidence_id):
|
|
return Comments.query.filter(
|
|
EvidencesComments.comment_evidence_id == evidence_id
|
|
).join(
|
|
EvidencesComments,
|
|
Comments.comment_id == EvidencesComments.comment_id
|
|
).order_by(
|
|
Comments.comment_date.asc()
|
|
).all()
|
|
|
|
|
|
def add_comment_to_evidence(evidence_id, comment_id):
|
|
ec = EvidencesComments()
|
|
ec.comment_evidence_id = evidence_id
|
|
ec.comment_id = comment_id
|
|
|
|
db.session.add(ec)
|
|
db.session.commit()
|
|
|
|
|
|
def get_case_evidence_comments_count(evidences_list):
|
|
return EvidencesComments.query.filter(
|
|
EvidencesComments.comment_evidence_id.in_(evidences_list)
|
|
).with_entities(
|
|
EvidencesComments.comment_evidence_id,
|
|
EvidencesComments.comment_id
|
|
).group_by(
|
|
EvidencesComments.comment_evidence_id,
|
|
EvidencesComments.comment_id
|
|
).all()
|
|
|
|
|
|
def get_case_evidence_comment(evidence_id, comment_id):
|
|
return EvidencesComments.query.filter(
|
|
EvidencesComments.comment_evidence_id == evidence_id,
|
|
EvidencesComments.comment_id == comment_id
|
|
).with_entities(
|
|
Comments.comment_id,
|
|
Comments.comment_text,
|
|
Comments.comment_date,
|
|
Comments.comment_update_date,
|
|
Comments.comment_uuid,
|
|
User.name,
|
|
User.user
|
|
).join(
|
|
EvidencesComments.comment,
|
|
Comments.user
|
|
).first()
|
|
|
|
|
|
def delete_evidence_comment(evidence_id, comment_id):
|
|
comment = Comments.query.filter(
|
|
Comments.comment_id == comment_id,
|
|
Comments.comment_user_id == current_user.id
|
|
).first()
|
|
if not comment:
|
|
return False, "You are not allowed to delete this comment"
|
|
|
|
EvidencesComments.query.filter(
|
|
EvidencesComments.comment_evidence_id == evidence_id,
|
|
EvidencesComments.comment_id == comment_id
|
|
).delete()
|
|
|
|
db.session.delete(comment)
|
|
db.session.commit()
|
|
|
|
return True, "Comment deleted"
|