Skip to main content
Glama
main.py26.1 kB
"""FastMCP server entry point for AutoDocs MCP Server.""" import asyncio import signal # Configure structured logging to use stderr (required for MCP stdio protocol) import sys import time from contextlib import asynccontextmanager from pathlib import Path from typing import Any import structlog from fastmcp import FastMCP from .config import get_config from .core.cache_manager import FileCacheManager from .core.context_fetcher import ConcurrentContextFetcher, create_context_fetcher from .core.dependency_parser import PyProjectParser from .core.doc_fetcher import PyPIDocumentationFetcher from .core.error_formatter import ErrorFormatter, ResponseFormatter from .core.version_resolver import VersionResolver from .exceptions import AutoDocsError, ProjectParsingError from .security import InputValidator structlog.configure( processors=[ structlog.processors.TimeStamper(fmt="ISO"), structlog.processors.add_log_level, structlog.dev.ConsoleRenderer(), ], logger_factory=structlog.WriteLoggerFactory(file=sys.stderr), wrapper_class=structlog.BoundLogger, cache_logger_on_first_use=True, ) logger = structlog.get_logger(__name__) # Initialize FastMCP server mcp = FastMCP("AutoDocs MCP Server 🚀") # Global services (initialized in main) parser: PyProjectParser | None = None cache_manager: FileCacheManager | None = None version_resolver: VersionResolver | None = None context_fetcher: ConcurrentContextFetcher | None = None class GracefulShutdown: """Handle graceful shutdown of the MCP server.""" def __init__(self) -> None: self.shutdown_event = asyncio.Event() self.active_requests = 0 self.max_shutdown_wait = 30.0 # seconds def register_signals(self) -> None: """Register signal handlers for graceful shutdown.""" signal.signal(signal.SIGTERM, self._signal_handler) signal.signal(signal.SIGINT, self._signal_handler) def _signal_handler(self, signum: int, frame: Any) -> None: """Handle shutdown signals.""" logger.info(f"Received signal {signum}, initiating graceful shutdown") self.shutdown_event.set() @asynccontextmanager async def request_context(self) -> Any: """Context manager to track active requests.""" self.active_requests += 1 try: yield finally: self.active_requests -= 1 async def wait_for_shutdown(self) -> None: """Wait for shutdown signal and cleanup.""" await self.shutdown_event.wait() logger.info("Shutting down gracefully...") # Wait for active requests to complete start_time = asyncio.get_event_loop().time() while self.active_requests > 0: elapsed = asyncio.get_event_loop().time() - start_time if elapsed > self.max_shutdown_wait: logger.warning( f"Forcing shutdown after {elapsed}s with {self.active_requests} active requests" ) break logger.info( f"Waiting for {self.active_requests} active requests to complete" ) await asyncio.sleep(0.5) # Cleanup resources await self._cleanup_resources() async def _cleanup_resources(self) -> None: """Cleanup application resources.""" from .core.network_resilience import ConnectionPoolManager # Close HTTP client connections pool_manager = ConnectionPoolManager() await pool_manager.close_all() # Flush any pending cache writes if cache_manager: # Add any pending cache operations cleanup if needed pass logger.info("Graceful shutdown completed") @mcp.tool async def scan_dependencies(project_path: str | None = None) -> dict[str, Any]: """ Scan project dependencies from pyproject.toml Args: project_path: Path to project directory (defaults to current directory) Returns: JSON with dependency specifications and project metadata """ from .observability import get_metrics_collector, track_request async with track_request("scan_dependencies") as metrics: if parser is None: get_metrics_collector().finish_request( metrics.request_id, success=False, error_type="ServiceNotInitialized", ) return { "success": False, "error": { "message": "Parser not initialized", "suggestion": "Try again or restart the MCP server", "severity": "critical", "code": "service_not_initialized", "recoverable": False, }, } try: # Validate project path if provided if project_path is not None: path = InputValidator.validate_project_path(project_path) else: path = Path.cwd() logger.info("Scanning dependencies", project_path=str(path)) result = await parser.parse_project(path) # Record successful metrics get_metrics_collector().finish_request( metrics.request_id, success=True, dependency_count=result.successful_deps, ) # Use ResponseFormatter for consistent error formatting return ResponseFormatter.format_scan_response(result) except ProjectParsingError as e: formatted_error = ErrorFormatter.format_exception(e) get_metrics_collector().finish_request( metrics.request_id, success=False, error_type="ProjectParsingError", ) return { "success": False, "error": { "message": formatted_error.message, "suggestion": formatted_error.suggestion, "severity": formatted_error.severity.value, "code": formatted_error.error_code, "recoverable": formatted_error.recoverable, }, } except Exception as e: formatted_error = ErrorFormatter.format_exception(e) get_metrics_collector().finish_request( metrics.request_id, success=False, error_type=type(e).__name__, ) return { "success": False, "error": { "message": formatted_error.message, "suggestion": formatted_error.suggestion, "severity": formatted_error.severity.value, "code": formatted_error.error_code, "recoverable": formatted_error.recoverable, }, } @mcp.tool async def get_package_docs( package_name: str, version_constraint: str | None = None, query: str | None = None ) -> dict[str, Any]: """ Retrieve formatted documentation for a package with version-based caching. This is the legacy single-package documentation tool. For rich context with dependencies, use get_package_docs_with_context instead. Args: package_name: Name of the package to fetch documentation for version_constraint: Version constraint from dependency scanning query: Optional query to filter documentation sections Returns: Formatted documentation with package metadata """ from .observability import get_metrics_collector, track_request async with track_request("get_package_docs") as metrics: if cache_manager is None or version_resolver is None: get_metrics_collector().finish_request( metrics.request_id, success=False, error_type="ServiceNotInitialized", package_name=package_name, ) return { "success": False, "error": { "message": "Services not initialized", "suggestion": "Try again or restart the MCP server", "severity": "critical", "code": "service_not_initialized", "recoverable": False, }, } try: # Validate inputs validated_package_name = InputValidator.validate_package_name(package_name) if version_constraint is not None: validated_constraint = InputValidator.validate_version_constraint( version_constraint ) else: validated_constraint = None logger.info( "Fetching package docs", package=validated_package_name, constraint=validated_constraint, query=query, ) # Step 1: Resolve to specific version resolved_version = await version_resolver.resolve_version( validated_package_name, validated_constraint ) # Step 2: Check version-specific cache cache_key = version_resolver.generate_cache_key( validated_package_name, resolved_version ) cached_entry = await cache_manager.get(cache_key) if cached_entry: logger.info( "Version-specific cache hit", package=validated_package_name, version=resolved_version, constraint=version_constraint, ) package_info = cached_entry.data from_cache = True else: logger.info( "Fetching fresh package info", package=validated_package_name, version=resolved_version, ) async with PyPIDocumentationFetcher() as fetcher: package_info = await fetcher.fetch_package_info( validated_package_name ) await cache_manager.set(cache_key, package_info) from_cache = False # Step 3: Format documentation async with PyPIDocumentationFetcher() as fetcher: formatted_docs = fetcher.format_documentation(package_info, query) # Record successful metrics get_metrics_collector().finish_request( metrics.request_id, success=True, cache_hit=from_cache, package_name=validated_package_name, ) return { "success": True, "package_name": package_info.name, "version": package_info.version, "resolved_version": resolved_version, "version_constraint": version_constraint, "documentation": formatted_docs, "from_cache": from_cache, "cache_key": cache_key, "query_applied": query is not None, } except AutoDocsError as e: formatted_error = ErrorFormatter.format_exception( e, {"package": validated_package_name} ) logger.error( "Documentation fetch failed", package=validated_package_name, error=str(e), error_type=type(e).__name__, ) get_metrics_collector().finish_request( metrics.request_id, success=False, error_type="AutoDocsError", package_name=package_name, ) return { "success": False, "error": { "message": formatted_error.message, "suggestion": formatted_error.suggestion, "severity": formatted_error.severity.value, "code": formatted_error.error_code, "recoverable": formatted_error.recoverable, }, } except Exception as e: formatted_error = ErrorFormatter.format_exception( e, {"package": validated_package_name} ) logger.error("Unexpected error during documentation fetch", error=str(e)) get_metrics_collector().finish_request( metrics.request_id, success=False, error_type=type(e).__name__, package_name=package_name, ) return { "success": False, "error": { "message": formatted_error.message, "suggestion": formatted_error.suggestion, "severity": formatted_error.severity.value, "code": formatted_error.error_code, "recoverable": formatted_error.recoverable, }, } @mcp.tool async def get_package_docs_with_context( package_name: str, version_constraint: str | None = None, include_dependencies: bool = True, context_scope: str = "smart", max_dependencies: int | None = None, max_tokens: int | None = None, ) -> dict[str, Any]: """ Retrieve comprehensive documentation context including dependencies. This is the main Phase 4 feature providing rich AI context with both the requested package and its most relevant dependencies. Args: package_name: Primary package name to document version_constraint: Version constraint for primary package include_dependencies: Whether to include dependency context (default: True) context_scope: Context scope - "primary_only", "runtime", or "smart" (default: "smart") max_dependencies: Maximum dependencies to include (default: from config) max_tokens: Maximum token budget for context (default: from config) Returns: Rich documentation context with primary package and dependencies """ from .observability import get_metrics_collector, track_request async with track_request("get_package_docs_with_context") as metrics: if context_fetcher is None: get_metrics_collector().finish_request( metrics.request_id, success=False, error_type="InitializationError", package_name=package_name, ) return { "success": False, "error": { "type": "InitializationError", "message": "Context fetcher not initialized", }, } try: # Validate inputs validated_package_name = InputValidator.validate_package_name(package_name) if version_constraint is not None: validated_constraint = InputValidator.validate_version_constraint( version_constraint ) else: validated_constraint = None logger.info( "Fetching package context", package=validated_package_name, constraint=validated_constraint, include_deps=include_dependencies, scope=context_scope, ) # Fetch comprehensive context context, performance_metrics = await context_fetcher.fetch_package_context( package_name=validated_package_name, version_constraint=validated_constraint, include_dependencies=include_dependencies, context_scope=context_scope, max_dependencies=max_dependencies, max_tokens=max_tokens, ) # Convert to serializable format context_data = { "primary_package": context.primary_package.model_dump(), "runtime_dependencies": [ dep.model_dump() for dep in context.runtime_dependencies ], "dev_dependencies": [ dep.model_dump() for dep in context.dev_dependencies ], "context_scope": context.context_scope, "total_packages": context.total_packages, "truncated_packages": context.truncated_packages, "token_estimate": context.token_estimate, } # Record successful metrics cache_hit_rate = performance_metrics.get("cache_hits", 0) / max( 1, performance_metrics.get("cache_hits", 0) + performance_metrics.get("cache_misses", 1), ) get_metrics_collector().finish_request( metrics.request_id, success=True, cache_hit=(cache_hit_rate > 0.5), package_name=validated_package_name, dependency_count=context.total_packages - 1, # Exclude primary package ) return { "success": True, "context": context_data, "performance": performance_metrics, } except AutoDocsError as e: logger.error( "Context fetch failed", package=validated_package_name, error=str(e), error_type=type(e).__name__, ) get_metrics_collector().finish_request( metrics.request_id, success=False, error_type="AutoDocsError", package_name=package_name, ) return { "success": False, "error": {"type": type(e).__name__, "message": str(e)}, } except Exception as e: formatted_error = ErrorFormatter.format_exception( e, {"package": validated_package_name} ) logger.error("Unexpected error during context fetch", error=str(e)) get_metrics_collector().finish_request( metrics.request_id, success=False, error_type=type(e).__name__, package_name=package_name, ) return { "success": False, "error": { "message": formatted_error.message, "suggestion": formatted_error.suggestion, "severity": formatted_error.severity.value, "code": formatted_error.error_code, "recoverable": formatted_error.recoverable, }, } @mcp.tool async def refresh_cache() -> dict[str, Any]: """ Refresh the local documentation cache. Returns: Statistics about cache refresh operation """ if cache_manager is None: return { "success": False, "error": { "message": "Cache manager not initialized", "suggestion": "Try again or restart the MCP server", "severity": "critical", "code": "service_not_initialized", "recoverable": False, }, } try: logger.info("Starting cache refresh") # Get current cache stats initial_stats = await cache_manager.get_cache_stats() # Clear the entire cache await cache_manager.invalidate() # Get final stats final_stats = await cache_manager.get_cache_stats() logger.info( "Cache refresh completed", cleared_entries=initial_stats.get("total_entries", 0), ) return { "success": True, "cleared_entries": initial_stats.get("total_entries", 0), "freed_bytes": initial_stats.get("total_size_bytes", 0), "final_entries": final_stats.get("total_entries", 0), } except AutoDocsError as e: formatted_error = ErrorFormatter.format_exception(e) logger.error("Cache refresh failed", error=str(e)) return { "success": False, "error": { "message": formatted_error.message, "suggestion": formatted_error.suggestion, "severity": formatted_error.severity.value, "code": formatted_error.error_code, "recoverable": formatted_error.recoverable, }, } @mcp.tool async def get_cache_stats() -> dict[str, Any]: """ Get statistics about the documentation cache. Returns: Cache statistics and information """ if cache_manager is None: return { "success": False, "error": { "message": "Cache manager not initialized", "suggestion": "Try again or restart the MCP server", "severity": "critical", "code": "service_not_initialized", "recoverable": False, }, } try: stats = await cache_manager.get_cache_stats() cached_packages = await cache_manager.list_cached_packages() return { "success": True, "cache_stats": stats, "cached_packages": cached_packages, "total_packages": len(cached_packages), } except Exception as e: formatted_error = ErrorFormatter.format_exception( e, {"operation": "get_cache_stats"} ) logger.error("Failed to get cache stats", error=str(e)) return { "success": False, "error": { "message": formatted_error.message, "suggestion": formatted_error.suggestion, "severity": formatted_error.severity.value, "code": formatted_error.error_code, "recoverable": formatted_error.recoverable, }, } @mcp.tool async def health_check() -> dict[str, Any]: """ Get system health status for monitoring and load balancer checks. Returns: Comprehensive health status of all system components """ from .health import HealthChecker health_checker = HealthChecker() return await health_checker.get_overall_health() @mcp.tool async def ready_check() -> dict[str, Any]: """ Kubernetes-style readiness check for deployment orchestration. Returns: Simple ready/not-ready status """ from .health import HealthChecker health_checker = HealthChecker() return await health_checker.get_readiness_status() @mcp.tool async def get_metrics() -> dict[str, Any]: """ Get system performance metrics for monitoring. Returns: Performance statistics and metrics """ from .observability import get_metrics_collector metrics_collector = get_metrics_collector() return { "success": True, "performance": metrics_collector.get_stats(), "health_metrics": metrics_collector.get_health_metrics(), "timestamp": time.time(), } async def initialize_services() -> None: """Initialize global services with new components.""" global parser, cache_manager, version_resolver, context_fetcher config = get_config() # Validate production readiness production_issues = config.validate_production_readiness() if production_issues: if config.environment == "production": # Fail startup in production if there are issues raise ValueError( f"Production configuration issues: {'; '.join(production_issues)}" ) else: # Just warn in other environments logger.warning( "Configuration issues detected (non-production environment)", issues=production_issues, environment=config.environment, ) logger.info( "Initializing services", cache_dir=str(config.cache_dir), environment=config.environment, debug_mode=config.debug_mode, max_concurrent=config.max_concurrent, ) parser = PyProjectParser() cache_manager = FileCacheManager(config.cache_dir) version_resolver = VersionResolver() await cache_manager.initialize() # Initialize Phase 4 context fetcher context_fetcher = await create_context_fetcher(cache_manager) logger.info("Services initialized successfully", phase_4_enabled=True) async def async_main() -> None: """Async main function with graceful shutdown.""" shutdown_handler = GracefulShutdown() shutdown_handler.register_signals() try: await initialize_services() logger.info("Starting AutoDocs MCP Server") # Start server and wait for shutdown server_task = asyncio.create_task(run_mcp_server()) shutdown_task = asyncio.create_task(shutdown_handler.wait_for_shutdown()) await asyncio.wait( [server_task, shutdown_task], return_when=asyncio.FIRST_COMPLETED ) except Exception as e: logger.error("Server startup failed", error=str(e)) raise finally: await shutdown_handler._cleanup_resources() async def run_mcp_server() -> None: """Run the MCP server in async mode.""" # Since FastMCP doesn't have native async support, we need to run it in a thread import threading server_thread = threading.Thread(target=mcp.run, daemon=True) server_thread.start() # Keep the async task alive while server is running while server_thread.is_alive(): await asyncio.sleep(1.0) def main() -> None: """Entry point for the MCP server.""" try: asyncio.run(async_main()) except KeyboardInterrupt: logger.info("Server shutdown requested") except Exception as e: logger.error("Server failed", error=str(e)) raise if __name__ == "__main__": main()

Implementation Reference

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/bradleyfay/autodoc-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server