#!/usr/bin/env python3
"""
FGD Stack MCP Server – Production-Ready with Complete LLM Support
Features:
- Pydantic config validation
- Complete Claude API support
- Rate limiting for LLM queries
- Graceful shutdown handling
- Comprehensive type hints
- Hardened path security
- Detailed error messages
"""
import os
import json
import logging
import signal
import asyncio
from pathlib import Path
from datetime import datetime, timedelta
from typing import Dict, Any, Callable, Awaitable, List, Optional
from collections import deque
import yaml
import aiohttp
from pydantic import BaseModel, Field, validator
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
# MCP SDK imports
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# ==================== PYDANTIC MODELS ====================
class ScanConfig(BaseModel):
    """Configuration for file scanning limits."""
    max_dir_size_gb: int = Field(default=2, ge=1, le=10)
    max_files_per_scan: int = Field(default=5, ge=1, le=100)
    max_file_size_kb: int = Field(default=250, ge=1, le=10000)
class ProviderConfig(BaseModel):
    """Configuration for individual LLM provider."""
    model: str
    base_url: str
class LLMConfig(BaseModel):
    """Configuration for LLM providers."""
    default_provider: str = Field(default="grok")
    providers: Dict[str, ProviderConfig]
    @validator('default_provider')
    def validate_default_provider(cls, v):
        """Ensure default provider is grok for reliability."""
        if v not in ['grok', 'openai', 'claude', 'ollama']:
            logger.warning(f"Invalid default_provider '{v}', falling back to 'grok'")
            return 'grok'
        return v
class ServerConfig(BaseModel):
    """Main server configuration with validation."""
    watch_dir: str
    memory_file: str = Field(default=".fgd_memory.json")
    log_file: str = Field(default="fgd_server.log")
    context_limit: int = Field(default=20, ge=5, le=100)
    scan: ScanConfig = Field(default_factory=ScanConfig)
    llm: LLMConfig
    @validator('watch_dir')
    def validate_watch_dir(cls, v):
        """Validate watch directory exists."""
        path = Path(v)
        if not path.exists():
            raise ValueError(f"watch_dir does not exist: {v}")
        if not path.is_dir():
            raise ValueError(f"watch_dir is not a directory: {v}")
        return str(path.resolve())
# ==================== RATE LIMITER ====================
class RateLimiter:
    """Token bucket rate limiter for LLM queries."""
    def __init__(self, max_requests: int = 10, window_seconds: int = 60):
        """
        Initialize rate limiter.
        Args:
            max_requests: Maximum requests allowed in time window
            window_seconds: Time window in seconds
        """
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests: deque = deque()
    async def acquire(self) -> bool:
        """
        Attempt to acquire permission for a request.
        Returns:
            True if request is allowed, False if rate limited
        """
        now = datetime.now()
        cutoff = now - timedelta(seconds=self.window_seconds)
        # Remove old requests outside the window
        while self.requests and self.requests[0] < cutoff:
            self.requests.popleft()
        # Check if we're at the limit
        if len(self.requests) >= self.max_requests:
            return False
        # Add this request
        self.requests.append(now)
        return True
    def get_wait_time(self) -> float:
        """
        Get time to wait before next request is allowed.
        Returns:
            Seconds to wait, or 0 if request would be allowed now
        """
        if len(self.requests) < self.max_requests:
            return 0.0
        oldest = self.requests[0]
        cutoff = oldest + timedelta(seconds=self.window_seconds)
        wait = (cutoff - datetime.now()).total_seconds()
        return max(0.0, wait)
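# Minimal usage sketch (hypothetical values): with max_requests=2 and
# window_seconds=60, the third acquire() within a minute is refused and
# get_wait_time() reports how long until the oldest request ages out:
#
#   limiter = RateLimiter(max_requests=2, window_seconds=60)
#   await limiter.acquire()   # True
#   await limiter.acquire()   # True
#   await limiter.acquire()   # False -> rate limited
#   limiter.get_wait_time()   # ~60.0 seconds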
# ==================== MEMORY STORE ====================
class MemoryStore:
    """Persistent memory storage with categorization and access tracking."""
    def __init__(self, memory_file: Path, config: ServerConfig):
        """
        Initialize memory store.
        Args:
            memory_file: Path to JSON memory file
            config: Server configuration
        """
        self.memory_file = memory_file
        self.memories: Dict[str, Dict[str, Any]] = self._load()
        self.context: List[Dict[str, Any]] = []
        self.limit = config.context_limit
    def _load(self) -> Dict[str, Dict[str, Any]]:
        """
        Load memories from disk.
        Returns:
            Dictionary of categorized memories
        """
        if self.memory_file.exists():
            try:
                content = self.memory_file.read_text(encoding='utf-8')
                return json.loads(content)
            except json.JSONDecodeError as e:
                logger.error(f"Failed to parse memory file: {e}")
                return {}
            except IOError as e:
                logger.error(f"Failed to read memory file: {e}")
                return {}
        return {}
    def _save(self) -> None:
        """Save memories to disk."""
        try:
            self.memory_file.write_text(
                json.dumps(self.memories, indent=2),
                encoding='utf-8'
            )
        except IOError as e:
            logger.error(f"Failed to save memory file: {e}")
    def remember(self, key: str, value: Any, category: str = "general") -> None:
        """
        Store a memory.
        Args:
            key: Memory key
            value: Memory value (must be JSON serializable)
            category: Memory category for organization
        """
        if category not in self.memories:
            self.memories[category] = {}
        self.memories[category][key] = {
            "value": value,
            "timestamp": datetime.now().isoformat(),
            "access_count": 0
        }
        self._save()
        logger.debug(f"Stored memory: category={category}, key={key}")
    def recall(
        self,
        key: Optional[str] = None,
        category: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Retrieve memories.
        Args:
            key: Specific memory key (optional)
            category: Memory category (optional)
        Returns:
            Matching memories
        """
        if key and category:
            if category in self.memories and key in self.memories[category]:
                self.memories[category][key]["access_count"] += 1
                self._save()
                return {key: self.memories[category][key]}
            return {}
        if category:
            return self.memories.get(category, {})
        if key:
            # Key without a category: search every category for that key
            # instead of silently returning the entire store
            return {
                cat: {key: entries[key]}
                for cat, entries in self.memories.items()
                if key in entries
            }
        return self.memories
    def add_context(self, type_: str, data: Any) -> None:
        """
        Add item to rolling context window.
        Args:
            type_: Context type (e.g., 'file_change', 'file_read')
            data: Context data
        """
        self.context.append({
            "type": type_,
            "data": data,
            "timestamp": datetime.now().isoformat()
        })
        # Maintain rolling window
        if len(self.context) > self.limit:
            self.context = self.context[-self.limit:]
    def get_context(self, count: int = 5) -> List[Dict[str, Any]]:
        """
        Get recent context items.
        Args:
            count: Number of recent items to return
        Returns:
            List of recent context items
        """
        return self.context[-count:] if self.context else []
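# The context is a simple rolling window: with context_limit=3 (hypothetical),
# appending items A, B, C, D leaves [B, C, D]; get_context(2) returns [C, D].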
# ==================== LLM BACKEND ====================
class LLMBackend:
    """Multi-provider LLM backend with complete API support."""
    def __init__(self, config: ServerConfig, rate_limiter: RateLimiter):
        """
        Initialize LLM backend.
        Args:
            config: Server configuration
            rate_limiter: Rate limiter instance
        """
        self.config = config.llm
        self.default = config.llm.default_provider
        self.rate_limiter = rate_limiter
        self.timeout = aiohttp.ClientTimeout(total=60)
    async def query(
        self,
        prompt: str,
        provider: Optional[str] = None,
        model: Optional[str] = None,
        context: str = ""
    ) -> str:
        """
        Query an LLM provider with rate limiting.
        Args:
            prompt: User prompt
            provider: LLM provider name (defaults to config)
            model: Model name override (optional)
            context: Additional context to prepend
        Returns:
            LLM response text
        """
        # Check rate limit
        if not await self.rate_limiter.acquire():
            wait_time = self.rate_limiter.get_wait_time()
            return f"Error: Rate limit exceeded. Please wait {wait_time:.1f} seconds."
        provider = provider or self.default
        conf = self.config.providers.get(provider)
        if not conf:
            return f"Error: Provider '{provider}' not configured"
        # Build full prompt with context
        full_prompt = f"{context}\n\n{prompt}" if context else prompt
        model = model or conf.model
        base_url = conf.base_url
        try:
            if provider == "grok":
                return await self._query_grok(base_url, model, full_prompt)
            elif provider == "openai":
                return await self._query_openai(base_url, model, full_prompt)
            elif provider == "claude":
                return await self._query_claude(base_url, model, full_prompt)
            elif provider == "ollama":
                return await self._query_ollama(base_url, model, full_prompt)
            else:
                return f"Error: Provider '{provider}' not implemented"
        except asyncio.TimeoutError:
            logger.error(f"{provider} request timed out")
            return f"Error: {provider} request timed out after 60 seconds"
        except Exception as e:
            logger.error(f"{provider} query failed: {e}", exc_info=True)
            return f"Error: {provider} query failed: {str(e)}"
    async def _query_grok(self, base_url: str, model: str, prompt: str) -> str:
        """Query Grok (X.AI) API."""
        api_key = os.getenv("XAI_API_KEY")
        if not api_key:
            return "Error: XAI_API_KEY environment variable not set"
        headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
        data = {
            "model": model,
            "messages": [{"role": "user", "content": prompt}]
        }
        async with aiohttp.ClientSession(timeout=self.timeout) as session:
            async with session.post(
                f"{base_url}/chat/completions",
                json=data,
                headers=headers
            ) as response:
                if response.status != 200:
                    error_text = await response.text()
                    return f"Grok API Error {response.status}: {error_text}"
                result = await response.json()
                return result['choices'][0]['message']['content']
    async def _query_openai(self, base_url: str, model: str, prompt: str) -> str:
        """Query OpenAI API."""
        api_key = os.getenv("OPENAI_API_KEY")
        if not api_key:
            return "Error: OPENAI_API_KEY environment variable not set"
        headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
        data = {
            "model": model,
            "messages": [{"role": "user", "content": prompt}]
        }
        async with aiohttp.ClientSession(timeout=self.timeout) as session:
            async with session.post(
                f"{base_url}/chat/completions",
                json=data,
                headers=headers
            ) as response:
                if response.status != 200:
                    error_text = await response.text()
                    return f"OpenAI API Error {response.status}: {error_text}"
                result = await response.json()
                return result['choices'][0]['message']['content']
    async def _query_claude(self, base_url: str, model: str, prompt: str) -> str:
        """Query Claude (Anthropic) API."""
        api_key = os.getenv("ANTHROPIC_API_KEY")
        if not api_key:
            return "Error: ANTHROPIC_API_KEY environment variable not set"
        headers = {
            "x-api-key": api_key,
            "anthropic-version": "2023-06-01",
            "Content-Type": "application/json"
        }
        data = {
            "model": model,
            "max_tokens": 4096,
            "messages": [{"role": "user", "content": prompt}]
        }
        async with aiohttp.ClientSession(timeout=self.timeout) as session:
            async with session.post(
                f"{base_url}/messages",
                json=data,
                headers=headers
            ) as response:
                if response.status != 200:
                    error_text = await response.text()
                    return f"Claude API Error {response.status}: {error_text}"
                result = await response.json()
                # Claude API returns content in a different format
                return result['content'][0]['text']
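    # Response shapes differ between the two API families (abridged):
    #   OpenAI-compatible (grok/openai/ollama):
    #     {"choices": [{"message": {"content": "..."}}]}
    #   Anthropic Messages API:
    #     {"content": [{"type": "text", "text": "..."}]}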
    async def _query_ollama(self, base_url: str, model: str, prompt: str) -> str:
        """Query Ollama (local) API."""
        data = {
            "model": model,
            "messages": [{"role": "user", "content": prompt}],
            "stream": False
        }
        try:
            async with aiohttp.ClientSession(timeout=self.timeout) as session:
                async with session.post(
                    f"{base_url}/chat/completions",
                    json=data
                ) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        return f"Ollama API Error {response.status}: {error_text}"
                    result = await response.json()
                    return result['choices'][0]['message']['content']
        except aiohttp.ClientConnectorError:
            return (
                f"Error: Cannot connect to Ollama at {base_url}. "
                "Ensure the Ollama server is running (default: http://localhost:11434)."
            )
# ==================== FILE WATCHER ====================
class FileChangeHandler(FileSystemEventHandler):
    """Handler for file system events."""
    def __init__(self, callback: Callable[[str, str], None]):
        """
        Initialize handler.
        Args:
            callback: Function to call on file changes (event_type, path)
        """
        super().__init__()
        self.callback = callback
    def on_modified(self, event):
        """Handle file modification."""
        if not event.is_directory:
            self.callback('modified', event.src_path)
    def on_created(self, event):
        """Handle file creation."""
        if not event.is_directory:
            self.callback('created', event.src_path)
    def on_deleted(self, event):
        """Handle file deletion."""
        if not event.is_directory:
            self.callback('deleted', event.src_path)
# ==================== MAIN SERVER ====================
class FGDMCPServer:
    """Main MCP server with production-ready features."""
    def __init__(self, config_path: str):
        """
        Initialize MCP server.
        Args:
            config_path: Path to YAML configuration file
        """
        # Load and validate config
        with open(config_path, 'r', encoding='utf-8') as f:
            # safe_load returns None for an empty file; fall back to {} so
            # Pydantic reports the missing fields instead of a TypeError
            config_dict = yaml.safe_load(f) or {}
        self.config = ServerConfig(**config_dict)
        self.watch_dir = Path(self.config.watch_dir).resolve()
        # Initialize components
        self.rate_limiter = RateLimiter(max_requests=10, window_seconds=60)
        self.memory = MemoryStore(
            self.watch_dir / self.config.memory_file,
            self.config
        )
        self.llm = LLMBackend(self.config, self.rate_limiter)
        self.recent_changes: List[Dict[str, Any]] = []
        self.observer: Optional[Observer] = None
        # Setup logging
        self.log_file = self._resolve_log_file()
        self._ensure_log_handler()
        logger.info(f"FGD MCP Server initialized - Logging to {self.log_file}")
        logger.info(f"Watching directory: {self.watch_dir}")
        logger.info(f"Default LLM provider: {self.config.llm.default_provider}")
        # Start file watcher
        self._start_watcher()
        # Initialize MCP server
        self.server = Server("fgd-mcp-server")
        self._setup_handlers()
        # Setup shutdown handlers
        self._setup_shutdown_handlers()
    def _resolve_log_file(self) -> Path:
        """
        Resolve log file path.
        Returns:
            Absolute path to log file
        """
        configured = self.config.log_file
        candidate = Path(configured)
        if not candidate.is_absolute():
            candidate = (self.watch_dir / candidate).resolve()
        candidate.parent.mkdir(parents=True, exist_ok=True)
        return candidate
    def _ensure_log_handler(self) -> None:
        """Ensure file logging handler is configured."""
        # Check if handler already exists
        for handler in logger.handlers:
            if isinstance(handler, logging.FileHandler):
                if Path(handler.baseFilename) == self.log_file:
                    return
        # Add file handler
        file_handler = logging.FileHandler(self.log_file, encoding='utf-8')
        file_handler.setFormatter(
            logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        )
        logger.addHandler(file_handler)
    def _start_watcher(self) -> None:
        """Start file system watcher."""
        try:
            handler = FileChangeHandler(self._on_file_change)
            self.observer = Observer()
            self.observer.schedule(handler, str(self.watch_dir), recursive=True)
            self.observer.start()
            logger.info("File watcher started successfully")
        except Exception as e:
            logger.warning(f"File watcher failed to start: {e}")
            self.observer = None
    def _on_file_change(self, event_type: str, path: str) -> None:
        """
        Handle file system change events.
        Args:
            event_type: Type of event (created, modified, deleted)
            path: Absolute path to changed file
        """
        try:
            path_obj = Path(path)
            rel_path = str(path_obj.relative_to(self.watch_dir))
            change_record = {
                "type": event_type,
                "path": rel_path,
                "timestamp": datetime.now().isoformat()
            }
            self.recent_changes.append(change_record)
            # Keep only last 50 changes
            if len(self.recent_changes) > 50:
                self.recent_changes = self.recent_changes[-50:]
            # Add to context
            self.memory.add_context("file_change", {
                "type": event_type,
                "path": rel_path
            })
            logger.debug(f"File {event_type}: {rel_path}")
        except ValueError:
            # Path is outside watch directory, ignore
            pass
        except Exception as e:
            logger.error(f"Error processing file change: {e}")
    def _sanitize_path(self, relative_path: str) -> Path:
        """
        Sanitize and validate a relative path.
        Args:
            relative_path: User-provided relative path
        Returns:
            Validated absolute Path object
        Raises:
            ValueError: If path is invalid or outside watch directory
        """
        # Normalize the path to prevent traversal attacks
        normalized = os.path.normpath(relative_path)
        # Check for path traversal attempts
        if normalized.startswith('..') or os.path.isabs(normalized):
            raise ValueError(
                f"Invalid path '{relative_path}': "
                "Path must be relative and within watch directory"
            )
        # Resolve full path
        full_path = (self.watch_dir / normalized).resolve()
        # Ensure path is within watch directory
        try:
            full_path.relative_to(self.watch_dir)
        except ValueError:
            raise ValueError(
                f"Path traversal blocked: '{relative_path}' "
                f"resolves outside watch directory"
            )
        return full_path
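    # How _sanitize_path behaves (paths are illustrative):
    #   "src/main.py"     -> <watch_dir>/src/main.py
    #   "a/../b.txt"      -> <watch_dir>/b.txt (normalized, still inside)
    #   "../outside.txt"  -> ValueError (starts with '..')
    #   "/etc/passwd"     -> ValueError (absolute path)
    #   a symlink escaping the watch dir -> ValueError (resolve() + relative_to)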
    def _setup_shutdown_handlers(self) -> None:
        """Setup graceful shutdown signal handlers."""
        def signal_handler(signum, frame):
            logger.info(f"Received signal {signum}, initiating graceful shutdown...")
            self.stop()
        signal.signal(signal.SIGINT, signal_handler)
        signal.signal(signal.SIGTERM, signal_handler)
    def _setup_handlers(self) -> None:
        """Setup MCP tool handlers."""
        @self.server.list_tools()
        async def list_tools() -> List[Tool]:
            """List all available MCP tools."""
            return [
                Tool(
                    name="read_file",
                    description="Read contents of a file in the watched directory",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "filepath": {
                                "type": "string",
                                "description": "Relative path to file"
                            }
                        },
                        "required": ["filepath"],
                    },
                ),
                Tool(
                    name="list_files",
                    description="List files matching a glob pattern",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "pattern": {
                                "type": "string",
                                "description": "Glob pattern (e.g., '**/*.py')",
                                "default": "**/*"
                            }
                        },
                    },
                ),
                Tool(
                    name="search_in_files",
                    description="Search for text across files",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "query": {
                                "type": "string",
                                "description": "Text to search for"
                            },
                            "pattern": {
                                "type": "string",
                                "description": "Glob pattern for files to search",
                                "default": "**/*"
                            }
                        },
                        "required": ["query"],
                    },
                ),
                Tool(
                    name="llm_query",
                    description="Query an LLM with automatic context injection (defaults to Grok)",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "prompt": {
                                "type": "string",
                                "description": "Prompt for the LLM"
                            },
                            "provider": {
                                "type": "string",
                                "description": "LLM provider (grok, openai, claude, ollama)",
                                "default": "grok",
                                "enum": ["grok", "openai", "claude", "ollama"]
                            }
                        },
                        "required": ["prompt"],
                    },
                ),
                Tool(
                    name="remember",
                    description="Store information in persistent memory",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "key": {
                                "type": "string",
                                "description": "Memory key"
                            },
                            "value": {
                                "type": "string",
                                "description": "Memory value"
                            },
                            "category": {
                                "type": "string",
                                "description": "Memory category",
                                "default": "general"
                            }
                        },
                        "required": ["key", "value"],
                    },
                ),
                Tool(
                    name="recall",
                    description="Retrieve stored memories",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "key": {
                                "type": "string",
                                "description": "Specific memory key (optional)"
                            },
                            "category": {
                                "type": "string",
                                "description": "Memory category (optional)"
                            }
                        },
                    },
                ),
                Tool(
                    name="get_recent_changes",
                    description="Get list of recent file changes",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "count": {
                                "type": "integer",
                                "description": "Number of changes to return",
                                "default": 10
                            }
                        },
                    },
                ),
            ]
        async def handle_read_file(args: Dict[str, Any]) -> List[TextContent]:
            """Handle read_file tool."""
            try:
                filepath = args.get("filepath")
                if not filepath:
                    return [TextContent(
                        type="text",
                        text="Error: Missing required argument 'filepath'"
                    )]
                path = self._sanitize_path(filepath)
                if not path.exists():
                    return [TextContent(
                        type="text",
                        text=f"Error: File not found: {filepath}"
                    )]
                if not path.is_file():
                    return [TextContent(
                        type="text",
                        text=f"Error: Path is not a file: {filepath}"
                    )]
                # Check file size
                max_bytes = self.config.scan.max_file_size_kb * 1024
                file_size = path.stat().st_size
                if file_size > max_bytes:
                    return [TextContent(
                        type="text",
                        text=f"Error: File too large ({file_size} bytes, limit {max_bytes} bytes)"
                    )]
                # Read file
                content = path.read_text(encoding='utf-8')
                self.memory.add_context("file_read", {"path": filepath})
                return [TextContent(type="text", text=content)]
            except ValueError as e:
                return [TextContent(type="text", text=f"Error: {str(e)}")]
            except UnicodeDecodeError:
                return [TextContent(
                    type="text",
                    text=f"Error: File is not valid UTF-8 text: {filepath}"
                )]
            except IOError as e:
                return [TextContent(
                    type="text",
                    text=f"Error: Failed to read file '{filepath}': {str(e)}"
                )]
            except Exception as e:
                logger.error(f"Unexpected error in read_file: {e}", exc_info=True)
                return [TextContent(
                    type="text",
                    text=f"Error: Unexpected error reading file: {str(e)}"
                )]
        async def handle_list_files(args: Dict[str, Any]) -> List[TextContent]:
            """Handle list_files tool."""
            try:
                pattern = args.get("pattern", "**/*")
                max_files = self.config.scan.max_files_per_scan
                files = []
                for path in self.watch_dir.glob(pattern):
                    if not path.is_file():
                        continue
                    try:
                        rel_path = path.relative_to(self.watch_dir)
                        files.append(str(rel_path))
                    except ValueError:
                        # Path outside watch directory, skip
                        continue
                    if len(files) >= max_files:
                        break
                result = {
                    "files": files,
                    "count": len(files),
                    "truncated": len(files) >= max_files
                }
                return [TextContent(
                    type="text",
                    text=json.dumps(result, indent=2)
                )]
            except Exception as e:
                logger.error(f"Error in list_files: {e}", exc_info=True)
                return [TextContent(
                    type="text",
                    text=f"Error: Failed to list files: {str(e)}"
                )]
        async def handle_search_in_files(args: Dict[str, Any]) -> List[TextContent]:
            """Handle search_in_files tool."""
            try:
                query = args.get("query")
                if not query:
                    return [TextContent(
                        type="text",
                        text="Error: Missing required argument 'query'"
                    )]
                pattern = args.get("pattern", "**/*")
                max_files = self.config.scan.max_files_per_scan
                max_bytes = self.config.scan.max_file_size_kb * 1024
                matches = []
                files_searched = 0
                for path in self.watch_dir.glob(pattern):
                    if files_searched >= max_files:
                        break
                    if not path.is_file():
                        continue
                    try:
                        # Skip files that are too large
                        if path.stat().st_size > max_bytes:
                            continue
                        content = path.read_text(encoding='utf-8').lower()
                        if query.lower() in content:
                            rel_path = path.relative_to(self.watch_dir)
                            matches.append({"filepath": str(rel_path)})
                        files_searched += 1
                    except (UnicodeDecodeError, IOError):
                        # Skip files that can't be read
                        continue
                    except ValueError:
                        # Path outside watch directory
                        continue
                result = {
                    "query": query,
                    "matches": matches,
                    "files_searched": files_searched,
                    "truncated": files_searched >= max_files
                }
                return [TextContent(
                    type="text",
                    text=json.dumps(result, indent=2)
                )]
            except Exception as e:
                logger.error(f"Error in search_in_files: {e}", exc_info=True)
                return [TextContent(
                    type="text",
                    text=f"Error: Failed to search files: {str(e)}"
                )]
        async def handle_llm_query(args: Dict[str, Any]) -> List[TextContent]:
            """Handle llm_query tool."""
            try:
                prompt = args.get("prompt")
                if not prompt:
                    return [TextContent(
                        type="text",
                        text="Error: Missing required argument 'prompt'"
                    )]
                # Default to grok for reliability
                provider = args.get("provider", "grok")
                # Get recent context
                context = json.dumps(self.memory.get_context(5), indent=2)
                # Query LLM
                response = await self.llm.query(prompt, provider, context=context)
                # Store response in memory
                timestamp = datetime.now().isoformat()
                self.memory.remember(
                    f"{provider}_{timestamp}",
                    response,
                    "llm"
                )
                return [TextContent(type="text", text=response)]
            except Exception as e:
                logger.error(f"Error in llm_query: {e}", exc_info=True)
                return [TextContent(
                    type="text",
                    text=f"Error: LLM query failed: {str(e)}"
                )]
        async def handle_remember(args: Dict[str, Any]) -> List[TextContent]:
            """Handle remember tool."""
            try:
                key = args.get("key")
                value = args.get("value")
                if not key or value is None:
                    return [TextContent(
                        type="text",
                        text="Error: Missing required arguments 'key' or 'value'"
                    )]
                category = args.get("category", "general")
                self.memory.remember(key, value, category)
                return [TextContent(
                    type="text",
                    text=f"Successfully stored memory: category={category}, key={key}"
                )]
            except Exception as e:
                logger.error(f"Error in remember: {e}", exc_info=True)
                return [TextContent(
                    type="text",
                    text=f"Error: Failed to store memory: {str(e)}"
                )]
        async def handle_recall(args: Dict[str, Any]) -> List[TextContent]:
            """Handle recall tool."""
            try:
                key = args.get("key")
                category = args.get("category")
                data = self.memory.recall(key, category)
                return [TextContent(
                    type="text",
                    text=json.dumps(data, indent=2)
                )]
            except Exception as e:
                logger.error(f"Error in recall: {e}", exc_info=True)
                return [TextContent(
                    type="text",
                    text=f"Error: Failed to recall memory: {str(e)}"
                )]
        async def handle_get_recent_changes(args: Dict[str, Any]) -> List[TextContent]:
            """Handle get_recent_changes tool."""
            try:
                count = args.get("count", 10)
                changes = self.recent_changes[-count:] if self.recent_changes else []
                result = {
                    "changes": changes,
                    "count": len(changes)
                }
                return [TextContent(
                    type="text",
                    text=json.dumps(result, indent=2)
                )]
            except Exception as e:
                logger.error(f"Error in get_recent_changes: {e}", exc_info=True)
                return [TextContent(
                    type="text",
                    text=f"Error: Failed to get recent changes: {str(e)}"
                )]
        # Register tool handlers
        tool_handlers: Dict[str, Callable[[Dict[str, Any]], Awaitable[List[TextContent]]]] = {
            "read_file": handle_read_file,
            "list_files": handle_list_files,
            "search_in_files": handle_search_in_files,
            "llm_query": handle_llm_query,
            "remember": handle_remember,
            "recall": handle_recall,
            "get_recent_changes": handle_get_recent_changes,
        }
        @self.server.call_tool()
        async def handle_tool_call(
            tool_name: str,
            arguments: Optional[Dict[str, Any]]
        ) -> List[TextContent]:
            """Route tool calls to appropriate handlers."""
            handler = tool_handlers.get(tool_name)
            if not handler:
                return [TextContent(
                    type="text",
                    text=f"Error: Unknown tool '{tool_name}'"
                )]
            return await handler(arguments or {})
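        # Example round trip (illustrative): a client calling
        #   call_tool("read_file", {"filepath": "src/main.py"})
        # is routed to handle_read_file above, which returns a single
        # TextContent item holding the file body or an "Error: ..." string.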
    async def run(self) -> None:
        """Run the MCP server."""
        logger.info("MCP Server starting...")
        try:
            async with stdio_server() as (read, write):
                await self.server.run(
                    read,
                    write,
                    self.server.create_initialization_options()
                )
        except Exception as e:
            logger.error(f"Server error: {e}", exc_info=True)
            raise
        finally:
            self.stop()
    def stop(self) -> None:
        """Stop the server and cleanup resources."""
        logger.info("Shutting down MCP server...")
        if self.observer:
            try:
                self.observer.stop()
                self.observer.join(timeout=5)
            except Exception as e:
                logger.error(f"Error stopping file watcher: {e}")
            finally:
                self.observer = None
        logger.info("MCP server stopped")
# ==================== ENTRY POINT ====================
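# Run with an optional config path (defaults to fgd_config.yaml):
#   python mcp_backend.py [config.yaml]
# Hosted providers read their API keys from the environment:
#   XAI_API_KEY (grok), OPENAI_API_KEY (openai), ANTHROPIC_API_KEY (claude);
# ollama needs no key, only a running local server.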
if __name__ == "__main__":
    import sys
    config_path = sys.argv[1] if len(sys.argv) > 1 else "fgd_config.yaml"
    try:
        server = FGDMCPServer(config_path)
        asyncio.run(server.run())
    except KeyboardInterrupt:
        logger.info("Received keyboard interrupt")
    except Exception as e:
        logger.error(f"Fatal error: {e}", exc_info=True)
        sys.exit(1)