Skip to main content
Glama

FGD Fusion Stack Pro

mcp_backend.py (38.9 kB)
#!/usr/bin/env python3
"""
FGD Stack MCP Server – Production-Ready with Complete LLM Support

Features:
- Pydantic config validation
- Complete Claude API support
- Rate limiting for LLM queries
- Graceful shutdown handling
- Comprehensive type hints
- Hardened path security
- Detailed error messages
"""

import os
import json
import logging
import signal
import asyncio
from pathlib import Path
from datetime import datetime, timedelta
from typing import (
    Dict, Any, Callable, Awaitable, Iterator, List, Optional, Tuple
)
from collections import deque

import yaml
import aiohttp
from pydantic import BaseModel, Field, validator
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

# MCP SDK imports
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


def _text_response(text: str) -> List[TextContent]:
    """Wrap a plain string in the single-item list MCP tool handlers return."""
    return [TextContent(type="text", text=text)]


# ==================== PYDANTIC MODELS ====================

class ScanConfig(BaseModel):
    """Configuration for file scanning limits."""

    max_dir_size_gb: int = Field(default=2, ge=1, le=10)
    max_files_per_scan: int = Field(default=5, ge=1, le=100)
    max_file_size_kb: int = Field(default=250, ge=1, le=10000)


class ProviderConfig(BaseModel):
    """Configuration for individual LLM provider."""

    model: str
    base_url: str


class LLMConfig(BaseModel):
    """Configuration for LLM providers."""

    default_provider: str = Field(default="grok")
    providers: Dict[str, ProviderConfig]

    @validator('default_provider')
    def validate_default_provider(cls, v):
        """Fall back to 'grok' when the configured default is not a known provider."""
        if v not in ['grok', 'openai', 'claude', 'ollama']:
            logger.warning(f"Invalid default_provider '{v}', falling back to 'grok'")
            return 'grok'
        return v


class ServerConfig(BaseModel):
    """Main server configuration with validation."""

    watch_dir: str
    memory_file: str = Field(default=".fgd_memory.json")
    log_file: str = Field(default="fgd_server.log")
    context_limit: int = Field(default=20, ge=5, le=100)
    scan: ScanConfig = Field(default_factory=ScanConfig)
    llm: LLMConfig

    @validator('watch_dir')
    def validate_watch_dir(cls, v):
        """Validate that watch_dir exists and is a directory; return it resolved."""
        path = Path(v)
        if not path.exists():
            raise ValueError(f"watch_dir does not exist: {v}")
        if not path.is_dir():
            raise ValueError(f"watch_dir is not a directory: {v}")
        return str(path.resolve())


# ==================== RATE LIMITER ====================

class RateLimiter:
    """Sliding-window rate limiter for LLM queries.

    Keeps the timestamps of accepted requests and allows a new one only while
    fewer than ``max_requests`` of them fall inside the last
    ``window_seconds`` seconds.
    """

    def __init__(self, max_requests: int = 10, window_seconds: int = 60):
        """
        Initialize rate limiter.

        Args:
            max_requests: Maximum requests allowed in time window
            window_seconds: Time window in seconds
        """
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests: deque = deque()

    def _prune(self, now: datetime) -> None:
        """Drop recorded requests that are older than the window ending at *now*."""
        cutoff = now - timedelta(seconds=self.window_seconds)
        while self.requests and self.requests[0] < cutoff:
            self.requests.popleft()

    async def acquire(self) -> bool:
        """
        Attempt to acquire permission for a request.

        Returns:
            True if request is allowed (and recorded), False if rate limited
        """
        now = datetime.now()
        self._prune(now)

        if len(self.requests) >= self.max_requests:
            return False

        self.requests.append(now)
        return True

    def get_wait_time(self) -> float:
        """
        Get time to wait before next request is allowed.

        Returns:
            Seconds to wait, or 0 if request would be allowed now
        """
        now = datetime.now()
        # Prune first so an expired backlog does not report a stale wait.
        self._prune(now)

        if len(self.requests) < self.max_requests:
            return 0.0

        release_at = self.requests[0] + timedelta(seconds=self.window_seconds)
        return max(0.0, (release_at - now).total_seconds())


