# models.py • 12.6 kB
#!/usr/bin/env python3
"""
Configuration Models
Data models for configuration management.
"""
from typing import Any, Dict, List, Optional, Union
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
class AdapterType(str, Enum):
    """Kinds of documentation adapter a source can be configured with.

    Members are also ``str`` instances, so each one compares equal to its
    lowercase value (for example ``AdapterType.NOTION == "notion"``).
    """

    GITBOOK = "gitbook"
    NOTION = "notion"
    CONFLUENCE = "confluence"
    WEBSITE = "website"
    GENERIC = "generic"
class AuthMethod(str, Enum):
    """Authentication methods that can be selected in the configuration.

    Members are also ``str`` instances, so each one compares equal to its
    lowercase value (for example ``AuthMethod.JWT == "jwt"``).
    """

    API_KEY = "api_key"
    OAUTH2 = "oauth2"
    JWT = "jwt"
    NONE = "none"
@dataclass
class DocSourceConfig:
    """Configuration for a single documentation source.

    Attributes:
        name: Identifier of the source; must be non-empty.
        adapter_type: Adapter used for the source. A plain string value
            (e.g. ``"gitbook"``) is coerced to :class:`AdapterType`.
        config: Adapter-specific settings, checked by
            :meth:`validate_config`.
        enabled: Boolean flag; stored without being interpreted here.
        priority: Integer priority; must be >= 1.
        cache_ttl: Cache TTL in seconds; must be >= 0.
    """

    name: str
    adapter_type: AdapterType
    config: Dict[str, Any] = field(default_factory=dict)
    enabled: bool = True
    priority: int = 1
    cache_ttl: int = 3600  # Cache TTL in seconds

    def __post_init__(self):
        """Validate the fields and normalize ``adapter_type``.

        Raises:
            ValueError: If ``name`` is empty, ``adapter_type`` is neither an
                ``AdapterType`` nor a string naming one, ``priority`` is
                below 1, or ``cache_ttl`` is negative.
        """
        if not self.name:
            raise ValueError("Documentation source name cannot be empty")

        if not isinstance(self.adapter_type, AdapterType):
            # Only strings are eligible for coercion; anything else is
            # rejected outright.
            if not isinstance(self.adapter_type, str):
                raise ValueError(f"Invalid adapter type: {self.adapter_type}")
            try:
                self.adapter_type = AdapterType(self.adapter_type)
            except ValueError as err:
                # Chain explicitly so the enum lookup failure is reported as
                # the cause rather than as an error during handling.
                raise ValueError(
                    f"Invalid adapter type: {self.adapter_type}"
                ) from err

        if self.priority < 1:
            raise ValueError("Priority must be >= 1")
        if self.cache_ttl < 0:
            raise ValueError("Cache TTL must be >= 0")

    def get_config_value(self, key: str, default: Any = None) -> Any:
        """Return ``config[key]``, or ``default`` when the key is absent."""
        return self.config.get(key, default)

    def set_config_value(self, key: str, value: Any) -> None:
        """Store ``value`` under ``key`` in ``config``."""
        self.config[key] = value

    def validate_config(self) -> List[str]:
        """Validate the adapter-specific entries in ``config``.

        Only the GitBook, Notion and Confluence adapters have checks; every
        other adapter type yields no errors.

        Returns:
            List of validation error messages. Empty list if valid.
        """
        # Compared with ``==`` (not a dict lookup) so a plain string assigned
        # to ``adapter_type`` after construction still matches its member.
        if self.adapter_type == AdapterType.GITBOOK:
            return list(self._validate_gitbook_config())
        if self.adapter_type == AdapterType.NOTION:
            return list(self._validate_notion_config())
        if self.adapter_type == AdapterType.CONFLUENCE:
            return list(self._validate_confluence_config())
        return []

    def _validate_gitbook_config(self) -> List[str]:
        """Return the validation errors for a GitBook ``config``."""
        errors = []

        # Required field
        if not self.config.get("space_id"):
            errors.append("GitBook space_id is required")

        # At least one of api_token / base_url must be set; a config with
        # neither is reported as an error.
        if not (self.config.get("api_token") or self.config.get("base_url")):
            errors.append("Either api_token or base_url should be provided for GitBook")

        return errors

    def _validate_notion_config(self) -> List[str]:
        """Return the validation errors for a Notion ``config``."""
        errors = []

        # Required field
        if not self.config.get("token"):
            errors.append("Notion token is required")

        # One target (database or page) must be named.
        if not (self.config.get("database_id") or self.config.get("page_id")):
            errors.append("Either database_id or page_id is required for Notion")

        return errors

    def _validate_confluence_config(self) -> List[str]:
        """Return the validation errors for a Confluence ``config``."""
        errors = []

        # Required fields
        if not self.config.get("base_url"):
            errors.append("Confluence base_url is required")
        if not self.config.get("space_key"):
            errors.append("Confluence space_key is required")

        # Authentication: a token alone, or username and password together.
        has_token = bool(self.config.get("token"))
        has_basic_auth = bool(self.config.get("username")) and bool(
            self.config.get("password")
        )
        if not (has_token or has_basic_auth):
            errors.append("Confluence requires either token or username+password")

        return errors
