"""
Comprehensive configuration management for MCP system
Handles environment variables, config files, and validation
"""
import os
import json
import yaml
import logging
from typing import Dict, Any, Optional, Union
from pathlib import Path
from dataclasses import dataclass, asdict
from enum import Enum
from shared.models import DatabaseConnection, EmbeddingConfig
from shared.exceptions import ConfigurationError
logger = logging.getLogger(__name__)
class Environment(Enum):
"""Application environments"""
DEVELOPMENT = "development"
TESTING = "testing"
STAGING = "staging"
PRODUCTION = "production"
@dataclass
class LLMConfig:
"""LLM configuration"""
provider: str = "openai" # openai, anthropic, ollama
model: str = "gpt-4"
api_key: Optional[str] = None
base_url: Optional[str] = None
max_tokens: int = 4000
temperature: float = 0.7
timeout: int = 30
@dataclass
class ServerConfig:
"""Server configuration"""
host: str = "localhost"
port: int = 8000
debug: bool = False
reload: bool = False
workers: int = 1
    cors_origins: Optional[list] = None
max_request_size: int = 10485760 # 10MB
@dataclass
class LoggingConfig:
"""Logging configuration"""
level: str = "INFO"
format: str = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
file_path: Optional[str] = None
max_file_size: int = 10485760 # 10MB
backup_count: int = 5
@dataclass
class SecurityConfig:
"""Security configuration"""
secret_key: Optional[str] = None
api_key_header: str = "X-API-Key"
rate_limit_per_minute: int = 60
enable_cors: bool = True
    allowed_sql_patterns: Optional[list] = None
    forbidden_sql_patterns: Optional[list] = None
@dataclass
class PerformanceConfig:
"""Performance configuration"""
connection_pool_size: int = 10
max_overflow: int = 20
pool_timeout: int = 30
query_timeout: int = 30
max_results_per_query: int = 1000
cache_ttl_seconds: int = 300
@dataclass
class MCPConfig:
"""Complete MCP system configuration"""
environment: Environment = Environment.DEVELOPMENT
database: Optional[DatabaseConnection] = None
    llm: Optional[LLMConfig] = None
    embedding: Optional[EmbeddingConfig] = None
    server: Optional[ServerConfig] = None
    logging: Optional[LoggingConfig] = None
    security: Optional[SecurityConfig] = None
    performance: Optional[PerformanceConfig] = None
def __post_init__(self):
"""Initialize default configurations"""
if self.llm is None:
self.llm = LLMConfig()
if self.server is None:
self.server = ServerConfig()
if self.logging is None:
self.logging = LoggingConfig()
if self.security is None:
self.security = SecurityConfig()
if self.performance is None:
self.performance = PerformanceConfig()
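
# Example: constructing MCPConfig directly. __post_init__ fills in any nested
# sub-config left as None, so callers only supply the pieces they want to
# change (a minimal sketch using only the defaults defined above):
#
#   config = MCPConfig(environment=Environment.PRODUCTION)
#   config.llm.model      # "gpt-4"  (default LLMConfig)
#   config.server.port    # 8000     (default ServerConfig)
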
class ConfigManager:
"""Comprehensive configuration manager"""
def __init__(self, config_file: Optional[str] = None, environment: Optional[str] = None):
self.config_file = config_file
self.environment = Environment(environment or os.getenv("MCP_ENVIRONMENT", "development"))
self.config: Optional[MCPConfig] = None
def load_config(self) -> MCPConfig:
"""Load configuration from multiple sources"""
try:
# Start with defaults
config_dict = self._get_default_config()
# Override with config file if provided
if self.config_file:
file_config = self._load_config_file(self.config_file)
config_dict = self._merge_configs(config_dict, file_config)
# Override with environment variables
env_config = self._load_from_environment()
config_dict = self._merge_configs(config_dict, env_config)
# Create configuration object
self.config = self._create_config_object(config_dict)
# Validate configuration
self._validate_config()
logger.info(f"Configuration loaded successfully for environment: {self.environment.value}")
return self.config
except Exception as e:
logger.error(f"Failed to load configuration: {e}")
raise ConfigurationError(f"Configuration loading failed: {e}")
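
    # Precedence sketch for load_config(): later sources win. With a config
    # file containing "server: {port: 9000}" and SERVER_PORT=9001 exported,
    # the resolved config.server.port is 9001; with neither present, the
    # environment-specific default from _get_default_config() applies.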
def _get_default_config(self) -> Dict[str, Any]:
"""Get default configuration based on environment"""
base_config = {
"environment": self.environment.value,
"database": {
"host": "localhost",
"port": 5432,
"database": "mcp_db",
"username": "postgres",
"password": "password"
},
"llm": {
"provider": "openai",
"model": "gpt-4",
"max_tokens": 4000,
"temperature": 0.7
},
"embedding": {
"provider": "openai",
"model": "text-embedding-ada-002",
"dimension": 1536
},
"server": {
"host": "localhost",
"port": 8000,
"cors_origins": ["http://localhost:3000", "http://localhost:5173"]
},
"logging": {
"level": "INFO"
},
"security": {
"rate_limit_per_minute": 60,
"enable_cors": True
},
"performance": {
"connection_pool_size": 10,
"max_results_per_query": 1000
}
}
# Environment-specific overrides
if self.environment == Environment.DEVELOPMENT:
base_config["server"]["debug"] = True
base_config["server"]["reload"] = True
base_config["logging"]["level"] = "DEBUG"
elif self.environment == Environment.TESTING:
base_config["database"]["database"] = "mcp_test_db"
base_config["logging"]["level"] = "WARNING"
elif self.environment == Environment.PRODUCTION:
base_config["server"]["debug"] = False
base_config["server"]["reload"] = False
base_config["server"]["workers"] = 4
base_config["logging"]["level"] = "ERROR"
base_config["security"]["rate_limit_per_minute"] = 100
return base_config
def _load_config_file(self, file_path: str) -> Dict[str, Any]:
"""Load configuration from file (JSON or YAML)"""
try:
path = Path(file_path)
if not path.exists():
raise ConfigurationError(f"Config file not found: {file_path}")
with open(path, 'r') as f:
if path.suffix.lower() in ['.yml', '.yaml']:
return yaml.safe_load(f) or {}
elif path.suffix.lower() == '.json':
return json.load(f) or {}
else:
raise ConfigurationError(f"Unsupported config file format: {path.suffix}")
except Exception as e:
raise ConfigurationError(f"Failed to load config file {file_path}: {e}")
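
    # Example config file accepted by _load_config_file (YAML shown; the same
    # structure also works as JSON). Only overridden keys need to appear;
    # everything else falls back to _get_default_config(). The host below is
    # a placeholder:
    #
    #   environment: staging
    #   database:
    #     host: db.example.internal
    #     port: 5432
    #     database: mcp_db
    #   llm:
    #     model: gpt-4
    #     temperature: 0.2
    #   server:
    #     port: 9000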
def _load_from_environment(self) -> Dict[str, Any]:
"""Load configuration from environment variables"""
env_config = {}
# Database configuration
if os.getenv("DATABASE_URL"):
env_config["database"] = {"connection_string": os.getenv("DATABASE_URL")}
else:
db_config = {}
for key in ["host", "port", "database", "username", "password", "schema"]:
env_key = f"DB_{key.upper()}"
if os.getenv(env_key):
if key == "port":
db_config[key] = int(os.getenv(env_key))
else:
db_config[key] = os.getenv(env_key)
if db_config:
env_config["database"] = db_config
# LLM configuration
llm_config = {}
llm_mappings = {
"OPENAI_API_KEY": "api_key",
"LLM_PROVIDER": "provider",
"LLM_MODEL": "model",
"LLM_BASE_URL": "base_url",
"LLM_MAX_TOKENS": "max_tokens",
"LLM_TEMPERATURE": "temperature"
}
for env_key, config_key in llm_mappings.items():
if os.getenv(env_key):
value = os.getenv(env_key)
if config_key in ["max_tokens"]:
value = int(value)
elif config_key in ["temperature"]:
value = float(value)
llm_config[config_key] = value
if llm_config:
env_config["llm"] = llm_config
# Embedding configuration
embedding_config = {}
embedding_mappings = {
"EMBEDDING_PROVIDER": "provider",
"EMBEDDING_MODEL": "model",
"EMBEDDING_API_KEY": "api_key",
"EMBEDDING_DIMENSION": "dimension"
}
for env_key, config_key in embedding_mappings.items():
if os.getenv(env_key):
value = os.getenv(env_key)
if config_key == "dimension":
value = int(value)
embedding_config[config_key] = value
if embedding_config:
env_config["embedding"] = embedding_config
# Server configuration
server_config = {}
server_mappings = {
"SERVER_HOST": "host",
"SERVER_PORT": "port",
"SERVER_DEBUG": "debug",
"SERVER_WORKERS": "workers"
}
for env_key, config_key in server_mappings.items():
if os.getenv(env_key):
value = os.getenv(env_key)
if config_key in ["port", "workers"]:
value = int(value)
elif config_key == "debug":
value = value.lower() in ["true", "1", "yes", "on"]
server_config[config_key] = value
if server_config:
env_config["server"] = server_config
# Security configuration
security_config = {}
if os.getenv("SECRET_KEY"):
security_config["secret_key"] = os.getenv("SECRET_KEY")
if os.getenv("API_KEY_HEADER"):
security_config["api_key_header"] = os.getenv("API_KEY_HEADER")
if security_config:
env_config["security"] = security_config
return env_config
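
    # Example environment-variable overrides recognized by
    # _load_from_environment (illustrative values; when DATABASE_URL is set,
    # the individual DB_* keys are ignored):
    #
    #   os.environ["DB_HOST"] = "db.internal"        # placeholder hostname
    #   os.environ["DB_PORT"] = "5433"
    #   os.environ["OPENAI_API_KEY"] = "sk-..."      # becomes llm.api_key
    #   os.environ["LLM_TEMPERATURE"] = "0.2"
    #   os.environ["SERVER_DEBUG"] = "true"          # "true"/"1"/"yes"/"on" -> True
    #   config = ConfigManager().load_config()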
def _merge_configs(self, base: Dict[str, Any], override: Dict[str, Any]) -> Dict[str, Any]:
"""Recursively merge configuration dictionaries"""
merged = base.copy()
for key, value in override.items():
if key in merged and isinstance(merged[key], dict) and isinstance(value, dict):
merged[key] = self._merge_configs(merged[key], value)
else:
merged[key] = value
return merged
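
    # Merge semantics of _merge_configs: nested dicts are merged key by key,
    # anything else is overwritten by the later source:
    #
    #   _merge_configs({"server": {"port": 8000, "debug": False}},
    #                  {"server": {"port": 9000}})
    #   -> {"server": {"port": 9000, "debug": False}}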
def _create_config_object(self, config_dict: Dict[str, Any]) -> MCPConfig:
"""Create MCPConfig object from dictionary"""
# Convert database config
db_config = config_dict.get("database", {})
if db_config:
database = DatabaseConnection(**db_config)
else:
database = None
# Convert LLM config
llm_config = config_dict.get("llm", {})
llm = LLMConfig(**llm_config)
# Convert embedding config
embedding_config = config_dict.get("embedding", {})
if embedding_config:
embedding = EmbeddingConfig(**embedding_config)
else:
embedding = None
# Convert other configs
server = ServerConfig(**config_dict.get("server", {}))
logging_cfg = LoggingConfig(**config_dict.get("logging", {}))
security = SecurityConfig(**config_dict.get("security", {}))
performance = PerformanceConfig(**config_dict.get("performance", {}))
return MCPConfig(
environment=Environment(config_dict.get("environment", "development")),
database=database,
llm=llm,
embedding=embedding,
server=server,
logging=logging_cfg,
security=security,
performance=performance
)
def _validate_config(self):
"""Validate the loaded configuration"""
if not self.config:
raise ConfigurationError("No configuration loaded")
errors = []
# Validate database configuration
if self.config.database:
if not self.config.database.host:
errors.append("Database host is required")
if not self.config.database.database:
errors.append("Database name is required")
# Validate LLM configuration
if self.config.llm.provider == "openai" and not self.config.llm.api_key:
if not os.getenv("OPENAI_API_KEY"):
errors.append("OpenAI API key is required")
# Validate server configuration
if self.config.server.port < 1 or self.config.server.port > 65535:
errors.append("Server port must be between 1 and 65535")
if errors:
raise ConfigurationError(f"Configuration validation failed: {'; '.join(errors)}")
def save_config(self, file_path: str, format: str = "yaml"):
"""Save current configuration to file"""
if not self.config:
raise ConfigurationError("No configuration to save")
try:
            config_dict = asdict(self.config)
            # asdict() keeps Enum members as-is; convert to a plain string so
            # both yaml.dump and json.dump can serialize the result
            config_dict["environment"] = self.config.environment.value
path = Path(file_path)
path.parent.mkdir(parents=True, exist_ok=True)
with open(path, 'w') as f:
if format.lower() == "yaml":
yaml.dump(config_dict, f, default_flow_style=False)
elif format.lower() == "json":
json.dump(config_dict, f, indent=2)
else:
raise ConfigurationError(f"Unsupported format: {format}")
logger.info(f"Configuration saved to {file_path}")
except Exception as e:
raise ConfigurationError(f"Failed to save configuration: {e}")
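
    # Example round trip for save_config (a sketch; the output path is
    # hypothetical):
    #
    #   manager = ConfigManager()
    #   manager.load_config()
    #   manager.save_config("config/effective.yaml", format="yaml")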
def get_database_url(self) -> str:
"""Get database connection URL"""
if not self.config or not self.config.database:
raise ConfigurationError("Database configuration not available")
return self.config.database.get_connection_string()
def get_llm_config(self) -> Dict[str, Any]:
"""Get LLM configuration as dictionary"""
if not self.config:
raise ConfigurationError("Configuration not loaded")
return asdict(self.config.llm)
def is_development(self) -> bool:
"""Check if running in development environment"""
return self.environment == Environment.DEVELOPMENT
def is_production(self) -> bool:
"""Check if running in production environment"""
return self.environment == Environment.PRODUCTION
# Global configuration instance
_config_manager: Optional[ConfigManager] = None
def get_config_manager() -> ConfigManager:
"""Get global configuration manager instance"""
global _config_manager
if _config_manager is None:
_config_manager = ConfigManager()
return _config_manager
def load_config(config_file: Optional[str] = None, environment: Optional[str] = None) -> MCPConfig:
    """Load configuration and register the manager as the global instance"""
    global _config_manager
    _config_manager = ConfigManager(config_file, environment)
    return _config_manager.load_config()
def get_config() -> MCPConfig:
"""Get current configuration"""
manager = get_config_manager()
if manager.config is None:
manager.load_config()
return manager.config
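

if __name__ == "__main__":
    # Minimal usage sketch: resolve the effective configuration for the current
    # environment and print a few values. Validation failures (for example a
    # missing OPENAI_API_KEY) are reported instead of crashing the demo.
    try:
        cfg = load_config()
        print(f"environment: {cfg.environment.value}")
        print(f"llm:         {cfg.llm.provider}/{cfg.llm.model}")
        print(f"server:      {cfg.server.host}:{cfg.server.port}")
    except ConfigurationError as exc:
        print(f"configuration error: {exc}")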