# ==================== MEMORY STORE ====================

class MemoryStore:
    """Persistent memory storage with categorization and access tracking."""

    def __init__(self, memory_file: Path, config: ServerConfig):
        """
        Initialize memory store.

        Args:
            memory_file: Path to JSON memory file
            config: Server configuration
        """
        self.memory_file = memory_file
        self.memories: Dict[str, Dict[str, Any]] = self._load()
        self.context: List[Dict[str, Any]] = []
        self.limit = config.context_limit

    def _load(self) -> Dict[str, Dict[str, Any]]:
        """
        Load memories from disk.

        Returns:
            Dictionary of categorized memories; empty when the file is
            missing, unreadable, or does not hold a JSON object
        """
        if not self.memory_file.exists():
            return {}

        try:
            data = json.loads(self.memory_file.read_text(encoding='utf-8'))
        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse memory file: {e}")
            return {}
        except (OSError, UnicodeDecodeError) as e:
            # UnicodeDecodeError is not an OSError; a non-UTF-8 file must not
            # abort server start-up.
            logger.error(f"Failed to read memory file: {e}")
            return {}

        if not isinstance(data, dict):
            logger.error("Memory file does not contain a JSON object; ignoring it")
            return {}
        return data

    def _save(self) -> None:
        """Save memories to disk."""
        try:
            self.memory_file.write_text(
                json.dumps(self.memories, indent=2),
                encoding='utf-8'
            )
        except IOError as e:
            logger.error(f"Failed to save memory file: {e}")

    def remember(self, key: str, value: Any, category: str = "general") -> None:
        """
        Store a memory.

        Args:
            key: Memory key
            value: Memory value (must be JSON serializable)
            category: Memory category for organization
        """
        if category not in self.memories:
            self.memories[category] = {}

        self.memories[category][key] = {
            "value": value,
            "timestamp": datetime.now().isoformat(),
            "access_count": 0
        }
        self._save()
        logger.debug(f"Stored memory: category={category}, key={key}")

    def recall(
        self,
        key: Optional[str] = None,
        category: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Retrieve memories.

        Args:
            key: Specific memory key (optional)
            category: Memory category (optional)

        Returns:
            - key and category: ``{key: entry}`` or ``{}`` if absent
            - category only: every entry in that category
            - key only: ``{category: {key: entry}}`` for each category that
              holds the key
            - neither: all memories
        """
        if key and category:
            entry = self.memories.get(category, {}).get(key)
            if entry is None:
                return {}
            entry["access_count"] += 1
            self._save()
            return {key: entry}

        if category:
            return self.memories.get(category, {})

        if key:
            # Key without category: look the key up in every category instead
            # of silently returning the whole store.
            hits = {
                cat: {key: entries[key]}
                for cat, entries in self.memories.items()
                if key in entries
            }
            if hits:
                for found in hits.values():
                    found[key]["access_count"] += 1
                self._save()
            return hits

        return self.memories

    def add_context(self, type_: str, data: Any) -> None:
        """
        Add item to rolling context window.

        Args:
            type_: Context type (e.g., 'file_change', 'file_read')
            data: Context data
        """
        self.context.append({
            "type": type_,
            "data": data,
            "timestamp": datetime.now().isoformat()
        })

        # Maintain rolling window
        if len(self.context) > self.limit:
            self.context = self.context[-self.limit:]

    def get_context(self, count: int = 5) -> List[Dict[str, Any]]:
        """
        Get recent context items.

        Args:
            count: Number of recent items to return

        Returns:
            List of recent context items (empty when count is not positive)
        """
        # Guard explicitly: a slice of [-0:] would return the whole list.
        if count <= 0:
            return []
        return self.context[-count:]


# ==================== LLM BACKEND ====================

class LLMBackend:
    """Multi-provider LLM backend with complete API support."""

    def __init__(self, config: ServerConfig, rate_limiter: RateLimiter):
        """
        Initialize LLM backend.

        Args:
            config: Server configuration
            rate_limiter: Rate limiter instance
        """
        self.config = config.llm
        self.default = config.llm.default_provider
        self.rate_limiter = rate_limiter
        self.timeout = aiohttp.ClientTimeout(total=60)

    async def query(
        self,
        prompt: str,
        provider: Optional[str] = None,
        model: Optional[str] = None,
        context: str = ""
    ) -> str:
        """
        Query an LLM provider with rate limiting.

        Args:
            prompt: User prompt
            provider: LLM provider name (defaults to config)
            model: Model name override (optional)
            context: Additional context to prepend

        Returns:
            LLM response text, or a string starting with an error
            description when the request could not be completed
        """
        provider = provider or self.default
        conf = self.config.providers.get(provider)
        if not conf:
            return f"Error: Provider '{provider}' not configured"

        dispatch = {
            "grok": self._query_grok,
            "openai": self._query_openai,
            "claude": self._query_claude,
            "ollama": self._query_ollama,
        }
        handler = dispatch.get(provider)
        if handler is None:
            return f"Error: Provider '{provider}' not implemented"

        # Only consume rate-limit quota once the request is known to be
        # routable to a real provider.
        if not await self.rate_limiter.acquire():
            wait_time = self.rate_limiter.get_wait_time()
            return f"Error: Rate limit exceeded. Please wait {wait_time:.1f} seconds."

        # Build full prompt with context
        full_prompt = f"{context}\n\n{prompt}" if context else prompt
        model = model or conf.model

        try:
            return await handler(conf.base_url, model, full_prompt)
        except asyncio.TimeoutError:
            logger.error(f"{provider} request timed out")
            return (
                f"Error: {provider} request timed out after "
                f"{self.timeout.total:g} seconds"
            )
        except Exception as e:
            logger.error(f"{provider} query failed: {e}", exc_info=True)
            return f"Error: {provider} query failed: {str(e)}"

    @staticmethod
    def _openai_style_text(result: Dict[str, Any]) -> str:
        """Extract the reply text from an OpenAI-style chat completion payload."""
        return result['choices'][0]['message']['content']

    @staticmethod
    def _claude_style_text(result: Dict[str, Any]) -> str:
        """Extract the reply text from an Anthropic messages payload."""
        return result['content'][0]['text']

    async def _post_chat(
        self,
        label: str,
        url: str,
        payload: Dict[str, Any],
        headers: Optional[Dict[str, str]],
        extract: Callable[[Dict[str, Any]], str]
    ) -> str:
        """POST *payload* as JSON to *url* and return the extracted reply text.

        Args:
            label: Provider name used in the error message
            url: Full endpoint URL
            payload: JSON request body
            headers: Request headers, or None for no extra headers
            extract: Function pulling the reply text out of the JSON response

        Returns:
            Reply text, or "<label> API Error <status>: <body>" on a
            non-200 response
        """
        async with aiohttp.ClientSession(timeout=self.timeout) as session:
            async with session.post(url, json=payload, headers=headers) as response:
                if response.status != 200:
                    error_text = await response.text()
                    return f"{label} API Error {response.status}: {error_text}"
                return extract(await response.json())

    async def _query_grok(self, base_url: str, model: str, prompt: str) -> str:
        """Query Grok (X.AI) API."""
        api_key = os.getenv("XAI_API_KEY")
        if not api_key:
            return "Error: XAI_API_KEY environment variable not set"

        headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
        data = {
            "model": model,
            "messages": [{"role": "user", "content": prompt}]
        }
        return await self._post_chat(
            "Grok", f"{base_url}/chat/completions", data, headers,
            self._openai_style_text
        )

    async def _query_openai(self, base_url: str, model: str, prompt: str) -> str:
        """Query OpenAI API."""
        api_key = os.getenv("OPENAI_API_KEY")
        if not api_key:
            return "Error: OPENAI_API_KEY environment variable not set"

        headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
        data = {
            "model": model,
            "messages": [{"role": "user", "content": prompt}]
        }
        return await self._post_chat(
            "OpenAI", f"{base_url}/chat/completions", data, headers,
            self._openai_style_text
        )

    async def _query_claude(self, base_url: str, model: str, prompt: str) -> str:
        """Query Claude (Anthropic) API."""
        api_key = os.getenv("ANTHROPIC_API_KEY")
        if not api_key:
            return "Error: ANTHROPIC_API_KEY environment variable not set"

        headers = {
            "x-api-key": api_key,
            "anthropic-version": "2023-06-01",
            "Content-Type": "application/json"
        }
        data = {
            "model": model,
            "max_tokens": 4096,
            "messages": [{"role": "user", "content": prompt}]
        }
        # Claude API returns content in a different format
        return await self._post_chat(
            "Claude", f"{base_url}/messages", data, headers,
            self._claude_style_text
        )

    async def _query_ollama(self, base_url: str, model: str, prompt: str) -> str:
        """Query Ollama (local) API."""
        data = {
            "model": model,
            "messages": [{"role": "user", "content": prompt}],
            "stream": False
        }
        try:
            return await self._post_chat(
                "Ollama", f"{base_url}/chat/completions", data, None,
                self._openai_style_text
            )
        except aiohttp.ClientConnectorError:
            # Report the configured endpoint, not a hard-coded default.
            return (
                "Error: Cannot connect to Ollama. "
                f"Ensure Ollama is running at {base_url}"
            )


# ==================== FILE WATCHER ====================

class FileChangeHandler(FileSystemEventHandler):
    """Handler for file system events."""

    def __init__(self, callback: Callable[[str, str], None]):
        """
        Initialize handler.

        Args:
            callback: Function to call on file changes (event_type, path)
        """
        super().__init__()
        self.callback = callback

    def on_modified(self, event):
        """Handle file modification."""
        if not event.is_directory:
            self.callback('modified', event.src_path)

    def on_created(self, event):
        """Handle file creation."""
        if not event.is_directory:
            self.callback('created', event.src_path)

    def on_deleted(self, event):
        """Handle file deletion."""
        if not event.is_directory:
            self.callback('deleted', event.src_path)


# ==================== MAIN SERVER ====================

class FGDMCPServer:
    """Main MCP server with production-ready features."""

    def __init__(self, config_path: str):
        """
        Initialize MCP server.

        Args:
            config_path: Path to YAML configuration file

        Raises:
            ValueError: If the configuration file is empty or not a mapping
        """
        # Load and validate config
        with open(config_path, 'r', encoding='utf-8') as f:
            config_dict = yaml.safe_load(f)
        if not isinstance(config_dict, dict):
            # An empty file yields None; fail with a clear message instead of
            # an opaque TypeError from ServerConfig(**None).
            raise ValueError(
                f"Configuration file '{config_path}' must contain a YAML mapping"
            )

        self.config = ServerConfig(**config_dict)
        self.watch_dir = Path(self.config.watch_dir).resolve()
        self._stopped = False

        # Initialize components
        self.rate_limiter = RateLimiter(max_requests=10, window_seconds=60)
        self.memory = MemoryStore(
            self.watch_dir / self.config.memory_file,
            self.config
        )
        self.llm = LLMBackend(self.config, self.rate_limiter)
        self.recent_changes: List[Dict[str, Any]] = []
        self.observer: Optional[Observer] = None

        # Setup logging
        self.log_file = self._resolve_log_file()
        self._ensure_log_handler()

        logger.info(f"FGD MCP Server initialized - Logging to {self.log_file}")
        logger.info(f"Watching directory: {self.watch_dir}")
        logger.info(f"Default LLM provider: {self.config.llm.default_provider}")

        # Start file watcher
        self._start_watcher()

        # Initialize MCP server
        self.server = Server("fgd-mcp-server")
        self._setup_handlers()

        # Setup shutdown handlers
        self._setup_shutdown_handlers()

    def _resolve_log_file(self) -> Path:
        """
        Resolve log file path.

        Relative paths are taken relative to the watch directory; the parent
        directory is created if needed.

        Returns:
            Absolute path to log file
        """
        configured = self.config.log_file
        candidate = Path(configured)

        if not candidate.is_absolute():
            candidate = (self.watch_dir / candidate).resolve()

        candidate.parent.mkdir(parents=True, exist_ok=True)
        return candidate

    def _ensure_log_handler(self) -> None:
        """Ensure file logging handler is configured."""
        # Check if handler already exists
        for handler in logger.handlers:
            if isinstance(handler, logging.FileHandler):
                if Path(handler.baseFilename) == self.log_file:
                    return

        # Add file handler
        file_handler = logging.FileHandler(self.log_file, encoding='utf-8')
        file_handler.setFormatter(
            logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        )
        logger.addHandler(file_handler)

    def _start_watcher(self) -> None:
        """Start file system watcher; on failure log a warning and continue."""
        try:
            handler = FileChangeHandler(self._on_file_change)
            self.observer = Observer()
            self.observer.schedule(handler, str(self.watch_dir), recursive=True)
            self.observer.start()
            logger.info("File watcher started successfully")
        except Exception as e:
            logger.warning(f"File watcher failed to start: {e}")
            self.observer = None

    def _on_file_change(self, event_type: str, path: str) -> None:
        """
        Handle file system change events.

        Args:
            event_type: Type of event (created, modified, deleted)
            path: Absolute path to changed file
        """
        try:
            path_obj = Path(path)
            rel_path = str(path_obj.relative_to(self.watch_dir))

            change_record = {
                "type": event_type,
                "path": rel_path,
                "timestamp": datetime.now().isoformat()
            }
            self.recent_changes.append(change_record)

            # Keep only last 50 changes
            if len(self.recent_changes) > 50:
                self.recent_changes = self.recent_changes[-50:]

            # Add to context
            self.memory.add_context("file_change", {
                "type": event_type,
                "path": rel_path
            })

            logger.debug(f"File {event_type}: {rel_path}")
        except ValueError:
            # Path is outside watch directory, ignore
            pass
        except Exception as e:
            logger.error(f"Error processing file change: {e}")

    def _sanitize_path(self, relative_path: str) -> Path:
        """
        Sanitize and validate a relative path.

        Args:
            relative_path: User-provided relative path

        Returns:
            Validated absolute Path object

        Raises:
            ValueError: If path is invalid or outside watch directory
        """
        # Normalize the path to prevent traversal attacks
        normalized = os.path.normpath(relative_path)

        # Compare the first *component* with '..' so that legitimate names
        # which merely begin with two dots (e.g. '..cache') are accepted.
        parts = Path(normalized).parts
        escapes_upward = bool(parts) and parts[0] == os.pardir
        if escapes_upward or os.path.isabs(normalized):
            raise ValueError(
                f"Invalid path '{relative_path}': "
                "Path must be relative and within watch directory"
            )

        # Resolve full path (follows symlinks)
        full_path = (self.watch_dir / normalized).resolve()

        # Ensure path is within watch directory
        try:
            full_path.relative_to(self.watch_dir)
        except ValueError:
            raise ValueError(
                f"Path traversal blocked: '{relative_path}' "
                f"resolves outside watch directory"
            )

        return full_path

    def _relative_to_watch_dir(self, path: Path) -> Optional[Path]:
        """Return *path* relative to the watch directory, or None if it escapes.

        The path is resolved first, so '..' segments and symlinks that lead
        outside the watch directory are rejected rather than accepted by a
        purely lexical comparison.
        """
        try:
            return path.resolve().relative_to(self.watch_dir)
        except (ValueError, OSError, RuntimeError):
            return None

    def _iter_files(self, pattern: str) -> Iterator[Tuple[Path, Path]]:
        """Yield (absolute, relative) paths of files matching *pattern*.

        Only regular files that really live inside the watch directory are
        yielded.
        """
        for path in self.watch_dir.glob(pattern):
            if not path.is_file():
                continue
            rel_path = self._relative_to_watch_dir(path)
            if rel_path is None:
                continue
            yield path, rel_path

    def _setup_shutdown_handlers(self) -> None:
        """Setup graceful shutdown signal handlers."""
        def signal_handler(signum, frame):
            logger.info(f"Received signal {signum}, initiating graceful shutdown...")
            self.stop()
            # Installing a custom handler suppresses Python's default
            # KeyboardInterrupt; raise it so the event loop actually exits.
            raise KeyboardInterrupt

        signal.signal(signal.SIGINT, signal_handler)
        signal.signal(signal.SIGTERM, signal_handler)

    def _setup_handlers(self) -> None:
        """Setup MCP tool handlers."""

        @self.server.list_tools()
        async def list_tools() -> List[Tool]:
            """List all available MCP tools."""
            return [
                Tool(
                    name="read_file",
                    description="Read contents of a file in the watched directory",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "filepath": {
                                "type": "string",
                                "description": "Relative path to file"
                            }
                        },
                        "required": ["filepath"],
                    },
                ),
                Tool(
                    name="list_files",
                    description="List files matching a glob pattern",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "pattern": {
                                "type": "string",
                                "description": "Glob pattern (e.g., '**/*.py')",
                                "default": "**/*"
                            }
                        },
                    },
                ),
                Tool(
                    name="search_in_files",
                    description="Search for text across files",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "query": {
                                "type": "string",
                                "description": "Text to search for"
                            },
                            "pattern": {
                                "type": "string",
                                "description": "Glob pattern for files to search",
                                "default": "**/*"
                            }
                        },
                        "required": ["query"],
                    },
                ),
                Tool(
                    name="llm_query",
                    description=(
                        "Query an LLM with automatic context injection "
                        "(defaults to the configured provider)"
                    ),
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "prompt": {
                                "type": "string",
                                "description": "Prompt for the LLM"
                            },
                            "provider": {
                                "type": "string",
                                "description": "LLM provider (grok, openai, claude, ollama)",
                                # Advertise the default that is really used.
                                "default": self.llm.default,
                                "enum": ["grok", "openai", "claude", "ollama"]
                            }
                        },
                        "required": ["prompt"],
                    },
                ),
                Tool(
                    name="remember",
                    description="Store information in persistent memory",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "key": {
                                "type": "string",
                                "description": "Memory key"
                            },
                            "value": {
                                "type": "string",
                                "description": "Memory value"
                            },
                            "category": {
                                "type": "string",
                                "description": "Memory category",
                                "default": "general"
                            }
                        },
                        "required": ["key", "value"],
                    },
                ),
                Tool(
                    name="recall",
                    description="Retrieve stored memories",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "key": {
                                "type": "string",
                                "description": "Specific memory key (optional)"
                            },
                            "category": {
                                "type": "string",
                                "description": "Memory category (optional)"
                            }
                        },
                    },
                ),
                Tool(
                    name="get_recent_changes",
                    description="Get list of recent file changes",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "count": {
                                "type": "integer",
                                "description": "Number of changes to return",
                                "default": 10
                            }
                        },
                    },
                ),
            ]

        async def handle_read_file(args: Dict[str, Any]) -> List[TextContent]:
            """Handle read_file tool."""
            try:
                filepath = args.get("filepath")
                if not filepath:
                    return _text_response("Error: Missing required argument 'filepath'")

                path = self._sanitize_path(filepath)

                if not path.exists():
                    return _text_response(f"Error: File not found: {filepath}")

                if not path.is_file():
                    return _text_response(f"Error: Path is not a file: {filepath}")

                # Check file size
                max_bytes = self.config.scan.max_file_size_kb * 1024
                file_size = path.stat().st_size
                if file_size > max_bytes:
                    return _text_response(
                        f"Error: File too large ({file_size} bytes, limit {max_bytes} bytes)"
                    )

                # Read file
                content = path.read_text(encoding='utf-8')
                self.memory.add_context("file_read", {"path": filepath})

                return _text_response(content)

            except UnicodeDecodeError:
                # Must precede ValueError: UnicodeDecodeError is a subclass
                # and would otherwise never reach this branch.
                return _text_response(
                    f"Error: File is not valid UTF-8 text: {filepath}"
                )
            except ValueError as e:
                return _text_response(f"Error: {str(e)}")
            except IOError as e:
                return _text_response(
                    f"Error: Failed to read file '{filepath}': {str(e)}"
                )
            except Exception as e:
                logger.error(f"Unexpected error in read_file: {e}", exc_info=True)
                return _text_response(
                    f"Error: Unexpected error reading file: {str(e)}"
                )

        async def handle_list_files(args: Dict[str, Any]) -> List[TextContent]:
            """Handle list_files tool."""
            try:
                pattern = args.get("pattern", "**/*")
                max_files = self.config.scan.max_files_per_scan

                files = []
                for _, rel_path in self._iter_files(pattern):
                    files.append(str(rel_path))
                    if len(files) >= max_files:
                        break

                result = {
                    "files": files,
                    "count": len(files),
                    "truncated": len(files) >= max_files
                }
                return _text_response(json.dumps(result, indent=2))

            except Exception as e:
                logger.error(f"Error in list_files: {e}", exc_info=True)
                return _text_response(f"Error: Failed to list files: {str(e)}")

        async def handle_search_in_files(args: Dict[str, Any]) -> List[TextContent]:
            """Handle search_in_files tool (case-insensitive substring search)."""
            try:
                query = args.get("query")
                if not query:
                    return _text_response("Error: Missing required argument 'query'")

                pattern = args.get("pattern", "**/*")
                max_files = self.config.scan.max_files_per_scan
                max_bytes = self.config.scan.max_file_size_kb * 1024
                needle = query.lower()

                matches = []
                files_searched = 0

                for path, rel_path in self._iter_files(pattern):
                    if files_searched >= max_files:
                        break

                    try:
                        # Skip files that are too large
                        if path.stat().st_size > max_bytes:
                            continue
                        content = path.read_text(encoding='utf-8').lower()
                    except (UnicodeDecodeError, OSError):
                        # Skip files that can't be read
                        continue

                    files_searched += 1
                    if needle in content:
                        matches.append({"filepath": str(rel_path)})

                result = {
                    "query": query,
                    "matches": matches,
                    "files_searched": files_searched,
                    "truncated": files_searched >= max_files
                }
                return _text_response(json.dumps(result, indent=2))

            except Exception as e:
                logger.error(f"Error in search_in_files: {e}", exc_info=True)
                return _text_response(f"Error: Failed to search files: {str(e)}")

        async def handle_llm_query(args: Dict[str, Any]) -> List[TextContent]:
            """Handle llm_query tool."""
            try:
                prompt = args.get("prompt")
                if not prompt:
                    return _text_response("Error: Missing required argument 'prompt'")

                # Honour the configured default instead of hard-coding one.
                provider = args.get("provider") or self.llm.default

                # Get recent context
                context = json.dumps(self.memory.get_context(5), indent=2)

                # Query LLM
                response = await self.llm.query(prompt, provider, context=context)

                # Store response in memory
                timestamp = datetime.now().isoformat()
                self.memory.remember(
                    f"{provider}_{timestamp}",
                    response,
                    "llm"
                )

                return _text_response(response)

            except Exception as e:
                logger.error(f"Error in llm_query: {e}", exc_info=True)
                return _text_response(f"Error: LLM query failed: {str(e)}")

        async def handle_remember(args: Dict[str, Any]) -> List[TextContent]:
            """Handle remember tool."""
            try:
                key = args.get("key")
                value = args.get("value")

                if not key or value is None:
                    return _text_response(
                        "Error: Missing required arguments 'key' or 'value'"
                    )

                category = args.get("category", "general")
                self.memory.remember(key, value, category)

                return _text_response(
                    f"Successfully stored memory: category={category}, key={key}"
                )

            except Exception as e:
                logger.error(f"Error in remember: {e}", exc_info=True)
                return _text_response(f"Error: Failed to store memory: {str(e)}")

        async def handle_recall(args: Dict[str, Any]) -> List[TextContent]:
            """Handle recall tool."""
            try:
                key = args.get("key")
                category = args.get("category")

                data = self.memory.recall(key, category)
                return _text_response(json.dumps(data, indent=2))

            except Exception as e:
                logger.error(f"Error in recall: {e}", exc_info=True)
                return _text_response(f"Error: Failed to recall memory: {str(e)}")

        async def handle_get_recent_changes(args: Dict[str, Any]) -> List[TextContent]:
            """Handle get_recent_changes tool."""
            try:
                count = args.get("count", 10)
                if isinstance(count, bool) or not isinstance(count, int) or count < 0:
                    return _text_response(
                        "Error: 'count' must be a non-negative integer"
                    )

                # count == 0 must yield nothing; [-0:] would return everything.
                changes = self.recent_changes[-count:] if count else []

                result = {
                    "changes": changes,
                    "count": len(changes)
                }
                return _text_response(json.dumps(result, indent=2))

            except Exception as e:
                logger.error(f"Error in get_recent_changes: {e}", exc_info=True)
                return _text_response(
                    f"Error: Failed to get recent changes: {str(e)}"
                )

        # Register tool handlers
        tool_handlers: Dict[str, Callable[[Dict[str, Any]], Awaitable[List[TextContent]]]] = {
            "read_file": handle_read_file,
            "list_files": handle_list_files,
            "search_in_files": handle_search_in_files,
            "llm_query": handle_llm_query,
            "remember": handle_remember,
            "recall": handle_recall,
            "get_recent_changes": handle_get_recent_changes,
        }

        @self.server.call_tool()
        async def handle_tool_call(
            tool_name: str,
            arguments: Optional[Dict[str, Any]]
        ) -> List[TextContent]:
            """Route tool calls to appropriate handlers."""
            handler = tool_handlers.get(tool_name)
            if not handler:
                return _text_response(f"Error: Unknown tool '{tool_name}'")

            return await handler(arguments or {})

    async def run(self) -> None:
        """Run the MCP server over stdio until it finishes or fails."""
        logger.info("MCP Server starting...")

        try:
            async with stdio_server() as (read, write):
                await self.server.run(
                    read,
                    write,
                    self.server.create_initialization_options()
                )
        except Exception as e:
            logger.error(f"Server error: {e}", exc_info=True)
            raise
        finally:
            self.stop()

    def stop(self) -> None:
        """Stop the server and cleanup resources.

        Safe to call more than once (the signal handler and ``run`` both call
        it); only the first call does any work.
        """
        if self._stopped:
            return
        self._stopped = True

        logger.info("Shutting down MCP server...")

        if self.observer:
            try:
                self.observer.stop()
                self.observer.join(timeout=5)
            except Exception as e:
                logger.error(f"Error stopping file watcher: {e}")
            finally:
                self.observer = None

        logger.info("MCP server stopped")


# ==================== ENTRY POINT ====================

if __name__ == "__main__":
    import sys

    config_path = sys.argv[1] if len(sys.argv) > 1 else "fgd_config.yaml"

    try:
        server = FGDMCPServer(config_path)
        asyncio.run(server.run())
    except KeyboardInterrupt:
        logger.info("Received keyboard interrupt")
    except Exception as e:
        logger.error(f"Fatal error: {e}", exc_info=True)
        sys.exit(1)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/mikeychann-hash/MCPM'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.