@dataclass
class ServerConfig:
    """Settings for the server itself.

    ``log_level`` is accepted in any letter case and stored upper-cased.
    """

    name: str = "AnyDocs-MCP"
    version: str = "0.1.0"
    description: str = "Transform documentation into MCP-compatible server"
    host: str = "localhost"
    port: int = 8000
    debug: bool = False
    log_level: str = "INFO"
    max_workers: int = 4
    timeout: int = 30

    def __post_init__(self):
        """Check numeric ranges and normalize the log level.

        Raises:
            ValueError: If ``port`` is outside 1..65535, ``max_workers`` or
                ``timeout`` is below 1, or ``log_level`` is not a known
                level name.
        """
        if self.port < 1 or self.port > 65535:
            raise ValueError("Port must be between 1 and 65535")

        if self.max_workers < 1:
            raise ValueError("Max workers must be >= 1")

        if self.timeout < 1:
            raise ValueError("Timeout must be >= 1")

        valid_log_levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
        upper_level = self.log_level.upper()
        if upper_level not in valid_log_levels:
            raise ValueError(f"Log level must be one of: {valid_log_levels}")
        self.log_level = upper_level
@dataclass
class DatabaseConfig:
    """Settings for the database connection and its pool."""

    type: str = "sqlite"
    url: Optional[str] = None
    echo: bool = False
    pool_size: int = 5
    max_overflow: int = 10
    pool_timeout: int = 30

    def __post_init__(self):
        """Reject unsupported database types and out-of-range pool settings.

        Raises:
            ValueError: If ``type`` is not sqlite/postgresql/mysql,
                ``pool_size`` or ``pool_timeout`` is below 1, or
                ``max_overflow`` is negative.
        """
        supported_types = ("sqlite", "postgresql", "mysql")
        if self.type not in supported_types:
            raise ValueError("Database type must be sqlite, postgresql, or mysql")

        if self.pool_size < 1:
            raise ValueError("Pool size must be >= 1")

        if self.max_overflow < 0:
            raise ValueError("Max overflow must be >= 0")

        if self.pool_timeout < 1:
            raise ValueError("Pool timeout must be >= 1")
