"""Configuration management for MCP server."""
import os
from pathlib import Path
from typing import Any, Dict, List, Optional
import yaml
class Config:
    """Configuration manager for MCP server.

    Settings are layered, with later layers winning:

    1. ``<project_root>/config/default.yaml`` (must exist),
    2. an optional user YAML file,
    3. the ``MCP_LOG_LEVEL``, ``MCP_WORKSPACE_ROOT`` and ``MCP_DEBUG``
       environment variables.
    """

    def __init__(self, config_path: Optional[str] = None):
        """
        Initialize configuration.

        Args:
            config_path: Path to user config file. If None, only default config is loaded.
        """
        # Two directory levels above this file; relative paths found in the
        # configuration (workspace root, log file) are anchored here.
        self.project_root = Path(__file__).parent.parent
        self._config: Dict[str, Any] = {}
        self._load_config(config_path)

    @staticmethod
    def _read_yaml(path: Path) -> Dict[str, Any]:
        """
        Parse a YAML file and return its top-level mapping.

        Args:
            path: File to read (decoded as UTF-8).

        Returns:
            The parsed mapping; an empty dict when the document is empty.

        Raises:
            ValueError: If the top level of the document is not a mapping.
        """
        with open(path, "r", encoding="utf-8") as f:
            data = yaml.safe_load(f)
        # safe_load() yields None for an empty document; treat it as "no settings".
        if data is None:
            return {}
        if not isinstance(data, dict):
            raise ValueError(
                f"Config file must contain a mapping at top level: {path}"
            )
        return data

    def _load_config(self, user_config_path: Optional[str] = None) -> None:
        """
        Load configuration from default and user config files.

        Args:
            user_config_path: Optional path to a user config file. A path that
                does not exist is ignored.

        Raises:
            OSError: If the default config file cannot be opened.
            ValueError: If a config file's top level is not a mapping.
        """
        # Load default config
        default_config_path = self.project_root / "config" / "default.yaml"
        self._config = self._read_yaml(default_config_path)

        # Load user config if provided
        if user_config_path:
            user_path = Path(user_config_path)
            if user_path.exists():
                self._merge_config(self._read_yaml(user_path))

        # Override with environment variables
        self._apply_env_overrides()

    def _merge_config(self, user_config: Dict[str, Any]) -> None:
        """
        Merge user config into default config.

        Mappings are merged recursively so that overriding one nested key keeps
        its sibling defaults; any other value (scalars, lists) replaces the
        existing one.

        Args:
            user_config: Settings to overlay. None or an empty mapping is a no-op.
        """
        if not user_config:
            return
        self._deep_merge(self._config, user_config)

    @staticmethod
    def _deep_merge(base: Dict[str, Any], overrides: Dict[str, Any]) -> None:
        """Recursively overlay ``overrides`` onto ``base`` in place."""
        for key, value in overrides.items():
            current = base.get(key)
            # Only recurse when BOTH sides are mappings; otherwise the user
            # value wins outright (this also covers a null/scalar default).
            if isinstance(value, dict) and isinstance(current, dict):
                Config._deep_merge(current, value)
            else:
                base[key] = value

    def _section(self, name: str) -> Dict[str, Any]:
        """Return the top-level section ``name``, creating it if missing or not a mapping."""
        section = self._config.get(name)
        if not isinstance(section, dict):
            section = {}
            self._config[name] = section
        return section

    def _apply_env_overrides(self) -> None:
        """
        Apply environment variable overrides.

        ``MCP_LOG_LEVEL`` sets ``logging.level``, ``MCP_WORKSPACE_ROOT`` sets
        ``file_operations.workspace_root`` and ``MCP_DEBUG`` sets
        ``server.debug`` (true for "true", "1" or "yes", case-insensitive).
        Unset or empty variables are ignored.
        """
        # Override log level
        if env_log_level := os.getenv("MCP_LOG_LEVEL"):
            self._section("logging")["level"] = env_log_level

        # Override workspace root
        if env_workspace := os.getenv("MCP_WORKSPACE_ROOT"):
            self._section("file_operations")["workspace_root"] = env_workspace

        # Override debug mode
        if env_debug := os.getenv("MCP_DEBUG"):
            self._section("server")["debug"] = env_debug.lower() in ("true", "1", "yes")

    def get(self, *keys: str, default: Any = None) -> Any:
        """
        Get configuration value by nested keys.

        Args:
            *keys: Nested keys to navigate config
            default: Default value if key not found

        Returns:
            Configuration value or default. The default is also returned when
            the stored value is None or when an intermediate value is not a
            mapping.
        """
        value: Any = self._config
        for key in keys:
            if not isinstance(value, dict):
                return default
            value = value.get(key)
            if value is None:
                return default
        return value

    @property
    def is_debug(self) -> bool:
        """Check if debug mode is enabled."""
        return self.get("server", "debug", default=False)

    @property
    def server_name(self) -> str:
        """Get server name."""
        return self.get("server", "name", default="Local MCP Server")

    @property
    def server_version(self) -> str:
        """Get server version."""
        return self.get("server", "version", default="1.0.0")

    @property
    def workspace_root(self) -> Path:
        """Get workspace root as absolute path (relative values are anchored at project_root)."""
        root = self.get("file_operations", "workspace_root", default="./workspace")
        path = Path(root)
        if not path.is_absolute():
            path = self.project_root / path
        return path.resolve()

    @property
    def enable_sandbox(self) -> bool:
        """Check if sandbox is enabled."""
        return self.get("file_operations", "enable_sandbox", default=True)

    @property
    def max_file_size(self) -> int:
        """Get maximum file size in bytes (the configured value is in megabytes)."""
        size_mb = self.get("file_operations", "max_file_size", default=10)
        # float() accepts ints, fractional values and numeric strings; int()
        # keeps the annotated return type for all of them.
        return int(float(size_mb) * 1024 * 1024)

    @property
    def default_encoding(self) -> str:
        """Get default file encoding."""
        return self.get("file_operations", "default_encoding", default="utf-8")

    @property
    def blocked_extensions(self) -> List[str]:
        """Get list of blocked file extensions."""
        return self.get("file_operations", "blocked_extensions", default=[])

    @property
    def allowed_extensions(self) -> List[str]:
        """Get list of allowed file extensions."""
        return self.get("file_operations", "allowed_extensions", default=[])

    @property
    def log_level(self) -> str:
        """Get log level; debug mode forces "DEBUG" regardless of the configured level."""
        level = self.get("logging", "level", default="INFO")
        # Override to DEBUG in debug mode
        if self.is_debug:
            return "DEBUG"
        return level

    @property
    def log_file(self) -> Path:
        """
        Get log file path.

        Relative values are anchored at project_root. Reading this property
        creates the log file's parent directory if it does not exist.
        """
        log_file = self.get("logging", "file", default="./logs/mcp_server.log")
        path = Path(log_file)
        if not path.is_absolute():
            path = self.project_root / path
        # Ensure log directory exists
        path.parent.mkdir(parents=True, exist_ok=True)
        return path

    @property
    def log_format(self) -> str:
        """Get log format string."""
        return self.get("logging", "format", default="%(asctime)s - %(name)s - %(levelname)s - %(message)s")

    @property
    def console_output(self) -> bool:
        """Check if console output is enabled."""
        return self.get("logging", "console_output", default=True)

    @staticmethod
    def _normalize_extensions(extensions: List[str]) -> set:
        """
        Return extensions lower-cased and dot-prefixed for comparison with Path.suffix.

        Path.suffix always starts with a dot, so an entry written as "exe"
        would otherwise never match; empty entries are dropped.
        """
        normalized = set()
        for ext in extensions:
            ext = str(ext).strip().lower()
            if ext:
                normalized.add(ext if ext.startswith(".") else f".{ext}")
        return normalized

    def validate_file_path(self, file_path: str) -> Path:
        """
        Validate and resolve file path.

        Args:
            file_path: Relative file path

        Returns:
            Absolute Path object

        Raises:
            ValueError: If the path is absolute, resolves outside the workspace
                while the sandbox is enabled, contains ".." while path-traversal
                prevention is enabled, or has a blocked / non-allowed extension.
        """
        # Convert to Path
        path = Path(file_path)

        # Reject absolute paths
        if path.is_absolute():
            raise ValueError(f"Absolute paths are not allowed: {file_path}")

        # Resolve relative to workspace root. The property re-reads the config
        # and resolves on every access, so evaluate it once and reuse it.
        workspace = self.workspace_root
        resolved = (workspace / path).resolve()

        # Check if sandbox is enabled
        if self.enable_sandbox:
            # Ensure path is within workspace
            try:
                resolved.relative_to(workspace)
            except ValueError:
                # "from None": the relative_to() failure is an implementation
                # detail and would only clutter the caller's traceback.
                raise ValueError(
                    f"Path is outside workspace: {file_path} -> {resolved}"
                ) from None

        # Check path traversal
        if self.get("security", "prevent_path_traversal", default=True):
            if ".." in path.parts:
                raise ValueError(f"Path traversal detected: {file_path}")

        # Check file extension (paths without a suffix skip both checks)
        suffix = resolved.suffix.lower()
        if suffix:
            # Check blocked extensions
            if suffix in self._normalize_extensions(self.blocked_extensions):
                raise ValueError(f"File extension not allowed: {resolved.suffix}")

            # Check allowed extensions (if not empty)
            allowed = self.allowed_extensions
            if allowed and suffix not in self._normalize_extensions(allowed):
                raise ValueError(f"File extension not allowed: {resolved.suffix}")

        return resolved
# Global config instance: the single module-wide Config object handed out by
# get_config(). It stays None until get_config() constructs it on first use.
_config: Optional[Config] = None
def get_config(config_path: Optional[str] = None) -> Config:
    """
    Return the shared Config, building it the first time it is requested.

    Args:
        config_path: Path to user config file. Only consulted on the call that
            creates the instance; when it is empty or None, the
            MCP_CONFIG_PATH environment variable is used instead.

    Returns:
        The module-wide Config instance
    """
    global _config

    # Fast path: the instance already exists, so the arguments are ignored.
    if _config is not None:
        return _config

    # An explicit argument takes precedence over the environment variable.
    path_from_env = os.getenv("MCP_CONFIG_PATH")
    chosen_path = config_path if config_path else path_from_env
    _config = Config(chosen_path)
    return _config