Skip to main content
Glama
by cycodehq
configuration_manager.py9.28 kB
import os from functools import cache from pathlib import Path from typing import Any, Optional from uuid import uuid4 from cycode.cli import consts from cycode.cli.user_settings.config_file_manager import ConfigFileManager class ConfigurationManager: global_config_file_manager: ConfigFileManager local_config_file_manager: ConfigFileManager def __init__(self) -> None: self.global_config_file_manager = ConfigFileManager(Path.home()) self.local_config_file_manager = ConfigFileManager(os.getcwd()) def get_cycode_api_url(self) -> str: api_url = self.get_api_url_from_environment_variables() if api_url is not None: return api_url api_url = self.local_config_file_manager.get_api_url() if api_url is not None: return api_url api_url = self.global_config_file_manager.get_api_url() if api_url is not None: return api_url return consts.DEFAULT_CYCODE_API_URL def get_cycode_app_url(self) -> str: app_url = self.get_app_url_from_environment_variables() if app_url is not None: return app_url app_url = self.local_config_file_manager.get_app_url() if app_url is not None: return app_url app_url = self.global_config_file_manager.get_app_url() if app_url is not None: return app_url return consts.DEFAULT_CYCODE_APP_URL def get_debug_flag_from_environment_variables(self) -> bool: value = self._get_value_from_environment_variables(consts.DEBUG_ENV_VAR_NAME, '') return value.lower() in {'true', '1'} def get_debug_flag(self) -> bool: return self.get_debug_flag_from_environment_variables() def get_verbose_flag(self) -> bool: verbose_flag_env_var = self.get_verbose_flag_from_environment_variables() verbose_flag_local_config = self.local_config_file_manager.get_verbose_flag() verbose_flag_global_config = self.global_config_file_manager.get_verbose_flag() return verbose_flag_env_var or verbose_flag_local_config or verbose_flag_global_config def get_api_url_from_environment_variables(self) -> Optional[str]: return self._get_value_from_environment_variables(consts.CYCODE_API_URL_ENV_VAR_NAME) def get_app_url_from_environment_variables(self) -> Optional[str]: return self._get_value_from_environment_variables(consts.CYCODE_APP_URL_ENV_VAR_NAME) def get_verbose_flag_from_environment_variables(self) -> bool: value = self._get_value_from_environment_variables(consts.VERBOSE_ENV_VAR_NAME, '') return value.lower() in ('true', '1') @cache # noqa: B019 def get_exclusions_by_scan_type(self, scan_type: str) -> dict: local_exclusions = self.local_config_file_manager.get_exclusions_by_scan_type(scan_type) global_exclusions = self.global_config_file_manager.get_exclusions_by_scan_type(scan_type) return self._merge_exclusions(local_exclusions, global_exclusions) def add_exclusion(self, scope: str, scan_type: str, exclusion_type: str, value: str) -> None: config_file_manager = self.get_config_file_manager(scope) config_file_manager.add_exclusion(scan_type, exclusion_type, value) @staticmethod def _merge_exclusions(local_exclusions: dict, global_exclusions: dict) -> dict: keys = set(list(local_exclusions.keys()) + list(global_exclusions.keys())) return {key: (local_exclusions.get(key) or []) + (global_exclusions.get(key) or []) for key in keys} def get_or_create_installation_id(self) -> str: config_file_manager = self.get_config_file_manager() installation_id = config_file_manager.get_installation_id() if installation_id is None: installation_id = uuid4().hex config_file_manager.update_installation_id(installation_id) return installation_id def get_config_file_manager(self, scope: Optional[str] = None) -> ConfigFileManager: if scope == 'local': return self.local_config_file_manager return self.global_config_file_manager def get_scan_polling_timeout_in_seconds(self) -> int: return int( self._get_value_from_environment_variables( consts.SCAN_POLLING_TIMEOUT_IN_SECONDS_ENV_VAR_NAME, consts.DEFAULT_SCAN_POLLING_TIMEOUT_IN_SECONDS ) ) def get_sync_scan_timeout_in_seconds(self) -> int: return int( self._get_value_from_environment_variables( consts.SYNC_SCAN_TIMEOUT_IN_SECONDS_ENV_VAR_NAME, consts.DEFAULT_SYNC_SCAN_TIMEOUT_IN_SECONDS ) ) def get_ai_remediation_timeout_in_seconds(self) -> int: return int( self._get_value_from_environment_variables( consts.AI_REMEDIATION_TIMEOUT_IN_SECONDS_ENV_VAR_NAME, consts.DEFAULT_AI_REMEDIATION_TIMEOUT_IN_SECONDS ) ) def get_report_polling_timeout_in_seconds(self) -> int: return int( self._get_value_from_environment_variables( consts.REPORT_POLLING_TIMEOUT_IN_SECONDS_ENV_VAR_NAME, consts.DEFAULT_REPORT_POLLING_TIMEOUT_IN_SECONDS ) ) def get_sca_pre_commit_timeout_in_seconds(self) -> int: return int( self._get_value_from_environment_variables( consts.SCA_PRE_COMMIT_TIMEOUT_IN_SECONDS_ENV_VAR_NAME, consts.DEFAULT_SCA_PRE_COMMIT_TIMEOUT_IN_SECONDS ) ) def _get_git_hook_max_commits_to_scan_count( self, command_scan_type: str, env_var_name: str, default_count: int ) -> int: max_commits = self._get_value_from_environment_variables(env_var_name) if max_commits is not None: return int(max_commits) max_commits = self.local_config_file_manager.get_max_commits(command_scan_type) if max_commits is not None: return max_commits max_commits = self.global_config_file_manager.get_max_commits(command_scan_type) if max_commits is not None: return max_commits return default_count def get_pre_receive_max_commits_to_scan_count(self, command_scan_type: str) -> int: return self._get_git_hook_max_commits_to_scan_count( command_scan_type, consts.PRE_RECEIVE_MAX_COMMITS_TO_SCAN_COUNT_ENV_VAR_NAME, consts.DEFAULT_PRE_RECEIVE_MAX_COMMITS_TO_SCAN_COUNT, ) def get_pre_push_max_commits_to_scan_count(self, command_scan_type: str) -> int: return self._get_git_hook_max_commits_to_scan_count( command_scan_type, consts.PRE_PUSH_MAX_COMMITS_TO_SCAN_COUNT_ENV_VAR_NAME, consts.DEFAULT_PRE_PUSH_MAX_COMMITS_TO_SCAN_COUNT, ) def _get_git_hook_command_timeout(self, command_scan_type: str, env_var_name: str, default_timeout: int) -> int: command_timeout = self._get_value_from_environment_variables(env_var_name) if command_timeout is not None: return int(command_timeout) command_timeout = self.local_config_file_manager.get_command_timeout(command_scan_type) if command_timeout is not None: return command_timeout command_timeout = self.global_config_file_manager.get_command_timeout(command_scan_type) if command_timeout is not None: return command_timeout return default_timeout def get_pre_receive_command_timeout(self, command_scan_type: str) -> int: return self._get_git_hook_command_timeout( command_scan_type, consts.PRE_RECEIVE_COMMAND_TIMEOUT_ENV_VAR_NAME, consts.DEFAULT_PRE_RECEIVE_COMMAND_TIMEOUT_IN_SECONDS, ) def get_pre_push_command_timeout(self, command_scan_type: str) -> int: return self._get_git_hook_command_timeout( command_scan_type, consts.PRE_PUSH_COMMAND_TIMEOUT_ENV_VAR_NAME, consts.DEFAULT_PRE_PUSH_COMMAND_TIMEOUT_IN_SECONDS, ) def get_should_exclude_detections_in_deleted_lines(self, command_scan_type: str) -> bool: exclude_detections_in_deleted_lines = self._get_value_from_environment_variables( consts.EXCLUDE_DETECTIONS_IN_DELETED_LINES_ENV_VAR_NAME ) if exclude_detections_in_deleted_lines is not None: return exclude_detections_in_deleted_lines.lower() in ('true', '1') exclude_detections_in_deleted_lines = self.local_config_file_manager.get_exclude_detections_in_deleted_lines( command_scan_type ) if exclude_detections_in_deleted_lines is not None: return exclude_detections_in_deleted_lines exclude_detections_in_deleted_lines = self.global_config_file_manager.get_exclude_detections_in_deleted_lines( command_scan_type ) if exclude_detections_in_deleted_lines is not None: return exclude_detections_in_deleted_lines return consts.DEFAULT_EXCLUDE_DETECTIONS_IN_DELETED_LINES @staticmethod def _get_value_from_environment_variables(env_var_name: str, default: Optional[Any] = None) -> Optional[Any]: return os.getenv(env_var_name, default)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/cycodehq/cycode-cli'

If you have feedback or need assistance with the MCP directory API, please join our Discord server