@dataclass
class AuthConfig:
    """Authentication configuration.

    Attributes:
        method: Authentication method. A plain string value (e.g. ``"jwt"``)
            is coerced to :class:`AuthMethod`.
        secret_key: Required when ``method`` is JWT or OAuth2.
        access_token_expire_minutes: Must be >= 1.
        refresh_token_expire_days: Must be >= 1.
        algorithm: Algorithm name; stored without validation here.
        oauth2_client_id: Required when ``method`` is OAuth2.
        oauth2_client_secret: Required when ``method`` is OAuth2.
        oauth2_redirect_uri: Required when ``method`` is OAuth2.
        oauth2_scopes: List of scope strings; not validated here.
    """

    method: AuthMethod = AuthMethod.API_KEY
    secret_key: Optional[str] = None
    access_token_expire_minutes: int = 30
    refresh_token_expire_days: int = 7
    algorithm: str = "HS256"

    # OAuth2 specific
    oauth2_client_id: Optional[str] = None
    oauth2_client_secret: Optional[str] = None
    oauth2_redirect_uri: Optional[str] = None
    oauth2_scopes: List[str] = field(default_factory=list)

    def __post_init__(self):
        """Normalize ``method`` and validate method-dependent requirements.

        Raises:
            ValueError: If ``method`` is neither an ``AuthMethod`` nor a
                string naming one, a secret key is missing for JWT/OAuth2,
                an expiry value is below 1, or an OAuth2 client setting is
                missing when OAuth2 is selected.
        """
        if not isinstance(self.method, AuthMethod):
            # Only strings are eligible for coercion; anything else is
            # rejected outright.
            if not isinstance(self.method, str):
                raise ValueError(f"Invalid auth method: {self.method}")
            try:
                self.method = AuthMethod(self.method)
            except ValueError as err:
                # Chain explicitly so the enum lookup failure is reported as
                # the cause rather than as an error during handling.
                raise ValueError(f"Invalid auth method: {self.method}") from err

        if self.method in (AuthMethod.JWT, AuthMethod.OAUTH2) and not self.secret_key:
            raise ValueError(f"Secret key is required for {self.method.value} authentication")

        if self.access_token_expire_minutes < 1:
            raise ValueError("Access token expire minutes must be >= 1")

        if self.refresh_token_expire_days < 1:
            raise ValueError("Refresh token expire days must be >= 1")

        if self.method == AuthMethod.OAUTH2:
            if not self.oauth2_client_id:
                raise ValueError("OAuth2 client ID is required")
            if not self.oauth2_client_secret:
                raise ValueError("OAuth2 client secret is required")
            if not self.oauth2_redirect_uri:
                raise ValueError("OAuth2 redirect URI is required")
@dataclass
class CacheConfig:
    """Settings for the cache layer."""

    enabled: bool = True
    ttl: int = 3600  # Default TTL in seconds
    max_size: int = 1000  # Maximum number of cached items
    backend: str = "memory"  # memory, redis, etc.

    # Redis specific (if backend is redis)
    redis_url: Optional[str] = None
    redis_db: int = 0
    redis_password: Optional[str] = None

    def __post_init__(self):
        """Check the TTL, size and backend settings.

        Raises:
            ValueError: If ``ttl`` is negative, ``max_size`` is below 1,
                ``backend`` is not ``memory``/``redis``, or the redis
                backend is selected without ``redis_url``.
        """
        if self.ttl < 0:
            raise ValueError("TTL must be >= 0")

        if self.max_size < 1:
            raise ValueError("Max size must be >= 1")

        known_backends = ("memory", "redis")
        if self.backend not in known_backends:
            raise ValueError("Cache backend must be 'memory' or 'redis'")

        # The redis backend cannot be used without a URL.
        if self.backend == "redis":
            if not self.redis_url:
                raise ValueError("Redis URL is required when using redis backend")
@dataclass
class RateLimitConfig:
    """Settings for request rate limiting."""

    enabled: bool = True
    requests: int = 100  # Requests per window
    window: int = 60  # Window size in seconds
    storage: str = "memory"  # memory, redis

    # Redis specific (if storage is redis)
    redis_url: Optional[str] = None
    redis_db: int = 1

    def __post_init__(self):
        """Check the request budget, window and storage settings.

        Raises:
            ValueError: If ``requests`` or ``window`` is below 1,
                ``storage`` is not ``memory``/``redis``, or redis storage
                is selected without ``redis_url``.
        """
        if self.requests < 1:
            raise ValueError("Requests must be >= 1")

        if self.window < 1:
            raise ValueError("Window must be >= 1")

        known_storages = ("memory", "redis")
        if self.storage not in known_storages:
            raise ValueError("Rate limit storage must be 'memory' or 'redis'")

        # Redis storage cannot be used without a URL.
        if self.storage == "redis":
            if not self.redis_url:
                raise ValueError("Redis URL is required when using redis storage")
