Skip to main content
Glama
juanqui
by juanqui
web_server.py25.8 kB
"""Integration point for running both MCP and web servers concurrently.""" import asyncio import logging from typing import Optional from .background_queue import BackgroundProcessingQueue from .config import ServerConfig from .main import PDFKnowledgebaseServer from .web.middleware import setup_exception_handlers, setup_middleware from .web.server import PDFKnowledgebaseWebServer logger = logging.getLogger(__name__) class IntegratedPDFKnowledgebaseServer: """Integrated server that runs both MCP and web interfaces concurrently.""" def __init__(self, config: Optional[ServerConfig] = None): """Initialize the integrated server. Args: config: Server configuration. If None, loads from environment. """ self.config = config or ServerConfig.from_env() # Background queue for async processing - initialize early so it can be passed to MCP server self.background_queue: Optional[BackgroundProcessingQueue] = None # Initialize the core MCP server (background queue will be set during web server initialization) self.mcp_server: Optional[PDFKnowledgebaseServer] = None # Web server will be initialized after MCP server is ready self.web_server: Optional[PDFKnowledgebaseWebServer] = None self.web_app = None self._web_server_task: Optional[asyncio.Task] = None self._shutdown_event = asyncio.Event() async def initialize(self) -> None: """Initialize both MCP and web servers.""" try: logger.info("Initializing integrated PDF Knowledgebase server...") # Validate configuration before initialization self._validate_configuration() # Initialize background queue first if web is enabled if self.config.web_enabled: logger.info( f"Initializing background processing queue with {self.config.background_queue_workers} workers..." ) self.background_queue = BackgroundProcessingQueue( concurrency=self.config.background_queue_workers, # Use configured number of workers max_retries=3, thread_pool_size=self.config.thread_pool_size, # Use configured thread pool size ) logger.info( f"Background processing queue initialized with {self.config.background_queue_workers} workers " f"and {self.config.thread_pool_size} thread pool size" ) # Initialize MCP server with background queue (this sets up all core components) self.mcp_server = PDFKnowledgebaseServer(self.config, background_queue=self.background_queue) # First initialize core components (without FileMonitor) await self.mcp_server.initialize_core() # Initialize web server if enabled if self.config.web_enabled: await self._initialize_web_server() # Now initialize FileMonitor with web document service reference await self.mcp_server.initialize_file_monitor(web_document_service=self.web_server.document_service) # Update web server's file monitor reference after FileMonitor is initialized self.web_server._file_monitor = self.mcp_server.file_monitor else: logger.info("Web interface disabled in configuration") # Initialize FileMonitor without web document service await self.mcp_server.initialize_file_monitor() logger.info("Integrated PDF Knowledgebase server initialized successfully") except Exception as e: logger.error(f"Failed to initialize integrated server: {e}") # Cleanup any partially initialized components await self._cleanup_on_error() raise def _validate_configuration(self) -> None: """Validate configuration for integrated server operation.""" # Validate MCP transport configuration if self.config.transport not in ["stdio", "http", "sse"]: raise ValueError(f"Invalid MCP transport mode: {self.config.transport}") # For integrated mode, validate web port (MCP is mounted within same app) if self.config.web_enabled and self.config.transport in ["http", "sse"]: # Validate web port (unified server) if self.config.web_port <= 0 or self.config.web_port > 65535: raise ValueError(f"Invalid web port: {self.config.web_port}") if not self.config.web_host: raise ValueError("Web host cannot be empty") elif self.config.web_enabled: # Web-only mode (no HTTP MCP) if self.config.web_port <= 0 or self.config.web_port > 65535: raise ValueError(f"Invalid web server port: {self.config.web_port}") if not self.config.web_host: raise ValueError("Web server host cannot be empty") # Check if web port is already in use import socket try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind((self.config.web_host, self.config.web_port)) logger.debug(f"Web server port {self.config.web_port} is available") except OSError as e: if e.errno == 48: # Address already in use raise ValueError(f"Web server port {self.config.web_port} is already in use") from e elif e.errno == 49: # Can't assign requested address raise ValueError(f"Cannot bind to host {self.config.web_host}") from e else: raise ValueError(f"Failed to bind to {self.config.web_host}:{self.config.web_port}: {e}") from e async def _cleanup_on_error(self) -> None: """Cleanup partially initialized components on error.""" try: if hasattr(self, "mcp_server") and self.mcp_server: await self.mcp_server.shutdown() except Exception as cleanup_error: logger.error(f"Error during cleanup: {cleanup_error}") async def _initialize_web_server(self) -> None: """Initialize the web server using shared MCP components.""" try: logger.info("Initializing web server...") # Check for web dependencies self._check_web_dependencies() # Initialize MCP HTTP app and lifespan handling based on transport mcp_http_app = None mcp_lifespan = None mount_path = None if self.config.transport in ["http", "sse"]: # Determine the mount path based on transport type mount_path = "/mcp" if self.config.transport == "http" else "/sse" logger.info(f"Preparing MCP server for {mount_path} with {self.config.transport.upper()} transport") try: # Create the MCP HTTP app with root path - FastAPI mount handles the prefix # This avoids double path prefix (e.g., /mcp/mcp) mcp_http_app = self.mcp_server.get_http_app(path="/") # Extract the lifespan from the MCP HTTP app if hasattr(mcp_http_app, "lifespan"): mcp_lifespan = mcp_http_app.lifespan logger.info("Extracted lifespan from FastMCP HTTP app for integration") else: logger.warning( "FastMCP HTTP app does not expose lifespan property - this may cause initialization issues" ) except Exception as e: logger.error(f"Failed to create FastMCP HTTP app: {e}") raise RuntimeError(f"FastMCP HTTP app creation failed: {e}") from e # Create web server with optional MCP lifespan logger.info("Creating web server with MCP lifespan integration...") self.web_server = PDFKnowledgebaseWebServer( config=self.config, document_processor=self.mcp_server.document_processor, vector_store=self.mcp_server.vector_store, embedding_service=self.mcp_server.embedding_service, document_cache=self.mcp_server._document_cache, save_cache_callback=self.mcp_server._save_document_cache, background_queue=self.background_queue, lifespan=mcp_lifespan, # Pass MCP lifespan to web server file_monitor=getattr( self.mcp_server, "file_monitor", None ), # Pass file monitor for rescan functionality ) # Get the FastAPI app (now created with MCP lifespan integration) self.web_app = self.web_server.get_app() # Mount the MCP HTTP app if we have one if mcp_http_app and mount_path: logger.info(f"Mounting MCP server at {mount_path} with {self.config.transport.upper()} transport") try: # Mount the pre-created MCP ASGI app into FastAPI self.web_app.mount(mount_path, mcp_http_app) endpoint_url = f"http://{self.config.web_host}:{self.config.web_port}{mount_path}/" logger.info(f"✅ MCP endpoints successfully mounted at: {endpoint_url}") except Exception as e: logger.error(f"Failed to mount MCP HTTP app: {e}") raise RuntimeError(f"MCP HTTP app mounting failed: {e}") from e # Setup middleware and exception handlers setup_middleware(self.web_app, self.config) setup_exception_handlers(self.web_app) logger.info(f"Unified server configured to run on {self.config.web_host}:{self.config.web_port}") except Exception as e: logger.error(f"Failed to initialize web server: {e}") raise def _check_web_dependencies(self) -> None: """Check that required web dependencies are available.""" missing_deps = [] try: import fastapi # noqa: F401 except ImportError: missing_deps.append("fastapi") try: import hypercorn # noqa: F401 except ImportError: missing_deps.append("hypercorn") if missing_deps: raise ImportError( f"Missing required web server dependencies: {', '.join(missing_deps)}. " "Install with: pip install 'pdfkb-mcp[web]'" ) async def run_mcp_only(self) -> None: """Run only the MCP server (without web interface).""" try: logger.info("Starting MCP-only server...") await self.initialize() await self.mcp_server.run() except KeyboardInterrupt: logger.info("Received interrupt signal, shutting down MCP server...") await self.shutdown() except Exception as e: logger.error(f"MCP server error: {e}") raise async def run_web_only(self) -> None: """Run only the web server (without MCP interface).""" if not self.config.web_enabled: raise ValueError("Web server is disabled in configuration") try: logger.info("Starting web-only server...") await self.initialize() if not self.web_app: raise RuntimeError("Web server not initialized") import hypercorn.asyncio from hypercorn.config import Config as HypercornConfig # Create hypercorn config (replaces uvicorn for better websockets 14+ support) hypercorn_config = HypercornConfig() hypercorn_config.bind = [f"{self.config.web_host}:{self.config.web_port}"] hypercorn_config.loglevel = self.config.log_level.lower() hypercorn_config.access_log_format = "%(h)s %(r)s %(s)s %(b)s %(D)s" hypercorn_config.accesslog = "-" # Log to stdout # Run the web server await hypercorn.asyncio.serve(self.web_app, hypercorn_config) except KeyboardInterrupt: logger.info("Received interrupt signal, shutting down web server...") await self.shutdown() except Exception as e: logger.error(f"Web server error: {e}") raise async def run_integrated(self) -> None: """Run integrated server with MCP and web on single port (if web enabled).""" try: logger.info("Starting integrated server (MCP + Web)...") await self.initialize() if self.config.transport == "stdio": # STDIO mode - run only MCP server logger.info("Running MCP server in stdio mode...") await self.mcp_server.run() elif self.config.transport in ["http", "sse"]: if self.config.web_enabled: # Integrated mode: dual server with MCP and web on adjacent ports logger.info(f"Running integrated server with {self.config.transport.upper()} MCP and web...") await self._run_unified_server() else: # HTTP/SSE MCP only logger.info(f"Running MCP server in {self.config.transport.upper()} mode only...") await self.mcp_server.run() except KeyboardInterrupt: logger.info("Received interrupt signal, shutting down integrated server...") self._shutdown_event.set() except Exception as e: logger.error(f"Integrated server error: {e}") raise finally: await self.shutdown() async def _run_unified_server(self) -> None: """Run unified server with FastMCP mounted in FastAPI via Hypercorn.""" try: # Determine endpoint path based on transport endpoint_path = "mcp" if self.config.transport == "http" else "sse" logger.info(f"🌐 Starting unified server ({self.config.transport.upper()} transport)...") logger.info(f"🌍 Web interface: http://{self.config.web_host}:{self.config.web_port}") logger.info(f"📡 MCP endpoints: http://{self.config.web_host}:{self.config.web_port}/{endpoint_path}/") logger.info(f"📚 API docs: http://{self.config.web_host}:{self.config.web_port}/docs") if not self.web_app: raise RuntimeError("Web server not initialized") import hypercorn.asyncio from hypercorn.config import Config as HypercornConfig # Create hypercorn config (unified server for both web and MCP) hypercorn_config = HypercornConfig() hypercorn_config.bind = [f"{self.config.web_host}:{self.config.web_port}"] hypercorn_config.loglevel = self.config.log_level.lower() hypercorn_config.access_log_format = "%(h)s %(r)s %(s)s %(b)s %(D)s" hypercorn_config.accesslog = "-" # Log to stdout # Run the unified server await hypercorn.asyncio.serve(self.web_app, hypercorn_config) except Exception as e: logger.error(f"❌ Unified server error: {e}") self._shutdown_event.set() raise finally: logger.info("🔴 Unified server completed") async def shutdown(self) -> None: """Shutdown both servers gracefully.""" try: logger.info("Shutting down integrated server...") # Set shutdown event self._shutdown_event.set() # Shutdown background queue first to stop processing new jobs if self.background_queue: logger.info("Shutting down background processing queue...") # Use shorter timeout for more responsive shutdown await self.background_queue.shutdown(wait=True, timeout=3.0) logger.info("Background processing queue shutdown complete") # No separate web server task in unified mode # Shutdown MCP server if self.mcp_server: await self.mcp_server.shutdown() logger.info("Integrated server shutdown complete") except Exception as e: logger.error(f"Error during shutdown: {e}") def get_mcp_server(self) -> PDFKnowledgebaseServer: """Get the MCP server instance. Returns: MCP server instance """ return self.mcp_server def get_web_server(self) -> Optional[PDFKnowledgebaseWebServer]: """Get the web server instance. Returns: Web server instance or None if not initialized """ return self.web_server def get_web_app(self): """Get the FastAPI application instance. Returns: FastAPI application instance or None if not initialized """ return self.web_app @property def is_web_enabled(self) -> bool: """Check if web server is enabled. Returns: True if web server is enabled """ return self.config.web_enabled @property def web_url(self) -> Optional[str]: """Get the web server URL. Returns: Web server URL or None if not enabled """ if self.config.web_enabled: return f"http://{self.config.web_host}:{self.config.web_port}" return None @property def docs_url(self) -> Optional[str]: """Get the API documentation URL. Returns: API docs URL or None if not enabled """ if self.config.web_enabled: return f"http://{self.config.web_host}:{self.config.web_port}/docs" return None async def run_integrated_server(config: Optional[ServerConfig] = None) -> None: """Run the integrated server with both MCP and web interfaces. Args: config: Server configuration. If None, loads from environment. """ server = IntegratedPDFKnowledgebaseServer(config) await server.run_integrated() async def run_web_only_server(config: Optional[ServerConfig] = None) -> None: """Run only the web server. Args: config: Server configuration. If None, loads from environment. """ server = IntegratedPDFKnowledgebaseServer(config) await server.run_web_only() def main_integrated(): """Entry point for integrated server (MCP + Web).""" import argparse import sys # Parse command line arguments parser = argparse.ArgumentParser( description="PDF Knowledgebase Integrated Server (MCP + Web)", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Environment Variables: OPENAI_API_KEY OpenAI API key (required) KNOWLEDGEBASE_PATH Path to PDF directory (default: ./pdfs) CACHE_DIR Cache directory (default: <KNOWLEDGEBASE_PATH>/.cache) WEB_ENABLED Enable web interface (default: true) WEB_PORT Web server port (default: 8080) WEB_HOST Web server host (default: localhost) WEB_CORS_ORIGINS CORS origins (default: http://localhost:3000,http://127.0.0.1:3000) PDF_PARSER PDF parser to use (default: pymupdf4llm) PDF_CHUNKER Text chunker to use (default: langchain) LOG_LEVEL Logging level (default: INFO) Examples: pdfkb-web --enable-web # Run both MCP and web servers pdfkb-web --enable-web --port 9000 # Use custom web port with web enabled pdfkb-web --config myconfig.env # Use custom config file pdfkb-web # Run MCP server only (web disabled by default) """, ) parser.add_argument("--config", type=str, help="Path to environment configuration file") parser.add_argument("--port", type=int, help="Override web server port") parser.add_argument("--host", type=str, help="Override web server host") parser.add_argument("--enable-web", action="store_true", help="Enable web interface") parser.add_argument("--disable-web", action="store_true", help="Disable web interface (run MCP only)") parser.add_argument("--log-level", choices=["DEBUG", "INFO", "WARNING", "ERROR"], help="Override logging level") parser.add_argument("--version", action="version", version=f'pdfkb-web {__import__("pdfkb").__version__}') args = parser.parse_args() # Load configuration from custom file if specified if args.config: from dotenv import load_dotenv load_dotenv(args.config, override=True) logger.info(f"Loaded configuration from: {args.config}") # Load main configuration config = ServerConfig.from_env() # Apply command line overrides if args.port: config.web_port = args.port if args.host: config.web_host = args.host if args.enable_web: config.web_enabled = True if args.disable_web: config.web_enabled = False if args.log_level: config.log_level = args.log_level # Configure logging log_level = getattr(logging, config.log_level.upper(), logging.INFO) logging.basicConfig(level=log_level, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s") logger.info("Starting integrated PDF Knowledgebase server...") logger.info(f"Version: {__import__('pdfkb').__version__}") logger.info("MCP server: enabled") logger.info(f"Web server: {'enabled' if config.web_enabled else 'disabled'}") logger.info(f"Configuration: {config.knowledgebase_path}") logger.info(f"Cache directory: {config.cache_dir}") if config.web_enabled: logger.info(f"Web interface will be available at: http://{config.web_host}:{config.web_port}") logger.info(f"API documentation will be available at: http://{config.web_host}:{config.web_port}/docs") try: asyncio.run(run_integrated_server(config)) except KeyboardInterrupt: logger.info("Received interrupt signal, shutting down...") except Exception as e: logger.error(f"Server error: {e}") sys.exit(1) def main_web_only(): """Entry point for web-only server.""" import argparse import sys # Parse command line arguments parser = argparse.ArgumentParser( description="PDF Knowledgebase Web-Only Server", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Environment Variables: OPENAI_API_KEY OpenAI API key (required) KNOWLEDGEBASE_PATH Path to PDF directory (default: ./pdfs) CACHE_DIR Cache directory (default: <KNOWLEDGEBASE_PATH>/.cache) WEB_PORT Web server port (default: 8080) WEB_HOST Web server host (default: localhost) WEB_CORS_ORIGINS CORS origins (default: http://localhost:3000,http://127.0.0.1:3000) PDF_PARSER PDF parser to use (default: pymupdf4llm) PDF_CHUNKER Text chunker to use (default: langchain) LOG_LEVEL Logging level (default: INFO) Examples: pdfkb-web-only # Run web server only pdfkb-web-only --port 9000 # Use custom web port pdfkb-web-only --config myconfig.env # Use custom config file """, ) parser.add_argument("--config", type=str, help="Path to environment configuration file") parser.add_argument("--port", type=int, help="Override web server port") parser.add_argument("--host", type=str, help="Override web server host") parser.add_argument("--log-level", choices=["DEBUG", "INFO", "WARNING", "ERROR"], help="Override logging level") parser.add_argument("--version", action="version", version=f'pdfkb-web-only {__import__("pdfkb").__version__}') args = parser.parse_args() # Load configuration from custom file if specified if args.config: from dotenv import load_dotenv load_dotenv(args.config, override=True) logger.info(f"Loaded configuration from: {args.config}") # Load main configuration config = ServerConfig.from_env() # Apply command line overrides if args.port: config.web_port = args.port if args.host: config.web_host = args.host if args.log_level: config.log_level = args.log_level # Force web to be enabled for web-only mode config.web_enabled = True # Configure logging log_level = getattr(logging, config.log_level.upper(), logging.INFO) logging.basicConfig(level=log_level, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s") logger.info("Starting web-only PDF Knowledgebase server...") logger.info(f"Version: {__import__('pdfkb').__version__}") logger.info(f"Configuration: {config.knowledgebase_path}") logger.info(f"Cache directory: {config.cache_dir}") logger.info(f"Web interface will be available at: http://{config.web_host}:{config.web_port}") logger.info(f"API documentation will be available at: http://{config.web_host}:{config.web_port}/docs") try: asyncio.run(run_web_only_server(config)) except KeyboardInterrupt: logger.info("Received interrupt signal, shutting down...") except Exception as e: logger.error(f"Server error: {e}") sys.exit(1) if __name__ == "__main__": import sys if len(sys.argv) > 1 and sys.argv[1] == "web-only": main_web_only() else: main_integrated()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/juanqui/pdfkb-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server