soc-fortress/iris-web/source/app/configuration.py
hcornet 506716e703
Some checks failed
Deployment Verification / deploy-and-test (push) Failing after 29s
first sync
2025-03-04 07:59:21 +01:00

466 lines
18 KiB
Python

#!/usr/bin/env python3
#
# IRIS Source Code
# Copyright (C) 2021 - Airbus CyberSecurity (SAS)
# ir@cyberactionlab.net
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 3 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import configparser
import logging as log
import os
import ssl
# --------- Configuration ---------
# read the private configuration file
from datetime import timedelta
from enum import Enum
from pathlib import Path
import requests
# --------- Configuration ---------
# read the private configuration file
from azure.core.exceptions import ResourceNotFoundError
from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient
class IrisConfigException(Exception):
pass
class IrisConfig(configparser.ConfigParser):
""" From https://gist.github.com/jeffersfp/586c2570cd2bdb8385693a744aa13122 - @jeffersfp """
def __init__(self):
super(IrisConfig, self).__init__()
# Azure Key Vault
self.key_vault_name = self.load('AZURE', 'KEY_VAULT_NAME')
if self.key_vault_name:
self.az_credential = DefaultAzureCredential()
self.az_client = SecretClient(vault_url=f"https://{self.key_vault_name}.vault.azure.net/",
credential=self.az_credential)
log.getLogger('azure.core.pipeline.policies.http_logging_policy').setLevel(log.WARNING)
def validate_config(self):
required_values = {
'POSTGRES': {
},
'IRIS': {
},
'CELERY': {
},
'DEVELOPMENT': {
}
}
for section, keys in required_values.items():
if section not in self:
raise IrisConfigException(
'Missing section %s in the configuration file' % section)
def config_key_vault(self):
"""
Load the settings to connect to Azure Key Vault
"""
def load(self, section, option, fallback=None):
"""
Load variable from different sources. Uses the following order
1. Azure Key Vault
2. Environment Variable
3. Environment Variable deprecated
3. Configuration File
"""
loaders = [self._load_azure_key_vault,
self._load_env, self._load_env_deprecated,
self._load_file, self._load_file_deprecated]
for loader in loaders:
value = loader(section, option)
if value:
return value
return fallback
def _load_azure_key_vault(self, section, option):
if not (hasattr(self, 'key_vault_name') and self.key_vault_name):
return
key = f"{section}-{option}".replace('_', '-')
try:
return self.az_client.get_secret(key).value
except ResourceNotFoundError:
return None
def _load_env(self, section, option):
return os.environ.get(f"{section}_{option}")
def _load_env_deprecated(self, section, option):
# Specify new_value : old_value
mapping = {
'POSTGRES_ADMIN_USER': 'DB_USER',
'POSTGRES_ADMIN_PASSWORD': 'DB_PASS',
'POSTGRES_SERVER': 'DB_HOST',
'POSTGRES_PORT': 'DB_PORT',
'IRIS_SECRET_KEY': 'SECRET_KEY',
'IRIS_SECURITY_PASSWORD_SALT': 'SECURITY_PASSWORD_SALT',
'IRIS_UPSTREAM_SERVER': 'APP_HOST',
'IRIS_UPSTREAM_PORT': 'APP_PORT'
}
new_key = f"{section}_{option}"
old_key = mapping.get(new_key)
if not old_key:
return
value = os.environ.get(old_key)
if value:
log.warning(f"Environment variable {old_key} used which is deprecated. Please use {new_key}.")
return value
def _load_file(self, section, option):
return self.get(section, option, fallback=None)
def _load_file_deprecated(self, section, option):
# Specify new_value : old_value
mapping = {
('POSTGRES', 'USER'): ('POSTGRES', 'PG_ACCOUNT'),
('POSTGRES', 'PASSWORD'): ('POSTGRES', 'PG_PASSWD'),
('POSTGRES', 'ADMIN_USER'): ('POSTGRES', 'PGA_ACCOUNT'),
('POSTGRES', 'ADMIN_PASSWORD'): ('POSTGRES', 'PGA_PASSWD'),
('POSTGRES', 'SERVER'): ('POSTGRES', 'PG_SERVER'),
('POSTGRES', 'PORT'): ('POSTGRES', 'PG_PORT')
}
new_key = (section, option)
old_key = mapping.get(new_key)
if not old_key:
return
value = self.get(old_key[0], old_key[1], fallback=None)
if value:
log.warning(
f"Configuration {old_key[0]}.{old_key[1]} found in configuration file. "
f"This is a deprecated configuration. Please use {new_key[0]}.{new_key[1]}")
return value
# --------- Configuration ---------
config = IrisConfig()
# Fetch the values
PG_ACCOUNT_ = config.load('POSTGRES', 'USER')
PG_PASSWD_ = config.load('POSTGRES', 'PASSWORD')
PGA_ACCOUNT_ = config.load('POSTGRES', 'ADMIN_USER')
PGA_PASSWD_ = config.load('POSTGRES', 'ADMIN_PASSWORD')
PG_SERVER_ = config.load('POSTGRES', 'SERVER')
PG_PORT_ = config.load('POSTGRES', 'PORT')
PG_DB_ = config.load('POSTGRES', 'DB', fallback='iris_db')
CELERY_BROKER_ = config.load('CELERY', 'BROKER',
fallback=f"amqp://{config.load('CELERY', 'HOST', fallback='rabbitmq')}")
# Grabs the folder where the script runs.
basedir = os.path.abspath(os.path.dirname(__file__))
# Build of SQLAlchemy connectors. One is admin and the other is only for iris. Admin is needed to create new DB
SQLALCHEMY_BASE_URI = "postgresql+psycopg2://{user}:{passwd}@{server}:{port}/".format(
user=PG_ACCOUNT_,
passwd=PG_PASSWD_,
server=PG_SERVER_,
port=PG_PORT_
)
SQLALCHEMY_BASEA_URI = "postgresql+psycopg2://{user}:{passwd}@{server}:{port}/".format(
user=PGA_ACCOUNT_,
passwd=PGA_PASSWD_,
server=PG_SERVER_,
port=PG_PORT_
)
SQLALCHEMY_BASE_ADMIN_URI = "postgresql+psycopg2://{user}:{passwd}@{server}:{port}/".format(user=PGA_ACCOUNT_,
passwd=PGA_PASSWD_,
server=PG_SERVER_,
port=PG_PORT_)
class AuthenticationType(Enum):
local = 1
oidc_proxy = 2
authentication_type = os.environ.get('IRIS_AUTHENTICATION_TYPE',
config.get('IRIS', 'AUTHENTICATION_TYPE', fallback="local"))
authentication_create_user_if_not_exists = config.load('IRIS', 'AUTHENTICATION_CREATE_USER_IF_NOT_EXIST')
tls_root_ca = os.environ.get('TLS_ROOT_CA',
config.get('IRIS', 'TLS_ROOT_CA', fallback=None))
authentication_logout_url = None
authentication_account_service_url = None
authentication_token_introspection_url = None
authentication_client_id = None
authentication_client_secret = None
authentication_app_admin_role_name = None
authentication_jwks_url = None
if authentication_type == 'oidc_proxy':
oidc_discovery_url = config.load('OIDC', 'IRIS_DISCOVERY_URL', fallback="")
try:
oidc_discovery_response = requests.get(oidc_discovery_url, verify=tls_root_ca)
if oidc_discovery_response.status_code == 200:
response_json = oidc_discovery_response.json()
authentication_logout_url = response_json.get('end_session_endpoint')
authentication_account_service_url = f"{response_json.get('issuer')}/account"
authentication_token_introspection_url = response_json.get('introspection_endpoint')
authentication_jwks_url = response_json.get('jwks_uri')
else:
raise IrisConfigException("Unsuccessful authN server discovery")
authentication_client_id = config.load('OIDC', 'IRIS_CLIENT_ID', fallback="")
authentication_client_secret = config.load('OIDC', 'IRIS_CLIENT_SECRET', fallback="")
authentication_app_admin_role_name = config.load('OIDC', 'IRIS_ADMIN_ROLE_NAME', fallback="")
except Exception as e:
log.error(f"OIDC ERROR - {e}")
exit(0)
pass
else:
log.info("OIDC configuration properly parsed")
# --------- CELERY ---------
class CeleryConfig:
result_backend = "db+" + SQLALCHEMY_BASE_URI + "iris_tasks" # use database as storage
broker_url = CELERY_BROKER_
result_extended = True
result_serializer = "json"
worker_pool_restarts = True
# --------- APP ---------
class Config:
# Handled by bumpversion
IRIS_VERSION = "v2.3.3"
API_MIN_VERSION = "2.0.0"
API_MAX_VERSION = "2.0.3"
MODULES_INTERFACE_MIN_VERSION = '1.1'
MODULES_INTERFACE_MAX_VERSION = '1.2.0'
if os.environ.get('IRIS_WORKER') is None:
CSRF_ENABLED = True
SECRET_KEY = config.load('IRIS', 'SECRET_KEY')
SECURITY_PASSWORD_SALT = config.load('IRIS', 'SECURITY_PASSWORD_SALT')
SECURITY_LOGIN_USER_TEMPLATE = 'login.html'
IRIS_ADM_EMAIL = config.load('IRIS', 'ADM_EMAIL')
IRIS_ADM_PASSWORD = config.load('IRIS', 'ADM_PASSWORD')
IRIS_ADM_USERNAME = config.load('IRIS', 'ADM_USERNAME')
IRIS_ADM_API_KEY = config.load('IRIS', 'ADM_API_KEY')
PERMANENT_SESSION_LIFETIME = timedelta(hours=24)
SESSION_COOKIE_SAMESITE = 'Lax'
SESSION_COOKIE_SECURE = True
PG_ACCOUNT = PG_ACCOUNT_
PG_PASSWD = PG_PASSWD_
PGA_ACCOUNT = PGA_ACCOUNT_
PGA_PASSWD = PGA_PASSWD_
PG_SERVER = PG_SERVER_
PG_PORT = PG_PORT_
PG_DB = PG_DB_
DB_RETRY_COUNT = config.load('DB', 'RETRY_COUNT', fallback=3)
DB_RETRY_DELAY = config.load('DB', 'RETRY_DELAY', fallback=0.5)
DEMO_MODE_ENABLED = config.load('IRIS_DEMO', 'ENABLED', fallback=False)
if DEMO_MODE_ENABLED == 'True':
DEMO_DOMAIN = config.load('IRIS_DEMO', 'DOMAIN', fallback=None)
DEMO_USERS_SEED = config.load('IRIS_DEMO', 'USERS_SEED', fallback=0)
DEMO_ADM_SEED = config.load('IRIS_DEMO', 'ADM_SEED', fallback=0)
MAX_CONTENT_LENGTH = 200000
WTF_CSRF_TIME_LIMIT = None
""" SqlAlchemy configuration
"""
SQLALCHEMY_TRACK_MODIFICATIONS = False
SQLALCHEMY_DATABASE_URI = SQLALCHEMY_BASE_URI + PG_DB_
SQLALCHEMY_BINDS = {
'iris_tasks': SQLALCHEMY_BASE_URI + 'iris_tasks'
}
SQALCHEMY_PIGGER_URI = SQLALCHEMY_BASE_URI
""" Dropzone configuration
Set download path, max file upload size and timeout
"""
APP_PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
UPLOADED_PATH = config.load('IRIS', 'UPLOADED_PATH', fallback="/home/iris/downloads")
TEMPLATES_PATH = config.load('IRIS', 'TEMPLATES_PATH', fallback="/home/iris/user_templates")
BACKUP_PATH = config.load('IRIS', 'BACKUP_PATH', fallback="/home/iris/server_data/backup")
UPDATES_PATH = os.path.join(BACKUP_PATH, 'updates')
RELEASE_URL = config.load('IRIS', 'RELEASE_URL',
fallback="https://api.github.com/repos/dfir-iris/iris-web/releases")
RELEASE_SIGNATURE_KEY = config.load('IRIS', 'RELEASE_SIGNATURE_KEY', fallback="dependencies/DFIR-IRIS_pkey.asc")
PG_CLIENT_PATH = config.load('IRIS', 'PG_CLIENT_PATH', fallback="/usr/bin")
ASSET_STORE_PATH = config.load('IRIS', 'ASSET_STORE_PATH', fallback="/home/iris/server_data/custom_assets")
DATASTORE_PATH = config.load('IRIS', 'DATASTORE_PATH', fallback="/home/iris/server_data/datastore")
ASSET_SHOW_PATH = "/static/assets/img/graph"
ORGANISATION_NAME = config.load('IRIS', 'ORGANISATION_NAME', fallback='')
LOGIN_BANNER_TEXT = config.load('IRIS', 'LOGIN_BANNER_TEXT', fallback='')
LOGIN_PTFM_CONTACT = config.load('IRIS', 'LOGIN_PTFM_CONTACT', fallback='Please contact the platform administrator')
UPDATE_DIR_NAME = '_updates_'
DROPZONE_MAX_FILE_SIZE = 1024 * 1024 * 1024 * 10 # 10 GB
DROPZONE_TIMEOUT = 15 * 60 * 10000 # 15 Minutes of uploads per file
""" Celery configuration
Configure URL and backend
"""
CELERY = CeleryConfig
if os.getenv('IRIS_DEV'):
DEVELOPMENT = True
else:
DEVELOPMENT = config.load('DEVELOPMENT', 'IS_DEV_INSTANCE') == "True"
"""
Authentication configuration
"""
TLS_ROOT_CA = tls_root_ca
AUTHENTICATION_TYPE = authentication_type
AUTHENTICATION_CREATE_USER_IF_NOT_EXIST = (authentication_create_user_if_not_exists == "True")
IRIS_NEW_USERS_DEFAULT_GROUP = config.load('IRIS', 'NEW_USERS_DEFAULT_GROUP', fallback='Analysts')
AUTHENTICATION_LOCAL_FALLBACK = config.load('IRIS', 'AUTHENTICATION_LOCAL_FALLBACK', fallback="True") == "True"
if authentication_type == 'oidc_proxy':
AUTHENTICATION_LOGOUT_URL = authentication_logout_url
AUTHENTICATION_ACCOUNT_SERVICE_URL = authentication_account_service_url
AUTHENTICATION_PROXY_LOGOUT_URL = f"/oauth2/sign_out?rd={AUTHENTICATION_LOGOUT_URL}?redirect_uri=/dashboard"
AUTHENTICATION_TOKEN_INTROSPECTION_URL = authentication_token_introspection_url
AUTHENTICATION_JWKS_URL = authentication_jwks_url
AUTHENTICATION_CLIENT_ID = authentication_client_id
AUTHENTICATION_CLIENT_SECRET = authentication_client_secret
AUTHENTICATION_AUDIENCE = config.load('OIDC', 'IRIS_AUDIENCE', fallback="")
AUTHENTICATION_VERIFY_TOKEN_EXP = config.load('OIDC', 'IRIS_VERIFY_TOKEN_EXPIRATION',
fallback=True)
AUTHENTICATION_TOKEN_VERIFY_MODE = config.load('OIDC', 'IRIS_TOKEN_VERIFY_MODE',
fallback='signature')
AUTHENTICATION_INIT_ADMINISTRATOR_EMAIL = config.load('OIDC', 'IRIS_INIT_ADMINISTRATOR_EMAIL',
fallback="")
AUTHENTICATION_APP_ADMIN_ROLE_NAME = authentication_app_admin_role_name
elif authentication_type == 'ldap':
LDAP_SERVER = config.load('LDAP', 'SERVER')
if LDAP_SERVER is None:
raise Exception('LDAP enabled and no server configured')
LDAP_PORT = config.load('LDAP', 'PORT')
if LDAP_PORT is None:
raise Exception('LDAP enabled and no server configured')
LDAP_USER_PREFIX = config.load('LDAP', 'USER_PREFIX', '')
if LDAP_USER_PREFIX is None:
raise Exception('LDAP enabled and no user prefix configured')
LDAP_USER_SUFFIX = config.load('LDAP', 'USER_SUFFIX', '')
if LDAP_USER_SUFFIX is None:
raise Exception('LDAP enabled and no user suffix configured')
LDAP_AUTHENTICATION_TYPE = config.load('LDAP', 'AUTHENTICATION_TYPE')
LDAP_SEARCH_DN = config.load('LDAP', 'SEARCH_DN')
if authentication_create_user_if_not_exists and LDAP_SEARCH_DN is None:
raise Exception('LDAP enabled with user provisioning: LDAP_SEARCH_DN should be set')
LDAP_ATTRIBUTE_IDENTIFIER = config.load('LDAP', 'ATTRIBUTE_IDENTIFIER')
if authentication_create_user_if_not_exists and LDAP_ATTRIBUTE_IDENTIFIER is None:
raise Exception('LDAP enabled with user provisioning: LDAP_ATTRIBUTE_IDENTIFIER should be set')
LDAP_ATTRIBUTE_DISPLAY_NAME = config.load('LDAP', 'ATTRIBUTE_DISPLAY_NAME')
LDAP_ATTRIBUTE_MAIL = config.load('LDAP', 'ATTRIBUTE_MAIL')
LDAP_USE_SSL = config.load('LDAP', 'USE_SSL', fallback='True')
LDAP_USE_SSL = (LDAP_USE_SSL == 'True')
LDAP_VALIDATE_CERTIFICATE = config.load('LDAP', 'VALIDATE_CERTIFICATE', fallback='True')
LDAP_VALIDATE_CERTIFICATE = (LDAP_VALIDATE_CERTIFICATE == 'True')
ldap_tls_v = config.load('LDAP', 'TLS_VERSION', '1.2')
if ldap_tls_v not in ['1.0', '1.1', '1.2']:
raise Exception(f'Unsupported LDAP TLS version {ldap_tls_v}')
if ldap_tls_v == '1.1':
LDAP_TLS_VERSION = ssl.PROTOCOL_TLSv1_1
elif ldap_tls_v == '1.2':
LDAP_TLS_VERSION = ssl.PROTOCOL_TLSv1_2
elif ldap_tls_v == '1.0':
LDAP_TLS_VERSION = ssl.PROTOCOL_TLSv1
proto = 'ldaps' if LDAP_USE_SSL else 'ldap'
LDAP_CONNECT_STRING = f'{proto}://{LDAP_SERVER}:{LDAP_PORT}'
if LDAP_USE_SSL:
LDAP_SERVER_CERTIFICATE = config.load('LDAP', 'SERVER_CERTIFICATE')
if not Path(f'certificates/ldap/{LDAP_SERVER_CERTIFICATE}').is_file():
log.error(f'Unable to read LDAP certificate file certificates/ldap/{LDAP_SERVER_CERTIFICATE}')
raise Exception(f'Unable to read LDAP certificate file certificates/ldap/{LDAP_SERVER_CERTIFICATE}')
LDAP_PRIVATE_KEY = config.load('LDAP', 'PRIVATE_KEY')
if LDAP_PRIVATE_KEY and not Path(f'certificates/ldap/{LDAP_PRIVATE_KEY}').is_file():
log.error(f'Unable to read LDAP certificate file certificates/ldap/{LDAP_PRIVATE_KEY}')
raise Exception(f'Unable to read LDAP certificate file certificates/ldap/{LDAP_PRIVATE_KEY}')
PRIVATE_KEY_PASSWORD = config.load('LDAP', 'PRIVATE_KEY_PASSWORD', fallback=None)
LDAP_CA_CERTIFICATE = config.load('LDAP', 'CA_CERTIFICATE')
if LDAP_CA_CERTIFICATE and not Path(f'certificates/ldap/{LDAP_CA_CERTIFICATE}').is_file():
log.error(f'Unable to read LDAP certificate file certificates/ldap/{LDAP_CA_CERTIFICATE}')
raise Exception(f'Unable to read LDAP certificate file certificates/ldap/{LDAP_CA_CERTIFICATE}')
LDAP_CUSTOM_TLS_CONFIG = config.load('LDAP', 'CUSTOM_TLS_CONFIG', fallback='True')
LDAP_CUSTOM_TLS_CONFIG = (LDAP_CUSTOM_TLS_CONFIG == 'True')
""" Caching
"""
CACHE_TYPE = "SimpleCache"
CACHE_DEFAULT_TIMEOUT = 300