@dataclass
class CorsConfig:
    """CORS configuration.

    Attributes:
        enabled: Boolean flag; stored without being interpreted here.
        origins: List of origin strings; not validated here.
        methods: HTTP method names. Accepted in any letter case and stored
            as an upper-cased list.
        headers: List of header strings; not validated here.
        credentials: Boolean flag; stored without being interpreted here.
    """

    enabled: bool = True
    origins: List[str] = field(default_factory=lambda: ["*"])
    methods: List[str] = field(default_factory=lambda: ["GET", "POST", "PUT", "DELETE"])
    headers: List[str] = field(default_factory=lambda: ["*"])
    credentials: bool = False

    def __post_init__(self):
        """Validate ``methods`` and store them upper-cased.

        Raises:
            ValueError: If any entry of ``methods`` is not one of GET, POST,
                PUT, DELETE, PATCH, OPTIONS or HEAD (case-insensitive).
        """
        valid_methods = ["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"]

        # Validate and normalize in one pass: ``methods`` is iterated only
        # once, so a one-shot iterable (e.g. a generator) is not exhausted by
        # validation before the upper-cased list is built.
        normalized_methods = []
        for method in self.methods:
            upper_method = method.upper()
            if upper_method not in valid_methods:
                raise ValueError(f"Invalid HTTP method: {method}")
            normalized_methods.append(upper_method)

        self.methods = normalized_methods
@dataclass
class FileUploadConfig:
    """File upload configuration.

    Attributes:
        enabled: Boolean flag; stored without being interpreted here.
        max_size: Size limit in bytes; must be >= 1 (default 10MB).
        allowed_extensions: File extensions. Each is stored lower-cased with
            a leading dot added when missing.
        upload_dir: Upload directory path; must be accepted by
            ``pathlib.Path``.
    """

    enabled: bool = True
    max_size: int = 10 * 1024 * 1024  # 10MB in bytes
    allowed_extensions: List[str] = field(
        default_factory=lambda: [".md", ".txt", ".json", ".yaml", ".yml"]
    )
    upload_dir: str = "uploads"

    def __post_init__(self):
        """Validate the size limit and normalize extensions.

        Raises:
            ValueError: If ``max_size`` is below 1, or ``upload_dir`` cannot
                be turned into a ``Path`` (the underlying error is attached
                as ``__cause__``).
        """
        if self.max_size < 1:
            raise ValueError("Max size must be >= 1")

        # Ensure extensions start with a dot and are lower-case.
        self.allowed_extensions = [
            (ext if ext.startswith(".") else "." + ext).lower()
            for ext in self.allowed_extensions
        ]

        # Constructing a Path does not touch the filesystem; it only rejects
        # values that are not path-like (for example ``None``).
        try:
            Path(self.upload_dir)
        except Exception as e:
            # Chain the original error so the reason stays visible in
            # tracebacks instead of appearing as an unrelated failure.
            raise ValueError(f"Invalid upload directory: {e}") from e
@dataclass
class LoggingConfig:
    """Settings for console and file logging.

    ``level`` is accepted in any letter case and stored upper-cased.
    """

    level: str = "INFO"
    format: str = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    file_enabled: bool = False
    file_path: Optional[str] = None
    file_max_size: int = 10 * 1024 * 1024  # 10MB
    file_backup_count: int = 5
    console_enabled: bool = True

    def __post_init__(self):
        """Normalize the level and check the file-logging settings.

        Raises:
            ValueError: If ``level`` is not a known level name, file logging
                is enabled without ``file_path``, ``file_max_size`` is below
                1024 bytes, or ``file_backup_count`` is negative.
        """
        valid_levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
        upper_level = self.level.upper()
        if upper_level not in valid_levels:
            raise ValueError(f"Log level must be one of: {valid_levels}")
        self.level = upper_level

        # File logging needs a destination path.
        if self.file_enabled:
            if not self.file_path:
                raise ValueError("File path is required when file logging is enabled")

        if self.file_max_size < 1024:  # Minimum 1KB
            raise ValueError("File max size must be >= 1024 bytes")

        if self.file_backup_count < 0:
            raise ValueError("File backup count must be >= 0")