Some checks failed
Deployment Verification / deploy-and-test (push) Failing after 29s
311 lines
8.7 KiB
Python
311 lines
8.7 KiB
Python
#!/usr/bin/env python3
|
|
#
|
|
# IRIS Source Code
|
|
# contact@dfir-iris.org
|
|
#
|
|
# This program is free software; you can redistribute it and/or
|
|
# modify it under the terms of the GNU Lesser General Public
|
|
# License as published by the Free Software Foundation; either
|
|
# version 3 of the License, or (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
# Lesser General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Lesser General Public License
|
|
# along with this program; if not, write to the Free Software Foundation,
|
|
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
|
from flask_login import current_user
|
|
from sqlalchemy import and_
|
|
|
|
from app import db
|
|
from app.datamgmt.case.case_db import get_case
|
|
from app.datamgmt.manage.manage_cases_db import list_cases_id
|
|
from app.iris_engine.access_control.utils import ac_access_level_mask_from_val_list, ac_ldp_group_removal
|
|
from app.iris_engine.access_control.utils import ac_access_level_to_list
|
|
from app.iris_engine.access_control.utils import ac_auto_update_user_effective_access
|
|
from app.iris_engine.access_control.utils import ac_permission_to_list
|
|
from app.models import Cases
|
|
from app.models.authorization import Group
|
|
from app.models.authorization import GroupCaseAccess
|
|
from app.models.authorization import User
|
|
from app.models.authorization import UserGroup
|
|
from app.schema.marshables import AuthorizationGroupSchema
|
|
|
|
|
|
def get_groups_list():
|
|
groups = Group.query.all()
|
|
|
|
return groups
|
|
|
|
|
|
def get_groups_list_hr_perms():
|
|
groups = get_groups_list()
|
|
|
|
get_membership_list = UserGroup.query.with_entities(
|
|
UserGroup.group_id,
|
|
User.user,
|
|
User.id,
|
|
User.name
|
|
).join(UserGroup.user).all()
|
|
|
|
membership_list = {}
|
|
for member in get_membership_list:
|
|
if member.group_id not in membership_list:
|
|
membership_list[member.group_id] = [{
|
|
'user': member.user,
|
|
'name': member.name,
|
|
'id': member.id
|
|
}]
|
|
else:
|
|
membership_list[member.group_id].append({
|
|
'user': member.user,
|
|
'name': member.name,
|
|
'id': member.id
|
|
})
|
|
|
|
groups = AuthorizationGroupSchema().dump(groups, many=True)
|
|
for group in groups:
|
|
perms = ac_permission_to_list(group['group_permissions'])
|
|
group['group_permissions_list'] = perms
|
|
group['group_members'] = membership_list.get(group['group_id'], [])
|
|
|
|
return groups
|
|
|
|
|
|
def get_group(group_id):
|
|
group = Group.query.filter(Group.group_id == group_id).first()
|
|
|
|
return group
|
|
|
|
|
|
def get_group_by_name(group_name):
|
|
groups = Group.query.filter(Group.group_name == group_name)
|
|
return groups.first()
|
|
|
|
|
|
def get_group_with_members(group_id):
|
|
group = get_group(group_id)
|
|
if not group:
|
|
return None
|
|
|
|
get_membership_list = UserGroup.query.with_entities(
|
|
UserGroup.group_id,
|
|
User.user,
|
|
User.id,
|
|
User.name
|
|
).join(
|
|
UserGroup.user
|
|
).filter(
|
|
UserGroup.group_id == group_id
|
|
).all()
|
|
|
|
membership_list = {}
|
|
for member in get_membership_list:
|
|
if member.group_id not in membership_list:
|
|
|
|
membership_list[member.group_id] = [{
|
|
'user': member.user,
|
|
'name': member.name,
|
|
'id': member.id
|
|
}]
|
|
else:
|
|
membership_list[member.group_id].append({
|
|
'user': member.user,
|
|
'name': member.name,
|
|
'id': member.id
|
|
})
|
|
|
|
perms = ac_permission_to_list(group.group_permissions)
|
|
setattr(group, 'group_permissions_list', perms)
|
|
setattr(group, 'group_members', membership_list.get(group.group_id, []))
|
|
|
|
return group
|
|
|
|
|
|
def get_group_details(group_id):
|
|
group = get_group_with_members(group_id)
|
|
if not group:
|
|
return group
|
|
|
|
group_accesses = GroupCaseAccess.query.with_entities(
|
|
GroupCaseAccess.access_level,
|
|
GroupCaseAccess.case_id,
|
|
Cases.name.label('case_name')
|
|
).join(
|
|
GroupCaseAccess.case
|
|
).filter(
|
|
GroupCaseAccess.group_id == group_id
|
|
).all()
|
|
|
|
group_cases_access = []
|
|
for kgroup in group_accesses:
|
|
group_cases_access.append({
|
|
"access_level": kgroup.access_level,
|
|
"access_level_list": ac_access_level_to_list(kgroup.access_level),
|
|
"case_id": kgroup.case_id,
|
|
"case_name": kgroup.case_name
|
|
})
|
|
|
|
setattr(group, 'group_cases_access', group_cases_access)
|
|
|
|
return group
|
|
|
|
|
|
def update_group_members(group, members):
|
|
if not group:
|
|
return None
|
|
|
|
cur_groups = UserGroup.query.with_entities(
|
|
UserGroup.user_id
|
|
).filter(UserGroup.group_id == group.group_id).all()
|
|
|
|
set_cur_groups = set([grp[0] for grp in cur_groups])
|
|
set_members = set(int(mber) for mber in members)
|
|
|
|
users_to_add = set_members - set_cur_groups
|
|
users_to_remove = set_cur_groups - set_members
|
|
|
|
for uid in users_to_add:
|
|
user = User.query.filter(User.id == uid).first()
|
|
if user:
|
|
ug = UserGroup()
|
|
ug.group_id = group.group_id
|
|
ug.user_id = user.id
|
|
db.session.add(ug)
|
|
|
|
db.session.commit()
|
|
ac_auto_update_user_effective_access(uid)
|
|
|
|
for uid in users_to_remove:
|
|
if current_user.id == uid and ac_ldp_group_removal(uid, group.group_id):
|
|
continue
|
|
|
|
UserGroup.query.filter(
|
|
and_(UserGroup.group_id == group.group_id,
|
|
UserGroup.user_id == uid)
|
|
).delete()
|
|
|
|
db.session.commit()
|
|
ac_auto_update_user_effective_access(uid)
|
|
|
|
return group
|
|
|
|
|
|
def remove_user_from_group(group, member):
|
|
if not group:
|
|
return None
|
|
|
|
UserGroup.query.filter(
|
|
and_(UserGroup.group_id == group.group_id,
|
|
UserGroup.user_id == member.id)
|
|
).delete()
|
|
db.session.commit()
|
|
|
|
ac_auto_update_user_effective_access(member.id)
|
|
|
|
return group
|
|
|
|
|
|
def delete_group(group):
|
|
if not group:
|
|
return None
|
|
|
|
UserGroup.query.filter(UserGroup.group_id == group.group_id).delete()
|
|
GroupCaseAccess.query.filter(GroupCaseAccess.group_id == group.group_id).delete()
|
|
|
|
db.session.delete(group)
|
|
db.session.commit()
|
|
|
|
|
|
def add_case_access_to_group(group, cases_list, access_level):
|
|
if not group:
|
|
return None, "Invalid group"
|
|
|
|
for case_id in cases_list:
|
|
case = get_case(case_id)
|
|
if not case:
|
|
return None, "Invalid case ID"
|
|
|
|
access_level_mask = ac_access_level_mask_from_val_list([access_level])
|
|
|
|
ocas = GroupCaseAccess.query.filter(
|
|
and_(
|
|
GroupCaseAccess.case_id == case_id,
|
|
GroupCaseAccess.group_id == group.group_id
|
|
)).all()
|
|
if ocas:
|
|
for oca in ocas:
|
|
db.session.delete(oca)
|
|
|
|
oca = GroupCaseAccess()
|
|
oca.group_id = group.group_id
|
|
oca.access_level = access_level_mask
|
|
oca.case_id = case_id
|
|
db.session.add(oca)
|
|
|
|
db.session.commit()
|
|
|
|
return group, "Updated"
|
|
|
|
|
|
def add_all_cases_access_to_group(group, access_level):
|
|
if not group:
|
|
return None, "Invalid group"
|
|
|
|
for case_id in list_cases_id():
|
|
access_level_mask = ac_access_level_mask_from_val_list([access_level])
|
|
|
|
ocas = GroupCaseAccess.query.filter(
|
|
and_(
|
|
GroupCaseAccess.case_id == case_id,
|
|
GroupCaseAccess.group_id == group.group_id
|
|
)).all()
|
|
if ocas:
|
|
for oca in ocas:
|
|
db.session.delete(oca)
|
|
|
|
oca = GroupCaseAccess()
|
|
oca.group_id = group.group_id
|
|
oca.access_level = access_level_mask
|
|
oca.case_id = case_id
|
|
db.session.add(oca)
|
|
|
|
db.session.commit()
|
|
return group, "Updated"
|
|
|
|
|
|
def remove_case_access_from_group(group_id, case_id):
|
|
if not group_id or type(group_id) is not int:
|
|
return
|
|
|
|
if not case_id or type(case_id) is not int:
|
|
return
|
|
|
|
GroupCaseAccess.query.filter(
|
|
and_(
|
|
GroupCaseAccess.case_id == case_id,
|
|
GroupCaseAccess.group_id == group_id
|
|
)).delete()
|
|
|
|
db.session.commit()
|
|
return
|
|
|
|
|
|
def remove_cases_access_from_group(group_id, cases_list):
|
|
if not group_id or type(group_id) is not int:
|
|
return False, "Invalid group"
|
|
|
|
if not cases_list or type(cases_list[0]) is not int:
|
|
return False, "Invalid cases list"
|
|
|
|
GroupCaseAccess.query.filter(
|
|
and_(
|
|
GroupCaseAccess.case_id.in_(cases_list),
|
|
GroupCaseAccess.group_id == group_id
|
|
)).delete()
|
|
|
|
db.session.commit()
|
|
|
|
return True, "Updated" |