Skip to main content
Glama
unified.py10.1 kB
""" Unified server that runs both mitmproxy and FastMCP HTTP in same process. This module provides the unified architecture where: - mitmproxy runs as proxy on port 1337 - FastMCP HTTP runs as MCP server on port 9973 - Both share the same AugmentManager instance (in-memory) - Checkpoint-based context compression via shared state """ import asyncio import logging import sys from pathlib import Path from typing import Optional from mitmproxy import options from mitmproxy.tools import dump from nisaba.server.config import NisabaConfig from nisaba.server.factory import NisabaMCPFactory from nisaba.wrapper.proxy import AugmentInjector logger = logging.getLogger(__name__) class SuppressCancelledErrorFilter(logging.Filter): """Filter to suppress CancelledError tracebacks from uvicorn/starlette during shutdown.""" def filter(self, record): # Suppress ERROR logs that contain CancelledError traceback if record.levelno == logging.ERROR: if 'asyncio.exceptions.CancelledError' in record.getMessage(): return False # Also check exc_info if present if record.exc_info and record.exc_info[0] is asyncio.CancelledError: return False return True # Install the filter on uvicorn and starlette loggers logging.getLogger('uvicorn.error').addFilter(SuppressCancelledErrorFilter()) logging.getLogger('starlette').addFilter(SuppressCancelledErrorFilter()) class UnifiedNisabaServer: """ Unified server running both proxy and MCP in single process. Architecture: - Single asyncio event loop - mitmproxy Master with shared event loop - FastMCP HTTP server on asyncio - Shared AugmentManager for zero-latency state sharing """ def __init__( self, augments_dir: Path, composed_file: Path, proxy_port: int = 1337, mcp_port: int = 9973, debug_proxy: bool = False ): """ Initialize unified server. Args: augments_dir: Directory containing augment files composed_file: Path to composed augments file proxy_port: Port for mitmproxy (default: 1337) mcp_port: Port for MCP HTTP server (default: 9973) debug_proxy: Show proxy debug output """ self.augments_dir = Path(augments_dir) self.composed_file = Path(composed_file) self.proxy_port = proxy_port self.mcp_port = mcp_port self.debug_proxy = debug_proxy # Shared state (single source of truth) self.augment_manager: Optional[AugmentManager] = None # Component references self.proxy_master: Optional[dump.DumpMaster] = None self.mcp_factory: Optional[NisabaMCPFactory] = None self.mcp_server = None # Task references for cleanup self.proxy_task: Optional[asyncio.Task] = None self.mcp_task: Optional[asyncio.Task] = None async def start(self) -> None: """ Start both proxy and MCP server. Initializes shared state and starts both components concurrently. """ logger.info("=" * 60) logger.info("🚀 Starting Unified Nisaba Server") logger.info("=" * 60) # Start both components await self._start_proxy() await self._start_mcp_server() logger.info("=" * 60) logger.info(f"✅ Unified server ready:") logger.info(f" • Proxy: http://localhost:{self.proxy_port}") logger.info(f" • MCP HTTP: http://localhost:{self.mcp_port}") logger.info("=" * 60) async def _start_proxy(self) -> None: """Start mitmproxy with shared AugmentManager.""" logger.info(f"🔌 Starting proxy on port {self.proxy_port}...") # Create mitmproxy options proxy_opts = options.Options( listen_port=self.proxy_port, mode=["regular"], # Must be list, not string ) # Create proxy addon with shared AugmentManager proxy_addon = AugmentInjector(augment_manager=self.augment_manager) # Create DumpMaster with our event loop self.proxy_master = dump.DumpMaster( options=proxy_opts, loop=asyncio.get_event_loop(), with_termlog=self.debug_proxy, with_dumper=False # No dumping, just proxying ) # Add our addon self.proxy_master.addons.add(proxy_addon) # Start proxy as background task self.proxy_task = asyncio.create_task(self._run_proxy()) # Give it a moment to start await asyncio.sleep(0.5) logger.info(f"✓ Proxy running on port {self.proxy_port}") async def _run_proxy(self) -> None: """Run proxy (async task).""" try: await self.proxy_master.run() except asyncio.CancelledError: # Expected during shutdown - don't re-raise to avoid error logs logger.debug("Proxy task cancelled (clean shutdown)") return except Exception as e: logger.error(f"Proxy error: {e}", exc_info=True) raise async def _start_mcp_server(self) -> None: """Start FastMCP HTTP server with shared AugmentManager.""" logger.info(f"🤖 Starting MCP server on port {self.mcp_port}...") # Create config with HTTP transport DISABLED # (We manually manage HTTP server in unified mode) config = NisabaConfig( dev_mode=False, enable_http_transport=False, # Don't auto-start, we manage it http_host="localhost", http_port=self.mcp_port, augments_dir=self.augments_dir, composed_augments_file=self.composed_file ) # Create factory with shared AugmentManager self.mcp_factory = NisabaMCPFactory(config) # Create MCP server (HTTP transport) self.mcp_server = self.mcp_factory.create_mcp_server( host=config.http_host, port=config.http_port ) # Start MCP server as background task self.mcp_task = asyncio.create_task(self._run_mcp_server()) # Give it a moment to start await asyncio.sleep(1) # Self-register for discovery (temporarily enable flag to bypass check) original_flag = self.mcp_factory.config.enable_http_transport try: self.mcp_factory.config.enable_http_transport = True self.mcp_factory._register_to_discovery() finally: self.mcp_factory.config.enable_http_transport = original_flag logger.info(f"✓ MCP server running on port {self.mcp_port}") async def _run_mcp_server(self) -> None: """Run MCP server (async task).""" try: # Run as HTTP transport (SSE) await self.mcp_server.run_streamable_http_async() except asyncio.CancelledError: # Expected during shutdown - don't re-raise to avoid error logs logger.debug("MCP server task cancelled (clean shutdown)") return except Exception as e: logger.error(f"MCP server error: {e}", exc_info=True) raise async def stop(self) -> None: """ Stop both servers gracefully. Cancels tasks and performs cleanup. """ logger.info("=" * 60) logger.info("🛑 Stopping Unified Nisaba Server") logger.info("=" * 60) # Cancel MCP server task if self.mcp_task and not self.mcp_task.done(): logger.info("Stopping MCP server...") self.mcp_task.cancel() try: await self.mcp_task except asyncio.CancelledError: pass # Cancel proxy task if self.proxy_task and not self.proxy_task.done(): logger.info("Stopping proxy...") self.proxy_task.cancel() try: await self.proxy_task except asyncio.CancelledError: pass # Shutdown proxy master if self.proxy_master: self.proxy_master.shutdown() logger.info("✓ Unified server stopped") logger.info("=" * 60) async def run_until_stopped(self) -> None: """ Run until externally stopped. Waits for both tasks to complete (or be cancelled). """ await self.start() try: # Wait for both tasks (they run indefinitely until cancelled) await asyncio.gather( self.proxy_task, self.mcp_task, return_exceptions=True ) except asyncio.CancelledError: logger.info("Server cancelled") finally: await self.stop() async def run_unified_server( augments_dir: Path, composed_file: Path, proxy_port: int = 1337, mcp_port: int = 9973, debug_proxy: bool = False, timeout: Optional[float] = None ) -> None: """ Run unified server for a specific duration or until interrupted. Args: augments_dir: Directory containing augment files composed_file: Path to composed augments file proxy_port: Port for proxy (default: 1337) mcp_port: Port for MCP server (default: 9973) debug_proxy: Show proxy debug output timeout: Optional timeout in seconds (for testing) """ server = UnifiedNisabaServer( augments_dir=augments_dir, composed_file=composed_file, proxy_port=proxy_port, mcp_port=mcp_port, debug_proxy=debug_proxy ) try: if timeout: # Run with timeout (for testing) await asyncio.wait_for(server.run_until_stopped(), timeout=timeout) else: # Run indefinitely await server.run_until_stopped() except asyncio.TimeoutError: logger.info(f"Server timeout after {timeout}s") except KeyboardInterrupt: logger.info("Server interrupted by user") finally: await server.stop()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/y3i12/nabu_nisaba'

If you have feedback or need assistance with the MCP directory API, please join our Discord server