#!/usr/bin/env python3 # # IRIS Source Code # Copyright (C) 2021 - Airbus CyberSecurity (SAS) # ir@cyberactionlab.net # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 3 of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program; if not, write to the Free Software Foundation, # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. from flask_login import current_user from sqlalchemy import and_ from app import bc from app import db from app.datamgmt.case.case_db import get_case from app.iris_engine.access_control.utils import ac_access_level_mask_from_val_list, ac_ldp_group_removal from app.iris_engine.access_control.utils import ac_access_level_to_list from app.iris_engine.access_control.utils import ac_auto_update_user_effective_access from app.iris_engine.access_control.utils import ac_get_detailed_effective_permissions_from_groups from app.iris_engine.access_control.utils import ac_remove_case_access_from_user from app.iris_engine.access_control.utils import ac_set_case_access_for_user from app.models import Cases from app.models.authorization import CaseAccessLevel from app.models.authorization import Group from app.models.authorization import Organisation from app.models.authorization import User from app.models.authorization import UserCaseAccess from app.models.authorization import UserCaseEffectiveAccess from app.models.authorization import UserGroup from app.models.authorization import UserOrganisation def get_user(user_id, id_key: str = 'id'): user = User.query.filter(getattr(User, id_key) == user_id).first() return user def get_active_user_by_login(username): user = User.query.filter( User.user == username, User.active == True ).first() return user def list_users_id(): users = User.query.with_entities(User.user_id).all() return users def get_user_effective_permissions(user_id): groups_perms = UserGroup.query.with_entities( Group.group_permissions, Group.group_name ).filter( UserGroup.user_id == user_id ).join( UserGroup.group ).all() effective_permissions = ac_get_detailed_effective_permissions_from_groups(groups_perms) return effective_permissions def get_user_groups(user_id): groups = UserGroup.query.with_entities( Group.group_name, Group.group_id, Group.group_uuid ).filter( UserGroup.user_id == user_id ).join( UserGroup.group ).all() output = [] for group in groups: output.append(group._asdict()) return output def update_user_groups(user_id, groups): cur_groups = UserGroup.query.with_entities( UserGroup.group_id ).filter(UserGroup.user_id == user_id).all() set_cur_groups = set([grp[0] for grp in cur_groups]) set_new_groups = set(int(grp) for grp in groups) groups_to_add = set_new_groups - set_cur_groups groups_to_remove = set_cur_groups - set_new_groups for group_id in groups_to_add: user_group = UserGroup() user_group.user_id = user_id user_group.group_id = group_id db.session.add(user_group) for group_id in groups_to_remove: if current_user.id == user_id and ac_ldp_group_removal(user_id=user_id, group_id=group_id): continue UserGroup.query.filter( UserGroup.user_id == user_id, UserGroup.group_id == group_id ).delete() db.session.commit() ac_auto_update_user_effective_access(user_id) def update_user_orgs(user_id, orgs): cur_orgs = UserOrganisation.query.with_entities( UserOrganisation.org_id, UserOrganisation.is_primary_org ).filter(UserOrganisation.user_id == user_id).all() updated = False primary_org = 0 for org in cur_orgs: if org.is_primary_org: primary_org = org.org_id if primary_org == 0: return False, 'User does not have primary organisation. Set one before managing its organisations' set_cur_orgs = set([org.org_id for org in cur_orgs]) set_new_orgs = set(int(org) for org in orgs) orgs_to_add = set_new_orgs - set_cur_orgs orgs_to_remove = set_cur_orgs - set_new_orgs for org in orgs_to_add: user_org = UserOrganisation() user_org.user_id = user_id user_org.org_id = org db.session.add(user_org) updated = True for org in orgs_to_remove: if org != primary_org: UserOrganisation.query.filter( UserOrganisation.user_id == user_id, UserOrganisation.org_id == org ).delete() else: db.session.rollback() return False, f'Cannot delete user from primary organisation {org}. Change it before deleting.' updated = True db.session.commit() ac_auto_update_user_effective_access(user_id) return True, f'Organisations membership updated' if updated else "Nothing changed" def change_user_primary_org(user_id, old_org_id, new_org_id): uo_old = UserOrganisation.query.filter( UserOrganisation.user_id == user_id, UserOrganisation.org_id == old_org_id ).first() uo_new = UserOrganisation.query.filter( UserOrganisation.user_id == user_id, UserOrganisation.org_id == new_org_id ).first() if uo_old: uo_old.is_primary_org = False if not uo_new: uo = UserOrganisation() uo.user_id = user_id uo.org_id = new_org_id uo.is_primary_org = True db.session.add(uo) else: uo_new.is_primary_org = True db.session.commit() return def add_user_to_organisation(user_id, org_id, make_primary=False): org_id = Organisation.query.first().org_id uo_exists = UserOrganisation.query.filter( UserOrganisation.user_id == user_id, UserOrganisation.org_id == org_id ).first() if uo_exists: uo_exists.is_primary_org = make_primary db.session.commit() return True # Check if user has a primary org already prim_org = get_user_primary_org(user_id=user_id) if make_primary: prim_org.is_primary_org = False db.session.commit() uo = UserOrganisation() uo.user_id = user_id uo.org_id = org_id uo.is_primary_org = prim_org is None db.session.add(uo) db.session.commit() return True def get_user_primary_org(user_id): uo = UserOrganisation.query.filter( and_(UserOrganisation.user_id == user_id, UserOrganisation.is_primary_org == True) ).all() if not uo: return None uoe = None index = 0 if len(uo) > 1: # Fix potential duplication for u in uo: if index == 0: uoe = u continue u.is_primary_org = False db.session.commit() else: uoe = uo[0] return uoe def add_user_to_group(user_id, group_id): exists = UserGroup.query.filter( UserGroup.user_id == user_id, UserGroup.group_id == group_id ).scalar() if exists: return True ug = UserGroup() ug.user_id = user_id ug.group_id = group_id db.session.add(ug) db.session.commit() return True def get_user_organisations(user_id): user_org = UserOrganisation.query.with_entities( Organisation.org_name, Organisation.org_id, Organisation.org_uuid, UserOrganisation.is_primary_org ).filter( UserOrganisation.user_id == user_id ).join( UserOrganisation.org ).all() output = [] for org in user_org: output.append(org._asdict()) return output def get_user_cases_access(user_id): user_accesses = UserCaseAccess.query.with_entities( UserCaseAccess.access_level, UserCaseAccess.case_id, Cases.name.label('case_name') ).join( UserCaseAccess.case ).filter( UserCaseAccess.user_id == user_id ).all() user_cases_access = [] for kuser in user_accesses: user_cases_access.append({ "access_level": kuser.access_level, "access_level_list": ac_access_level_to_list(kuser.access_level), "case_id": kuser.case_id, "case_name": kuser.case_name }) return user_cases_access def get_user_cases_fast(user_id): user_cases = UserCaseEffectiveAccess.query.with_entities( UserCaseEffectiveAccess.case_id ).where( UserCaseEffectiveAccess.user_id == user_id, UserCaseEffectiveAccess.access_level != CaseAccessLevel.deny_all.value ).all() return [c.case_id for c in user_cases] def remove_cases_access_from_user(user_id, cases_list): if not user_id or type(user_id) is not int: return False, 'Invalid user id' if not cases_list or type(cases_list[0]) is not int: return False, "Invalid cases list" UserCaseAccess.query.filter( and_( UserCaseAccess.case_id.in_(cases_list), UserCaseAccess.user_id == user_id )).delete() db.session.commit() ac_auto_update_user_effective_access(user_id) return True, 'Cases access removed' def remove_case_access_from_user(user_id, case_id): if not user_id or type(user_id) is not int: return False, 'Invalid user id' if not case_id or type(case_id) is not int: return False, "Invalid case id" UserCaseAccess.query.filter( and_( UserCaseAccess.case_id == case_id, UserCaseAccess.user_id == user_id )).delete() db.session.commit() ac_remove_case_access_from_user(user_id, case_id) return True, 'Case access removed' def set_user_case_access(user_id, case_id, access_level): if user_id is None or type(user_id) is not int: return False, 'Invalid user id' if case_id is None or type(case_id) is not int: return False, "Invalid case id" if access_level is None or type(access_level) is not int: return False, "Invalid access level" if CaseAccessLevel.has_value(access_level) is False: return False, "Invalid access level" uca = UserCaseAccess.query.filter( UserCaseAccess.user_id == user_id, UserCaseAccess.case_id == case_id ).all() if len(uca) > 1: for u in uca: db.session.delete(u) db.session.commit() uca = None if not uca: uca = UserCaseAccess() uca.user_id = user_id uca.case_id = case_id uca.access_level = access_level db.session.add(uca) else: uca[0].access_level = access_level db.session.commit() ac_set_case_access_for_user(user_id, case_id, access_level) return True, 'Case access set to {} for user {}'.format(access_level, user_id) def get_user_details(user_id, include_api_key=False): user = User.query.filter(User.id == user_id).first() if not user: return None row = {} row['user_id'] = user.id row['user_uuid'] = user.uuid row['user_name'] = user.name row['user_login'] = user.user row['user_email'] = user.email row['user_active'] = user.active row['user_is_service_account'] = user.is_service_account if include_api_key: row['user_api_key'] = user.api_key row['user_groups'] = get_user_groups(user_id) row['user_organisations'] = get_user_organisations(user_id) row['user_permissions'] = get_user_effective_permissions(user_id) row['user_cases_access'] = get_user_cases_access(user_id) upg = get_user_primary_org(user_id) row['user_primary_organisation_id'] = upg.org_id if upg else 0 return row def add_case_access_to_user(user, cases_list, access_level): if not user: return None, "Invalid user" for case_id in cases_list: case = get_case(case_id) if not case: return None, "Invalid case ID" access_level_mask = ac_access_level_mask_from_val_list([access_level]) ocas = UserCaseAccess.query.filter( and_( UserCaseAccess.case_id == case_id, UserCaseAccess.user_id == user.id )).all() if ocas: for oca in ocas: db.session.delete(oca) oca = UserCaseAccess() oca.user_id = user.id oca.access_level = access_level_mask oca.case_id = case_id db.session.add(oca) db.session.commit() ac_auto_update_user_effective_access(user.id) return user, "Updated" def get_user_by_username(username): user = User.query.filter(User.user == username).first() return user def get_users_list(): users = User.query.all() output = [] for user in users: row = {} row['user_id'] = user.id row['user_uuid'] = user.uuid row['user_name'] = user.name row['user_login'] = user.user row['user_email'] = user.email row['user_active'] = user.active row['user_is_service_account'] = user.is_service_account output.append(row) return output def get_users_list_restricted(): users = User.query.all() output = [] for user in users: row = {} row['user_id'] = user.id row['user_uuid'] = user.uuid row['user_name'] = user.name row['user_login'] = user.user row['user_active'] = user.active output.append(row) return output def get_users_view_from_user_id(user_id): organisations = get_user_organisations(user_id) orgs_id = [uo.get('org_id') for uo in organisations] users = UserOrganisation.query.with_entities( User ).filter(and_( UserOrganisation.org_id.in_(orgs_id), UserOrganisation.user_id != user_id )).join( UserOrganisation.user ).all() return users def get_users_id_view_from_user_id(user_id): organisations = get_user_organisations(user_id) orgs_id = [uo.get('org_id') for uo in organisations] users = UserOrganisation.query.with_entities( User.id ).filter(and_( UserOrganisation.org_id.in_(orgs_id), UserOrganisation.user_id != user_id )).join( UserOrganisation.user ).all() users = [u[0] for u in users] return users def get_users_list_user_view(user_id): users = get_users_view_from_user_id(user_id) output = [] for user in users: row = {} row['user_id'] = user.id row['user_uuid'] = user.uuid row['user_name'] = user.name row['user_login'] = user.user row['user_email'] = user.email row['user_active'] = user.active output.append(row) return output def get_users_list_restricted_user_view(user_id): users = get_users_view_from_user_id(user_id) output = [] for user in users: row = {} row['user_id'] = user.id row['user_uuid'] = user.uuid row['user_name'] = user.name row['user_login'] = user.user row['user_active'] = user.active output.append(row) return output def get_users_list_restricted_from_case(case_id): users = UserCaseEffectiveAccess.query.with_entities( User.id.label('user_id'), User.uuid.label('user_uuid'), User.name.label('user_name'), User.user.label('user_login'), User.active.label('user_active'), User.email.label('user_email'), UserCaseEffectiveAccess.access_level.label('user_access_level') ).filter( UserCaseEffectiveAccess.case_id == case_id ).join( UserCaseEffectiveAccess.user ).all() return [u._asdict() for u in users] def create_user(user_name: str, user_login: str, user_password: str, user_email: str, user_active: bool, user_external_id: str = None, user_is_service_account: bool = False): if user_is_service_account is True and (user_password is None or user_password == ''): pw_hash = None else: pw_hash = bc.generate_password_hash(user_password.encode('utf8')).decode('utf8') user = User(user=user_login, name=user_name, email=user_email, password=pw_hash, active=user_active, external_id=user_external_id, is_service_account=user_is_service_account) user.save() add_user_to_organisation(user.id, org_id=1) ac_auto_update_user_effective_access(user_id=user.id) return user def update_user(user: User, name: str = None, email: str = None, password: str = None): if password is not None and password != '': pw_hash = bc.generate_password_hash(password.encode('utf8')).decode('utf8') user.password = pw_hash for key, value in [('name', name,), ('email', email,)]: if value is not None: setattr(user, key, value) db.session.commit() return user def delete_user(user_id): UserCaseAccess.query.filter(UserCaseAccess.user_id == user_id).delete() UserOrganisation.query.filter(UserOrganisation.user_id == user_id).delete() UserGroup.query.filter(UserGroup.user_id == user_id).delete() UserCaseEffectiveAccess.query.filter(UserCaseEffectiveAccess.user_id == user_id).delete() User.query.filter(User.id == user_id).delete() db.session.commit() def user_exists(user_name, user_email): user = User.query.filter_by(user=user_name).first() user_by_email = User.query.filter_by(email=user_email).first() return user or user_by_email