Skip to main content
Glama

Adversary MCP Server

by brettbergin
credentials.py42.3 kB
"""Secure credential management for Adversary MCP server.""" import getpass import json import os import socket import stat from base64 import b64decode, b64encode from dataclasses import asdict from pathlib import Path from typing import Any import keyring from cryptography.fernet import Fernet, InvalidToken from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC from keyring.errors import KeyringError from .config import SecurityConfig from .logger import get_logger logger = get_logger("credentials") # Module-level singleton instance to prevent repeated keychain access _credential_manager_instance = None def get_credential_manager(config_dir: Path | None = None) -> "CredentialManager": """Get or create the singleton CredentialManager instance. This function ensures only one CredentialManager instance exists per process, preventing repeated keychain access prompts. Args: config_dir: Configuration directory (only used on first call) Returns: The singleton CredentialManager instance """ global _credential_manager_instance if _credential_manager_instance is None: _credential_manager_instance = CredentialManager(config_dir) return _credential_manager_instance def reset_credential_manager(): """Reset the singleton instance. Used primarily for testing. WARNING: This will clear all cached credentials and force a reload from keychain on next access. """ global _credential_manager_instance if _credential_manager_instance is not None: # Clear the instance's cache _credential_manager_instance._config_cache = None _credential_manager_instance._cache_loaded = False _credential_manager_instance = None CredentialManager._instance = None CredentialManager._initialized = False class CredentialError(Exception): """Base exception for credential errors.""" pass class CredentialNotFoundError(CredentialError): """Exception raised when credentials are not found.""" pass class CredentialStorageError(CredentialError): """Exception raised when credential storage fails.""" pass class CredentialDecryptionError(CredentialError): """Exception raised when credential decryption fails.""" pass class CredentialManager: """Secure credential manager for Adversary MCP server configuration. Implements singleton pattern to prevent repeated keychain access. """ _instance = None _initialized = False def __new__(cls, config_dir: Path | None = None): """Create or return the singleton instance.""" if cls._instance is None: logger.info("Creating new CredentialManager singleton instance") cls._instance = super().__new__(cls) else: logger.debug("Returning existing CredentialManager singleton instance") return cls._instance def __init__(self, config_dir: Path | None = None) -> None: """Initialize credential manager. Args: config_dir: Configuration directory (default: ~/.local/share/adversary-mcp-server) """ # Only initialize once to prevent repeated keychain access if CredentialManager._initialized: logger.debug("CredentialManager already initialized, skipping") return logger.info("Initializing CredentialManager for the first time") CredentialManager._initialized = True if config_dir is None: config_dir = Path.home() / ".local" / "share" / "adversary-mcp-server" logger.debug(f"Using default config directory: {config_dir}") else: logger.debug(f"Using custom config directory: {config_dir}") self.config_dir = config_dir self.config_file = config_dir / "config.json" self.keyring_service = "adversary-mcp-server" logger.debug(f"Config file path: {self.config_file}") logger.debug(f"Keyring service name: {self.keyring_service}") # In-memory cache to reduce keychain access self._config_cache: SecurityConfig | None = None self._cache_loaded = False logger.debug("Initialized credential cache") # Ensure config directory exists with proper permissions self._ensure_config_dir() logger.info("CredentialManager initialization complete") def _ensure_config_dir(self) -> None: """Ensure configuration directory exists with proper permissions.""" logger.debug(f"Ensuring config directory exists: {self.config_dir}") if not self.config_dir.exists(): logger.info(f"Creating config directory: {self.config_dir}") self.config_dir.mkdir(parents=True, exist_ok=True) else: logger.debug("Config directory already exists") # Set restrictive permissions (owner only) try: self.config_dir.chmod(stat.S_IRWXU) # 700 logger.debug("Set restrictive permissions (700) on config directory") except OSError as e: # May fail on some systems, but not critical logger.warning( f"Failed to set restrictive permissions on config directory: {e}" ) def _derive_key(self, password: bytes, salt: bytes) -> bytes: """Derive encryption key from password and salt.""" kdf = PBKDF2HMAC( algorithm=hashes.SHA256(), length=32, salt=salt, iterations=100000, ) return b64encode(kdf.derive(password)) def _encrypt_data(self, data: str, password: str) -> dict[str, str]: """Encrypt data with password. Args: data: Data to encrypt password: Password for encryption Returns: Dictionary with encrypted data and salt """ salt = os.urandom(16) key = self._derive_key(password.encode(), salt) f = Fernet(key) encrypted_data = f.encrypt(data.encode()) return { "encrypted_data": b64encode(encrypted_data).decode(), "salt": b64encode(salt).decode(), } def _decrypt_data(self, encrypted_data: str, salt: str, password: str) -> str: """Decrypt data with password. Args: encrypted_data: Encrypted data salt: Salt used for encryption password: Password for decryption Returns: Decrypted data Raises: CredentialDecryptionError: If decryption fails """ try: salt_bytes = b64decode(salt.encode()) key = self._derive_key(password.encode(), salt_bytes) f = Fernet(key) encrypted_bytes = b64decode(encrypted_data.encode()) decrypted_data = f.decrypt(encrypted_bytes) return decrypted_data.decode() except (InvalidToken, ValueError, UnicodeDecodeError) as e: raise CredentialDecryptionError(f"Failed to decrypt data: {e}") def _get_machine_id(self) -> str: """Get a machine-specific identifier for encryption.""" # Try to get a machine-specific ID machine_id = None # Try /etc/machine-id (Linux) if os.path.exists("/etc/machine-id"): try: with open("/etc/machine-id") as f: machine_id = f.read().strip() except OSError: pass # Try /var/lib/dbus/machine-id (Linux) if not machine_id and os.path.exists("/var/lib/dbus/machine-id"): try: with open("/var/lib/dbus/machine-id") as f: machine_id = f.read().strip() except OSError: pass # Fallback to hostname + username if not machine_id: machine_id = f"{socket.gethostname()}-{getpass.getuser()}" return machine_id def _try_keyring_storage(self, config: SecurityConfig) -> bool: """Try to store configuration using keyring. Args: config: Security configuration to store Returns: True if storage succeeded, False otherwise """ logger.debug("Attempting to store configuration in keyring") logger.debug(f"Keyring service: {self.keyring_service}") logger.debug( f"Config to store in keyring - LLM provider: {config.llm_provider}, " f"LLM API key: {'SET' if config.llm_api_key else 'NULL'}, " f"Semgrep API key: {'SET' if config.semgrep_api_key else 'NULL'}" ) try: config_dict = asdict(config) config_json = json.dumps(config_dict) logger.debug(f"Config JSON length: {len(config_json)} characters") logger.debug( f"Storing at keyring service '{self.keyring_service}' with key 'config'" ) keyring.set_password(self.keyring_service, "config", config_json) # Immediately verify the storage worked (only if not in test environment) try: logger.debug( "Verifying keyring storage by attempting immediate retrieval..." ) verification_json = keyring.get_password(self.keyring_service, "config") if verification_json: verification_config = SecurityConfig( **json.loads(verification_json) ) logger.info( f"Keyring storage verified - stored config has provider: {verification_config.llm_provider}" ) else: # Don't fail on verification issues - storage might still have worked logger.debug( "Keyring storage verification returned None (may be normal in test environments)" ) except Exception as e: # Don't fail on verification issues - the main storage operation succeeded logger.debug( f"Keyring storage verification failed: {e} (may be normal in test environments)" ) logger.info("Successfully stored configuration in keyring") return True except KeyringError as e: logger.warning(f"Failed to store configuration in keyring: {e}") return False except Exception as e: logger.error(f"Unexpected error storing configuration in keyring: {e}") return False def _try_keyring_retrieval(self) -> SecurityConfig | None: """Try to retrieve configuration from keyring. Returns: SecurityConfig if found, None otherwise """ logger.debug("Attempting to retrieve configuration from keyring") logger.debug(f"Keyring service: {self.keyring_service}") logger.debug( f"Looking for key 'config' in keyring service '{self.keyring_service}'" ) try: config_json = keyring.get_password(self.keyring_service, "config") if config_json: logger.debug( f"Configuration found in keyring, JSON length: {len(config_json)} characters" ) logger.debug("Parsing JSON configuration...") config_dict = json.loads(config_json) logger.debug(f"Parsed config dict keys: {list(config_dict.keys())}") # Handle backward compatibility for missing fields # Add default values for any fields that might be missing from older configs defaults = { "enable_llm_analysis": True, "enable_llm_validation": True, "llm_temperature": 0.1, "llm_max_tokens": 4000, "llm_batch_size": 10, "enable_semgrep_scanning": True, "semgrep_config": None, "semgrep_rules": None, "enable_exploit_generation": True, "exploit_safety_mode": True, "max_file_size_mb": 10, "timeout_seconds": 300, "severity_threshold": "medium", "enable_caching": True, "cache_max_size_mb": 100, "cache_max_age_hours": 24, "cache_llm_responses": True, } # Apply defaults for missing keys for key, default_value in defaults.items(): if key not in config_dict: config_dict[key] = default_value logger.debug( f"Added missing field '{key}' with default value: {default_value}" ) config = SecurityConfig(**config_dict) logger.debug( f"Created SecurityConfig object - LLM provider: {config.llm_provider}, " f"LLM API key: {'SET' if config.llm_api_key else 'NULL'}, " f"Semgrep API key: {'SET' if config.semgrep_api_key else 'NULL'}" ) logger.info("Successfully retrieved configuration from keyring") return config else: logger.debug( "No configuration found in keyring (keyring.get_password returned None)" ) except KeyringError as e: logger.warning(f"KeyringError retrieving configuration from keyring: {e}") except json.JSONDecodeError as e: logger.warning( f"JSON decode error retrieving configuration from keyring: {e}" ) except TypeError as e: logger.warning(f"TypeError retrieving configuration from keyring: {e}") except Exception as e: logger.error(f"Unexpected error retrieving configuration from keyring: {e}") return None def _try_keyring_deletion(self) -> bool: """Try to delete configuration from keyring. Returns: True if deletion succeeded, False otherwise """ logger.debug("Attempting to delete configuration from keyring") try: keyring.delete_password(self.keyring_service, "config") logger.info("Successfully deleted configuration from keyring") return True except KeyringError as e: logger.warning(f"Failed to delete configuration from keyring: {e}") return False def _store_file_config(self, config: SecurityConfig) -> None: """Store configuration in encrypted file. Args: config: Security configuration to store Raises: CredentialStorageError: If storage fails """ logger.debug(f"Storing encrypted configuration to file: {self.config_file}") try: config_json = json.dumps(asdict(config)) machine_id = self._get_machine_id() logger.debug(f"Using machine ID for encryption: {machine_id[:10]}...") encrypted_data = self._encrypt_data(config_json, machine_id) logger.debug("Configuration encrypted successfully") with open(self.config_file, "w") as f: json.dump(encrypted_data, f) logger.debug("Encrypted configuration written to file") # Set restrictive permissions self.config_file.chmod(stat.S_IRUSR | stat.S_IWUSR) # 600 logger.debug("Set restrictive permissions (600) on config file") logger.info("Successfully stored encrypted configuration to file") except OSError as e: logger.error(f"Failed to store configuration to file: {e}") raise CredentialStorageError(f"Failed to store configuration: {e}") def _load_file_config(self) -> SecurityConfig | None: """Load configuration from encrypted file. Returns: SecurityConfig if found and decrypted successfully, None otherwise """ logger.debug(f"Attempting to load configuration from file: {self.config_file}") if not self.config_file.exists(): logger.debug("Configuration file does not exist") return None try: with open(self.config_file) as f: data = json.load(f) logger.debug("Configuration file loaded successfully") # Check if this is an encrypted file (has encrypted_data and salt) if "encrypted_data" in data and "salt" in data: logger.debug("Detected encrypted configuration file, decrypting") # Handle encrypted file machine_id = self._get_machine_id() config_json = self._decrypt_data( data["encrypted_data"], data["salt"], machine_id, ) config_dict = json.loads(config_json) logger.debug("Configuration decrypted successfully") else: logger.debug( "Detected plain JSON configuration file (backward compatibility)" ) # Handle plain JSON file (backward compatibility) config_dict = data # Handle backward compatibility for missing fields (same as keyring loading) defaults = { "enable_llm_analysis": True, "enable_llm_validation": True, "llm_temperature": 0.1, "llm_max_tokens": 4000, "llm_batch_size": 10, "enable_semgrep_scanning": True, "semgrep_config": None, "semgrep_rules": None, "enable_exploit_generation": True, "exploit_safety_mode": True, "max_file_size_mb": 10, "timeout_seconds": 300, "severity_threshold": "medium", "enable_caching": True, "cache_max_size_mb": 100, "cache_max_age_hours": 24, "cache_llm_responses": True, } # Apply defaults for missing keys for key, default_value in defaults.items(): if key not in config_dict: config_dict[key] = default_value logger.debug( f"Added missing field '{key}' with default value: {default_value}" ) config = SecurityConfig(**config_dict) logger.info("Successfully loaded configuration from file") return config except ( OSError, json.JSONDecodeError, KeyError, TypeError, CredentialDecryptionError, ) as e: logger.warning(f"Failed to load configuration from file: {e}") return None def _delete_file_config(self) -> bool: """Delete configuration file. Returns: True if deletion succeeded, False otherwise """ logger.debug(f"Attempting to delete configuration file: {self.config_file}") try: if self.config_file.exists(): self.config_file.unlink() logger.info("Successfully deleted configuration file") else: logger.debug("Configuration file does not exist, nothing to delete") return True except OSError as e: logger.warning(f"Failed to delete configuration file: {e}") return False def store_config(self, config: SecurityConfig) -> None: """Store security configuration. Args: config: Security configuration to store Raises: CredentialStorageError: If storage fails in both keyring and file """ logger.info("=== STORING SECURITY CONFIGURATION ===") logger.info("Storing security configuration") logger.debug( f"Configuration to store - LLM provider: {config.llm_provider}, " f"LLM API key configured: {bool(config.llm_api_key)}, Model: {config.llm_model}, " f"Semgrep enabled: {config.enable_semgrep_scanning}, " f"Semgrep API key configured: {bool(config.semgrep_api_key)}, " f"Use LLM validation: {config.enable_llm_validation}" ) # Store individual API keys separately before storing main config if config.llm_provider and config.llm_api_key: logger.info( f"Storing {config.llm_provider} API key separately in keyring..." ) try: self.store_llm_api_key(config.llm_provider, config.llm_api_key) logger.info( f"Successfully stored {config.llm_provider} API key separately" ) except Exception as e: logger.error( f"Failed to store {config.llm_provider} API key separately: {e}" ) if config.semgrep_api_key: logger.info("Storing Semgrep API key separately in keyring...") try: self.store_semgrep_api_key(config.semgrep_api_key) logger.info("Successfully stored Semgrep API key separately") except Exception as e: logger.error(f"Failed to store Semgrep API key separately: {e}") # Create a copy of config with API keys set to None for main storage # (Following the pattern from CLI where keys are stored separately) from dataclasses import replace config_for_storage = replace( config, llm_api_key=None, # Don't store the actual API key in config semgrep_api_key=None, # Don't store the actual API key in config ) logger.debug( f"Config for main storage (API keys nullified) - LLM provider: {config_for_storage.llm_provider}, " f"LLM API key: {'SET' if config_for_storage.llm_api_key else 'NULL'}, " f"Semgrep API key: {'SET' if config_for_storage.semgrep_api_key else 'NULL'}" ) # Try keyring first logger.info("Attempting to store main config in keyring...") if self._try_keyring_storage(config_for_storage): # Update cache with the ORIGINAL config (with API keys) self._config_cache = config self._cache_loaded = True logger.info("Configuration stored successfully in keyring") logger.info( f"Cache updated with full config (LLM key: {'SET' if config.llm_api_key else 'NULL'}, Semgrep key: {'SET' if config.semgrep_api_key else 'NULL'})" ) return # Fall back to encrypted file logger.info("Keyring storage failed, falling back to encrypted file") self._store_file_config(config_for_storage) # Update cache with the ORIGINAL config (with API keys) self._config_cache = config self._cache_loaded = True logger.info("Configuration stored successfully in encrypted file") logger.info( f"Cache updated with full config (LLM key: {'SET' if config.llm_api_key else 'NULL'}, Semgrep key: {'SET' if config.semgrep_api_key else 'NULL'})" ) def load_config(self) -> SecurityConfig: """Load security configuration. Returns: SecurityConfig loaded from storage Raises: CredentialNotFoundError: If no configuration is found """ logger.info("=== LOADING SECURITY CONFIGURATION ===") logger.debug("Loading security configuration") # Return cached config if available if self._cache_loaded and self._config_cache is not None: logger.info("Returning cached configuration") logger.debug( f"Cached config - LLM provider: {self._config_cache.llm_provider}, " f"API key configured: {bool(self._config_cache.llm_api_key)}, " f"Semgrep key configured: {bool(self._config_cache.semgrep_api_key)}" ) return self._config_cache # Try keyring first logger.info("Attempting to load configuration from keyring...") config = self._try_keyring_retrieval() if config is not None: logger.info("Configuration found in keyring, injecting API keys...") logger.debug( f"Raw keyring config - LLM provider: {config.llm_provider}, " f"API key status: {'SET' if config.llm_api_key else 'NULL'}, " f"Semgrep key status: {'SET' if config.semgrep_api_key else 'NULL'}" ) # Inject API keys from keyring if provider is configured if config.llm_provider and not config.llm_api_key: logger.debug( f"Attempting to inject {config.llm_provider} API key from keyring..." ) api_key = self.get_llm_api_key(config.llm_provider) if api_key: config.llm_api_key = api_key logger.info( f"Successfully injected {config.llm_provider} API key from keyring" ) else: logger.warning( f"Failed to retrieve {config.llm_provider} API key from keyring" ) elif config.llm_provider and config.llm_api_key: logger.info(f"{config.llm_provider} API key already present in config") elif not config.llm_provider: logger.debug("No LLM provider configured, skipping API key injection") # Also inject Semgrep API key if needed if config.enable_semgrep_scanning and not config.semgrep_api_key: logger.debug("Attempting to inject Semgrep API key from keyring...") semgrep_key = self.get_semgrep_api_key() if semgrep_key: config.semgrep_api_key = semgrep_key logger.info("Successfully injected Semgrep API key from keyring") else: logger.warning("Failed to retrieve Semgrep API key from keyring") elif config.enable_semgrep_scanning and config.semgrep_api_key: logger.info("Semgrep API key already present in config") # Log final configuration state logger.info( f"Final keyring config - LLM provider: {config.llm_provider}, " f"LLM API key status: {'SET' if config.llm_api_key else 'NULL'}, " f"Semgrep key status: {'SET' if config.semgrep_api_key else 'NULL'}" ) # Cache the loaded config self._config_cache = config self._cache_loaded = True logger.info("Configuration loaded from keyring and cached") return config # Try encrypted file logger.info( "Keyring failed, attempting to load configuration from encrypted file..." ) config = self._load_file_config() if config is not None: logger.info("Configuration found in file, injecting API keys...") logger.debug( f"Raw file config - LLM provider: {config.llm_provider}, " f"API key status: {'SET' if config.llm_api_key else 'NULL'}, " f"Semgrep key status: {'SET' if config.semgrep_api_key else 'NULL'}" ) # Inject API keys from keyring if provider is configured if config.llm_provider and not config.llm_api_key: logger.debug( f"Attempting to inject {config.llm_provider} API key from keyring..." ) api_key = self.get_llm_api_key(config.llm_provider) if api_key: config.llm_api_key = api_key logger.info( f"Successfully injected {config.llm_provider} API key from keyring" ) else: logger.warning( f"Failed to retrieve {config.llm_provider} API key from keyring" ) elif config.llm_provider and config.llm_api_key: logger.info(f"{config.llm_provider} API key already present in config") # Also inject Semgrep API key if needed if config.enable_semgrep_scanning and not config.semgrep_api_key: logger.debug("Attempting to inject Semgrep API key from keyring...") semgrep_key = self.get_semgrep_api_key() if semgrep_key: config.semgrep_api_key = semgrep_key logger.info("Successfully injected Semgrep API key from keyring") else: logger.warning("Failed to retrieve Semgrep API key from keyring") elif config.enable_semgrep_scanning and config.semgrep_api_key: logger.info("Semgrep API key already present in config") # Log final configuration state logger.info( f"Final file config - LLM provider: {config.llm_provider}, " f"LLM API key status: {'SET' if config.llm_api_key else 'NULL'}, " f"Semgrep key status: {'SET' if config.semgrep_api_key else 'NULL'}" ) # Cache the loaded config self._config_cache = config self._cache_loaded = True logger.info("Configuration loaded from encrypted file and cached") return config # Return default configuration if none found and cache it logger.info("No stored configuration found, using default configuration") default_config = SecurityConfig() self._config_cache = default_config self._cache_loaded = True return default_config def delete_config(self) -> None: """Delete stored configuration.""" logger.info("Deleting stored configuration") # Try to delete from keyring keyring_deleted = self._try_keyring_deletion() # Try to delete file file_deleted = self._delete_file_config() # Clear cache self._config_cache = None self._cache_loaded = False logger.debug("Configuration cache cleared") if keyring_deleted or file_deleted: logger.info("Configuration deletion completed") else: logger.warning("No configuration found to delete") def store_semgrep_api_key(self, api_key: str) -> None: """Store Semgrep API key securely. Args: api_key: Semgrep API key to store Raises: CredentialStorageError: If storage fails """ logger.info("Storing Semgrep API key") logger.debug(f"API key length: {len(api_key)} characters") try: keyring.set_password(self.keyring_service, "semgrep_api_key", api_key) logger.info("Successfully stored Semgrep API key in keyring") except KeyringError as e: logger.error(f"Failed to store Semgrep API key: {e}") raise CredentialStorageError(f"Failed to store Semgrep API key: {e}") def get_semgrep_api_key(self) -> str | None: """Get stored Semgrep API key. Returns: API key if found, None otherwise """ logger.debug("Retrieving Semgrep API key from keyring") try: keyring_key = "semgrep_api_key" logger.debug(f"Checking keyring for key: {keyring_key}") api_key = keyring.get_password(self.keyring_service, keyring_key) if api_key: logger.info( f"Semgrep API key found in keyring (length: {len(api_key)} chars)" ) else: logger.warning( f"No Semgrep API key found in keyring at key: {keyring_key}" ) return api_key except KeyringError as e: logger.error(f"KeyringError retrieving Semgrep API key: {e}") return None def delete_semgrep_api_key(self) -> bool: """Delete stored Semgrep API key. Returns: True if deletion succeeded, False otherwise """ logger.info("Deleting Semgrep API key") try: keyring.delete_password(self.keyring_service, "semgrep_api_key") logger.info("Successfully deleted Semgrep API key from keyring") return True except KeyringError as e: logger.warning(f"Failed to delete Semgrep API key: {e}") return False def store_llm_api_key(self, provider: str, api_key: str) -> None: """Store LLM API key securely. Args: provider: LLM provider (openai or anthropic) api_key: API key to store Raises: CredentialStorageError: If storage fails ValueError: If provider is invalid """ if provider not in ["openai", "anthropic"]: raise ValueError(f"Invalid LLM provider: {provider}") logger.info(f"Storing API key for LLM provider: {provider}") logger.debug(f"API key length: {len(api_key)} characters") try: keyring.set_password( self.keyring_service, f"llm_{provider}_api_key", api_key ) logger.info(f"Successfully stored {provider} API key in keyring") except KeyringError as e: logger.error(f"Failed to store {provider} API key: {e}") raise CredentialStorageError(f"Failed to store {provider} API key: {e}") def get_llm_api_key(self, provider: str) -> str | None: """Get stored LLM API key. Args: provider: LLM provider (openai or anthropic) Returns: API key if found, None otherwise """ logger.debug(f"Retrieving API key for LLM provider: {provider}") if provider not in ["openai", "anthropic"]: logger.warning(f"Invalid LLM provider requested: {provider}") return None try: keyring_key = f"llm_{provider}_api_key" logger.debug(f"Checking keyring for key: {keyring_key}") api_key = keyring.get_password(self.keyring_service, keyring_key) if api_key: logger.info( f"{provider} API key found in keyring (length: {len(api_key)} chars)" ) else: logger.warning( f"No {provider} API key found in keyring at key: {keyring_key}" ) return api_key except KeyringError as e: logger.error(f"KeyringError retrieving {provider} API key: {e}") return None def delete_llm_api_key(self, provider: str) -> bool: """Delete stored LLM API key. Args: provider: LLM provider (openai or anthropic) Returns: True if deletion succeeded, False otherwise """ logger.info(f"Deleting API key for LLM provider: {provider}") if provider not in ["openai", "anthropic"]: logger.warning(f"Invalid LLM provider for deletion: {provider}") return False try: keyring.delete_password(self.keyring_service, f"llm_{provider}_api_key") logger.info(f"Successfully deleted {provider} API key from keyring") return True except KeyringError as e: logger.warning(f"Failed to delete {provider} API key: {e}") return False def get_configured_llm_provider(self) -> str | None: """Get the currently configured LLM provider. Returns: Provider name if configured, None otherwise """ logger.debug("Getting configured LLM provider") config = self.load_config() provider = config.llm_provider if provider: logger.debug(f"Configured LLM provider: {provider}") else: logger.debug("No LLM provider configured") return provider def clear_llm_configuration(self) -> None: """Clear all LLM configuration including API keys.""" logger.info("Clearing all LLM configuration") # Delete API keys for both providers openai_deleted = self.delete_llm_api_key("openai") anthropic_deleted = self.delete_llm_api_key("anthropic") logger.debug( f"API key deletion results - OpenAI: {openai_deleted}, Anthropic: {anthropic_deleted}" ) # Clear configuration config = self.load_config() old_provider = config.llm_provider config.llm_provider = None config.llm_api_key = None config.llm_model = None self.store_config(config) logger.info(f"Cleared LLM configuration (was: {old_provider})") # Clear cache self._config_cache = None self._cache_loaded = False logger.debug("LLM configuration cache cleared") def has_config(self) -> bool: """Check if configuration exists and can be loaded. Returns: True if configuration exists and is valid, False otherwise """ logger.debug("Checking if configuration exists") # If we have a cached config, return True if self._cache_loaded and self._config_cache is not None: logger.debug("Configuration found in cache") return True # Check keyring if self._try_keyring_retrieval() is not None: logger.debug("Configuration found in keyring") return True # Check if file config can be loaded if self._load_file_config() is not None: logger.debug("Configuration found in encrypted file") return True logger.debug("No configuration found") return False def debug_keyring_state(self) -> dict[str, Any]: """Debug method to inspect complete keyring state. Returns: Dictionary with keyring state information """ logger.info("=== DEBUGGING KEYRING STATE ===") state = { "keyring_service": self.keyring_service, "main_config": None, "llm_openai_key": None, "llm_anthropic_key": None, "semgrep_key": None, "cache_loaded": self._cache_loaded, "cached_config": None, } # Check main config try: config_json = keyring.get_password(self.keyring_service, "config") if config_json: config_dict = json.loads(config_json) state["main_config"] = { "found": True, "llm_provider": config_dict.get("llm_provider"), "llm_model": config_dict.get("llm_model"), "llm_api_key_status": ( "SET" if config_dict.get("llm_api_key") else "NULL" ), "semgrep_api_key_status": ( "SET" if config_dict.get("semgrep_api_key") else "NULL" ), "enable_semgrep_scanning": config_dict.get( "enable_semgrep_scanning" ), "enable_llm_validation": config_dict.get("enable_llm_validation"), } else: state["main_config"] = {"found": False} except Exception as e: state["main_config"] = {"found": False, "error": str(e)} # Check individual API keys for provider in ["openai", "anthropic"]: try: key = keyring.get_password( self.keyring_service, f"llm_{provider}_api_key" ) state[f"llm_{provider}_key"] = { "found": bool(key), "length": len(key) if key else 0, } except Exception as e: state[f"llm_{provider}_key"] = {"found": False, "error": str(e)} # Check Semgrep key try: key = keyring.get_password(self.keyring_service, "semgrep_api_key") state["semgrep_key"] = { "found": bool(key), "length": len(key) if key else 0, } except Exception as e: state["semgrep_key"] = {"found": False, "error": str(e)} # Check cached config if self._config_cache: state["cached_config"] = { "found": True, "llm_provider": self._config_cache.llm_provider, "llm_model": self._config_cache.llm_model, "llm_api_key_status": ( "SET" if self._config_cache.llm_api_key else "NULL" ), "semgrep_api_key_status": ( "SET" if self._config_cache.semgrep_api_key else "NULL" ), "enable_semgrep_scanning": self._config_cache.enable_semgrep_scanning, "enable_llm_validation": self._config_cache.enable_llm_validation, } else: state["cached_config"] = {"found": False} # Log the complete state logger.info("Keyring state summary:") logger.info(f" Service: {state['keyring_service']}") logger.info(f" Main config found: {state['main_config'].get('found', False)}") logger.info( f" OpenAI key found: {state['llm_openai_key'].get('found', False)}" ) logger.info( f" Anthropic key found: {state['llm_anthropic_key'].get('found', False)}" ) logger.info(f" Semgrep key found: {state['semgrep_key'].get('found', False)}") logger.info(f" Cache loaded: {state['cache_loaded']}") logger.info( f" Cached config found: {state['cached_config'].get('found', False)}" ) return state

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/brettbergin/adversary-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server