Skip to main content
Glama
mcp_backend.pyβ€’64.2 kB
#!/usr/bin/env python3 """ MCPM v5.0 – Full Filesystem Co‑Pilot * Atomic writes + .bak backups * .gitignore filtering * Reference project support * Rich metadata on reads * Git diff / commit / log """ import os import json import hashlib import logging import platform from pathlib import Path, PureWindowsPath from datetime import datetime from typing import List, Dict, Any, Optional import asyncio import yaml import aiohttp import openai import shutil import subprocess from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler import traceback from dotenv import load_dotenv import time import uuid # Cross-platform file locking try: import fcntl # Unix/Linux/Mac HAS_FCNTL = True except ImportError: HAS_FCNTL = False try: import msvcrt # Windows HAS_MSVCRT = True except ImportError: HAS_MSVCRT = False from mcp.server import Server from mcp.server.stdio import stdio_server from mcp.types import Tool, TextContent from pydantic import ValidationError logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # Ensure environment variables from a local .env are available when running the backend directly load_dotenv() # --------------------------------------------------------------------------- # # -------------------------- HELPER CLASSES --------------------------------- # # --------------------------------------------------------------------------- # class FileChangeHandler(FileSystemEventHandler): def __init__(self, callback): self.callback = callback def on_modified(self, event): if not event.is_directory: self.callback('modified', event.src_path) def on_created(self, event): if not event.is_directory: self.callback('created', event.src_path) def on_deleted(self, event): if not event.is_directory: self.callback('deleted', event.src_path) class FileLock: """Cross-platform file locking context manager.""" def __init__(self, file_path, timeout=10): self.file_path = Path(file_path) self.lock_file = self.file_path.with_suffix('.lock') self.timeout = timeout self.lock_fd = None def __enter__(self): """Acquire lock with timeout.""" start_time = time.time() while True: try: # Create lock file self.lock_fd = open(self.lock_file, 'w') if HAS_FCNTL: # Unix/Linux/Mac: Use fcntl fcntl.flock(self.lock_fd.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) elif HAS_MSVCRT: # Windows: Use msvcrt msvcrt.locking(self.lock_fd.fileno(), msvcrt.LK_NBLCK, 1) else: # No locking available - log warning logger.warning("File locking not available on this platform") return self except (IOError, OSError) as e: # Lock is held by another process if self.lock_fd: self.lock_fd.close() self.lock_fd = None if time.time() - start_time >= self.timeout: raise TimeoutError(f"Could not acquire lock on {self.lock_file} after {self.timeout}s") # Wait a bit before retrying time.sleep(0.1) def __exit__(self, exc_type, exc_val, exc_tb): """Release lock.""" if self.lock_fd: try: if HAS_FCNTL: fcntl.flock(self.lock_fd.fileno(), fcntl.LOCK_UN) elif HAS_MSVCRT: msvcrt.locking(self.lock_fd.fileno(), msvcrt.LK_UNLCK, 1) except: pass finally: self.lock_fd.close() # Clean up lock file try: self.lock_file.unlink() except: pass class MemoryStore: def __init__(self, memory_file: Path, config: Dict): self.memory_file = memory_file self.limit = config.get('context_limit', 20) self.max_memory_entries = config.get('max_memory_entries', 1000) # P2 FIX: MEMORY-5 loaded_data = self._load() self.memories = loaded_data.get('memories', {}) if isinstance(loaded_data, dict) else loaded_data self.context = loaded_data.get('context', []) if isinstance(loaded_data, dict) else [] self._prune_if_needed() # P2 FIX: MEMORY-5 - prune on startup if needed def _load(self): if self.memory_file.exists(): try: # Use file locking to prevent reading during writes with FileLock(self.memory_file, timeout=5): data = json.loads(self.memory_file.read_text()) # Handle both old format (just memories) and new format (memories + context) if isinstance(data, dict) and 'memories' in data: return data else: # Old format - just memories dict return {'memories': data, 'context': []} except TimeoutError as e: logger.warning(f"Memory load timeout (file locked): {e}") return {'memories': {}, 'context': []} except Exception as e: logger.error(f"Memory load error: {e}") return {'memories': {}, 'context': []} return {'memories': {}, 'context': []} def _save(self): try: logger.debug(f"πŸ’Ύ Saving memory to: {self.memory_file.resolve()}") # Save both memories and context to persist all data full_data = { "memories": self.memories, "context": self.context } # Use file locking to prevent concurrent writes with FileLock(self.memory_file, timeout=10): # Atomic write: write to temp file, then rename temp_file = self.memory_file.with_suffix('.tmp') temp_file.write_text(json.dumps(full_data, indent=2)) # Set restrictive permissions (600 = owner read/write only) try: os.chmod(temp_file, 0o600) except Exception as perm_error: logger.warning(f"Could not set file permissions: {perm_error}") # Atomic rename with Windows fallback try: temp_file.replace(self.memory_file) except OSError as replace_error: # Windows sometimes locks files - try direct write as fallback try: self.memory_file.write_text(json.dumps(full_data, indent=2)) temp_file.unlink(missing_ok=True) logger.debug("Memory saved using fallback write method (Windows lock workaround)") except Exception as fallback_error: logger.error(f"Fallback write also failed: {fallback_error}") raise replace_error # Verify write succeeded if self.memory_file.exists(): size = self.memory_file.stat().st_size logger.debug(f"βœ… Memory saved: {self.memory_file.resolve()} ({size} bytes)") except Exception as e: logger.error(f"❌ Memory save error to {self.memory_file.resolve()}: {e}") # CRITICAL FIX: Re-raise exception to prevent silent data loss raise def remember(self, key, value, category="general"): if category not in self.memories: self.memories[category] = {} self.memories[category][key] = { "value": value, "timestamp": datetime.now().isoformat(), "access_count": 0 } self._prune_if_needed() # P2 FIX: MEMORY-5 - prune before saving self._save() def recall(self, key=None, category=None): if key and category and category in self.memories and key in self.memories[category]: self.memories[category][key]["access_count"] += 1 self._save() return {key: self.memories[category][key]} elif category: return self.memories.get(category, {}) return self.memories def add_context(self, type_, data): self.context.append({"type": type_, "data": data, "timestamp": datetime.now().isoformat()}) if len(self.context) > self.limit: self.context = self.context[-self.limit:] self._save() # FIX: Persist context to disk immediately def _prune_if_needed(self): """Prune old memory entries if size exceeds limit (P2 FIX: MEMORY-5).""" try: # Count total entries across all categories total_entries = sum(len(entries) for entries in self.memories.values()) if total_entries <= self.max_memory_entries: return # No pruning needed # Calculate how many to remove entries_to_remove = total_entries - self.max_memory_entries logger.info(f"Pruning {entries_to_remove} memory entries (total: {total_entries}, max: {self.max_memory_entries})") # Collect all entries with metadata for LRU sorting all_entries = [] for category, entries in self.memories.items(): for key, value in entries.items(): all_entries.append({ 'category': category, 'key': key, 'timestamp': value.get('timestamp', ''), 'access_count': value.get('access_count', 0), 'value': value }) # Sort by access_count (ascending) then timestamp (oldest first) # This implements LRU: least accessed and oldest entries are removed first all_entries.sort(key=lambda x: (x['access_count'], x['timestamp'])) # Remove the least recently used entries for i in range(entries_to_remove): entry = all_entries[i] if entry['category'] in self.memories and entry['key'] in self.memories[entry['category']]: del self.memories[entry['category']][entry['key']] # Clean up empty categories self.memories = {k: v for k, v in self.memories.items() if v} logger.info(f"βœ… Pruned {entries_to_remove} entries. New total: {sum(len(e) for e in self.memories.values())}") except Exception as e: logger.error(f"Error pruning memory: {e}") def get_context(self): return self.context class LLMBackend: def __init__(self, config: Dict): self.config = config['llm'] self.default = config['llm']['default_provider'] self._normalize_provider_config() def _normalize_provider_config(self) -> None: """Patch known-bad legacy provider settings at runtime.""" providers = self.config.get('providers', {}) grok_conf = providers.get('grok') if not grok_conf: return # Ensure the Grok provider always points at a supported model/base URL so the # connection diagnosis downstream has predictable defaults. These settings can # still be overridden explicitly in fgd_config.yaml, but we heal the most common # legacy misconfigurations here to keep the chat experience online. if not grok_conf.get('model'): logger.warning( "Grok provider missing model configuration; defaulting to 'grok-3'" ) grok_conf['model'] = 'grok-3' elif grok_conf.get('model') == 'grok-beta': logger.warning( "Grok provider configured with deprecated 'grok-beta' model; upgrading to 'grok-3'" ) grok_conf['model'] = 'grok-3' if not grok_conf.get('base_url'): logger.warning( "Grok provider missing base_url; defaulting to https://api.x.ai/v1" ) grok_conf['base_url'] = 'https://api.x.ai/v1' def _diagnose_grok_error(self, status: int, body: str, model: str, base_url: str) -> str: """Return a detailed error message for Grok failures and log diagnostics.""" try: payload = json.loads(body) except json.JSONDecodeError: payload = None error_message = None if isinstance(payload, dict): error_message = payload.get('error') if isinstance(error_message, dict): error_message = error_message.get('message') or error_message.get('error') detail = error_message or (body.strip() if body else "<no response body>") logger.error( "Grok API request failed | status=%s | model=%s | endpoint=%s/chat/completions | detail=%s", status, model, base_url, detail, ) hints = [] lowered_detail = detail.lower() if status == 401: hints.append("Verify XAI_API_KEY is set and valid.") elif status == 404 and ("model" in lowered_detail or "not found" in lowered_detail): hints.append( "Model not found. Confirm your configuration uses the supported 'grok-3' model." ) elif status == 429: hints.append("Rate limited by Grok. Wait before retrying or reduce request frequency.") elif status >= 500: hints.append("xAI service reported a server error. Retry later or check status.x.ai.") message_parts = [f"Grok API Error {status}", detail] if hints: message_parts.append(" ".join(hints)) return " - ".join(part for part in message_parts if part) @staticmethod def _extract_message_content(response: Dict[str, Any], provider: str) -> str: """Normalize chat completion responses across providers.""" try: message = response['choices'][0]['message']['content'] except (KeyError, IndexError, TypeError): logger.error( "Unexpected %s response payload: %s", provider, response, exc_info=True, ) return f"Error: Unexpected response format from {provider} API" if isinstance(message, list): segments = [] for entry in message: if isinstance(entry, dict): text = entry.get('text') if text: segments.append(text) elif isinstance(entry, str): segments.append(entry) if segments: message = "".join(segments) else: message = str(message) if not isinstance(message, str): message = str(message) return message async def _retry_request(self, func, max_retries=3, initial_delay=2): """Retry helper for network requests (P2 FIX: MCP-5).""" last_error = None for attempt in range(max_retries): try: return await func() except (aiohttp.ClientError, asyncio.TimeoutError) as e: last_error = e if attempt < max_retries - 1: delay = initial_delay * (2 ** attempt) # Exponential backoff logger.warning(f"Request failed (attempt {attempt + 1}/{max_retries}), retrying in {delay}s: {e}") await asyncio.sleep(delay) else: logger.error(f"Request failed after {max_retries} attempts: {e}") raise last_error async def query(self, prompt: str, provider: str = None, model: str = None, context: str = "") -> str: provider = provider or self.default conf = self.config['providers'].get(provider) if not conf: return f"Error: Provider '{provider}' not configured" full_prompt = f"{context}\n\n{prompt}" if context else prompt model = model or conf['model'] base_url = conf['base_url'] # P2 FIX: MCP-3 - Configurable timeouts per provider timeout_seconds = conf.get('timeout', 30) # Default 30s if not configured timeout = aiohttp.ClientTimeout(total=timeout_seconds) try: if provider == "grok": api_key = os.getenv("XAI_API_KEY") if not api_key: return "Error: XAI_API_KEY not set" headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"} data = {"model": model, "messages": [{"role": "user", "content": full_prompt}]} logger.debug( "Dispatching Grok request | model=%s | endpoint=%s/chat/completions", model, base_url, ) # P2 FIX: MCP-5 - Wrap with retry logic async def make_request(): async with aiohttp.ClientSession(timeout=timeout) as session: async with session.post(f"{base_url}/chat/completions", json=data, headers=headers) as r: if r.status != 200: txt = await r.text() return self._diagnose_grok_error(r.status, txt, model, base_url) resp = await r.json() return self._extract_message_content(resp, "grok") return await self._retry_request(make_request) elif provider == "openai": api_key = os.getenv("OPENAI_API_KEY") if not api_key: return "Error: OPENAI_API_KEY not set" headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"} data = {"model": model, "messages": [{"role": "user", "content": full_prompt}]} # P2 FIX: MCP-5 - Wrap with retry logic async def make_request(): async with aiohttp.ClientSession(timeout=timeout) as session: async with session.post(f"{base_url}/chat/completions", json=data, headers=headers) as r: if r.status != 200: txt = await r.text() return f"OpenAI API Error {r.status}: {txt}" resp = await r.json() return self._extract_message_content(resp, "openai") return await self._retry_request(make_request) elif provider == "claude": api_key = os.getenv("ANTHROPIC_API_KEY") if not api_key: return "Error: ANTHROPIC_API_KEY not set" headers = { "x-api-key": api_key, "anthropic-version": "2023-06-01", "Content-Type": "application/json" } data = { "model": model, "messages": [{"role": "user", "content": full_prompt}], "max_tokens": 4096 } # P2 FIX: MCP-5 - Wrap with retry logic async def make_request(): async with aiohttp.ClientSession(timeout=timeout) as session: async with session.post(f"{base_url}/messages", json=data, headers=headers) as r: if r.status != 200: txt = await r.text() return f"Claude API Error {r.status}: {txt}" resp = await r.json() return resp['content'][0]['text'] return await self._retry_request(make_request) elif provider == "ollama": # Ollama doesn't require an API key (local) headers = {"Content-Type": "application/json"} data = {"model": model, "messages": [{"role": "user", "content": full_prompt}]} # P2 FIX: MCP-5 - Wrap with retry logic async def make_request(): async with aiohttp.ClientSession(timeout=timeout) as session: async with session.post(f"{base_url}/chat/completions", json=data, headers=headers) as r: if r.status != 200: txt = await r.text() return f"Ollama API Error {r.status}: {txt}" resp = await r.json() return self._extract_message_content(resp, "ollama") return await self._retry_request(make_request) else: return f"Provider '{provider}' not supported." except aiohttp.ClientError as e: return f"Network error: {str(e)}" except asyncio.TimeoutError: return "Error: Request timed out" except Exception as e: logger.error(f"LLM query error: {e}", exc_info=True) return f"Error: {str(e)}" # --------------------------------------------------------------------------- # # --------------------------- MAIN SERVER ----------------------------------- # # --------------------------------------------------------------------------- # class FGDMCPServer: def __init__(self, config_path: str): with open(config_path, 'r') as f: self.config = yaml.safe_load(f) # Validate and prepare watch directory before using it watch_dir_path = self._validate_paths() self.watch_dir = self._prepare_watch_dir(watch_dir_path) self.scan = self.config.get('scan', {}) self.max_dir_size = self.scan.get('max_dir_size_gb', 2) * 1_073_741_824 self.max_files = self.scan.get('max_files_per_scan', 5) self.max_file_kb = self.scan.get('max_file_size_kb', 250) * 1024 # Reference projects (read‑only) self.ref_dirs = [Path(p).resolve() for p in self.config.get('reference_dirs', []) if Path(p).exists()] self.memory = MemoryStore(self.watch_dir / ".fgd_memory.json", self.config) self.llm = LLMBackend(self.config) # Verify provider requirements early so the server fails fast if misconfigured default_provider = self.config.get('llm', {}).get('default_provider') if default_provider == 'grok' and not os.getenv("XAI_API_KEY"): raise ValueError( "XAI_API_KEY environment variable is required when using the Grok provider" ) self.recent_changes = [] self.observer = None self._approval_task = None # Track approval monitor task self._start_watcher() self._start_approval_monitor() self.server = Server("fgd-mcp-server") self._setup_handlers() def _validate_paths(self) -> Path: """Validate the configured watch directory and return its resolved path.""" watch_dir_str = str(self.config.get('watch_dir', '')).strip() if not watch_dir_str: raise ValueError("watch_dir is not configured; filesystem tools cannot start") current_os = platform.system() windows_drive = PureWindowsPath(watch_dir_str).drive if windows_drive and current_os != 'Windows': logger.error("=" * 80) logger.error("🚨 CRITICAL PATH CONFIGURATION ERROR 🚨") logger.error("=" * 80) logger.error(f"Running on: {current_os}") logger.error(f"Config has Windows path: {watch_dir_str}") logger.error("This will cause ALL write operations to fail silently!") logger.error("Update fgd_config.yaml with the correct path for your OS.") logger.error("=" * 80) raise ValueError( "watch_dir is configured with a Windows-specific path but the current " f"platform is {current_os}. Update fgd_config.yaml with an OS-appropriate path." ) try: path = Path(watch_dir_str).expanduser() except (TypeError, OSError, RuntimeError) as exc: raise ValueError(f"Failed to interpret watch_dir '{watch_dir_str}': {exc}") from exc try: resolved = path.resolve(strict=False) except (OSError, RuntimeError) as exc: raise ValueError(f"Failed to resolve watch_dir '{watch_dir_str}': {exc}") from exc if resolved.exists() and not resolved.is_dir(): raise ValueError( f"watch_dir '{resolved}' exists but is not a directory; update fgd_config.yaml" ) return resolved def _prepare_watch_dir(self, path: Path) -> Path: """Ensure the watch directory exists, is a directory, and is accessible.""" if not path.exists(): try: logger.warning( f"watch_dir '{path}' does not exist. Creating directory before startup." ) path.mkdir(parents=True, exist_ok=True) except Exception as exc: raise ValueError(f"Failed to create watch_dir '{path}': {exc}") from exc if not path.is_dir(): raise ValueError(f"watch_dir '{path}' is not a directory; update fgd_config.yaml") if not os.access(path, os.R_OK | os.W_OK): raise ValueError( f"watch_dir '{path}' must be readable and writable by the MCP server" ) return path # ------------------------------------------------------------------- # # -------------------------- WATCHER -------------------------------- # # ------------------------------------------------------------------- # def _start_watcher(self): try: handler = FileChangeHandler(self._on_file_change) self.observer = Observer() self.observer.schedule(handler, str(self.watch_dir), recursive=True) self.observer.start() logger.info("File watcher started") except Exception as e: logger.warning(f"File watcher failed: {e}") def _start_approval_monitor(self): """Will start background task to monitor for approval files in run() method.""" logger.info("Approval monitor will be started when event loop is ready") async def _approval_monitor_loop(self): """Background loop to check for approval files and auto-apply edits.""" try: while True: await asyncio.sleep(2) # Check every 2 seconds approval_file = self.watch_dir / ".fgd_approval.json" if not approval_file.exists(): continue # Read approval approval_data = json.loads(approval_file.read_text()) if approval_data.get("approved"): # Execute the edit filepath = approval_data["filepath"] old_text = approval_data["old_text"] new_text = approval_data["new_text"] logger.info(f"πŸ”΅ Auto-applying approved edit: {filepath}") try: path = self._sanitize(filepath) content = path.read_text(encoding='utf-8') new_content = content.replace(old_text, new_text, 1) # Create backup backup = path.with_suffix('.bak') if path.exists(): shutil.copy2(path, backup) logger.info(f"πŸ“ Backup created: {backup.resolve()}") # Write new content logger.info(f"✍️ Auto-applying edit to: {path.resolve()}") path.write_text(new_content, encoding='utf-8') # Verify write succeeded if path.exists(): size = path.stat().st_size logger.info(f"βœ… Auto-edit verified: {path.resolve()} ({size} bytes)") else: logger.error(f"❌ Auto-edit failed: File does not exist after write") self.memory.add_context("file_edit", { "path": filepath, "approved": True, "auto_applied": True, "resolved_path": str(path.resolve()) }) logger.info(f"βœ… Edit successfully applied: {filepath} at {path.resolve()} (backup: {backup.name})") except Exception as e: logger.error(f"❌ Failed to apply edit to {filepath}: {e}") else: logger.info(f"❌ Edit rejected by user: {approval_data.get('filepath')}") # Clean up approval file approval_file.unlink() except asyncio.CancelledError: logger.info("Approval monitor cancelled, shutting down cleanly") raise # Re-raise to allow proper cancellation except Exception as e: logger.error(f"Approval monitor error: {e}", exc_info=True) def _on_file_change(self, event_type, path): try: rel = str(Path(path).relative_to(self.watch_dir)) self.recent_changes.append({ "type": event_type, "path": rel, "timestamp": datetime.now().isoformat() }) if len(self.recent_changes) > 50: self.recent_changes = self.recent_changes[-50:] self.memory.add_context("file_change", {"type": event_type, "path": rel}) except ValueError: # Path is outside watch_dir, ignore pass except Exception as e: logger.debug(f"Error processing file change for {path}: {e}") # ------------------------------------------------------------------- # # -------------------------- HELPERS -------------------------------- # # ------------------------------------------------------------------- # def _sanitize(self, rel, base: Path = None): base = base or self.watch_dir p = (base / rel).resolve() if not str(p).startswith(str(base)): raise ValueError("Path traversal blocked") return p def _get_gitignore_patterns(self, root: Path) -> List[str]: """Load gitignore patterns from .gitignore file""" gitignore = root / ".gitignore" if not gitignore.exists(): return [] try: return [line.strip() for line in gitignore.read_text().splitlines() if line.strip() and not line.startswith('#')] except Exception as e: logger.warning(f"Failed to read .gitignore: {e}") return [] def _matches_gitignore(self, path: Path, patterns: List[str]) -> bool: """Check if path matches any gitignore pattern using fnmatch""" import fnmatch try: rel = str(path.relative_to(self.watch_dir)) # Normalize for cross-platform compatibility rel_posix = rel.replace(os.sep, '/') for pat in patterns: # Handle directory patterns (ending with /) if pat.endswith('/'): pat = pat.rstrip('/') if fnmatch.fnmatch(rel_posix, pat) or fnmatch.fnmatch(rel_posix, f"{pat}/*"): return True # Handle patterns with directory separators elif '/' in pat: if fnmatch.fnmatch(rel_posix, pat): return True # Handle simple filename patterns - check against full path and basename else: if fnmatch.fnmatch(path.name, pat) or fnmatch.fnmatch(rel_posix, f"**/{pat}"): return True return False except Exception as e: logger.debug(f"Error matching gitignore for {path}: {e}") return False def _save_pending_edit(self, payload: Dict[str, Any]) -> Path: """Persist a pending edit file for GUI confirmation.""" pending_edit_file = self.watch_dir / ".fgd_pending_edit.json" logger.info("πŸ’Ύ Saving pending edit to: %s", pending_edit_file.resolve()) pending_edit_file.write_text(json.dumps(payload, indent=2)) return pending_edit_file def _is_git_repo(self) -> bool: """Check if watch_dir is a git repository""" return (self.watch_dir / ".git").exists() def _check_git_available(self) -> Optional[str]: """Check if git is available and repo is initialized. Returns error message or None""" try: result = subprocess.run( ["git", "--version"], capture_output=True, timeout=5 ) if result.returncode != 0: return "Git is not installed or not in PATH" except FileNotFoundError: return "Git is not installed or not in PATH" except Exception as e: return f"Git check failed: {e}" if not self._is_git_repo(): return "Directory is not a git repository (no .git folder found)" return None def _get_mcp_status_context(self) -> str: """Generate MCP server status and capabilities info for LLM context""" status_info = { "mcp_server": { "status": "connected", "name": "fgd-mcp-server", "version": "5.0", "watch_directory": str(self.watch_dir), "available_tools": [ { "name": "list_directory", "description": "List files in a directory (gitignore aware)", "usage": "Use this to explore the project structure" }, { "name": "read_file", "description": "Read file contents with metadata", "usage": "Use this to read and analyze files" }, { "name": "write_file", "description": "Write/create files with automatic backups", "usage": "Use this to create or overwrite files" }, { "name": "edit_file", "description": "Edit files with diff preview and approval workflow", "usage": "Use this to make precise edits to existing files" }, { "name": "git_diff", "description": "Show git diff for changes", "usage": "Use this to review uncommitted changes" }, { "name": "git_commit", "description": "Commit changes to git", "usage": "Use this to save changes to version control" }, { "name": "git_log", "description": "Show git commit history", "usage": "Use this to view recent commits" }, { "name": "llm_query", "description": "Ask questions to Grok (this tool you're using now)", "usage": "Recursive - you're currently using this tool" } ], "capabilities": [ "File system operations (read, write, edit)", "Git integration (diff, commit, log)", "Automatic backups before modifications", "Gitignore-aware file filtering", "Context-aware memory system", "File watching and change detection" ], "reference_projects": [str(p) for p in self.ref_dirs] if self.ref_dirs else [] } } return f"\n\n=== MCP SERVER STATUS ===\n{json.dumps(status_info, indent=2)}\n=== END MCP STATUS ===\n" # ------------------------------------------------------------------- # # --------------------------- TOOLS --------------------------------- # # ------------------------------------------------------------------- # def _setup_handlers(self): @self.server.list_tools() async def list_tools(): return [ Tool(name="list_directory", description="List files (gitignore aware)", inputSchema={ "type": "object", "properties": {"path": {"type": "string", "default": "."}} }), Tool(name="read_file", description="Read file + metadata", inputSchema={ "type": "object", "properties": {"filepath": {"type": "string"}}, "required": ["filepath"] }), Tool(name="write_file", description="Write file (backup)", inputSchema={ "type": "object", "properties": {"filepath": {"type": "string"}, "content": {"type": "string"}}, "required": ["filepath", "content"] }), Tool(name="edit_file", description="Edit with diff preview", inputSchema={ "type": "object", "properties": { "filepath": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}, "confirm": {"type": "boolean", "default": False} }, "required": ["filepath", "old_text", "new_text"] }), Tool(name="git_diff", description="Show git diff", inputSchema={ "type": "object", "properties": {"files": {"type": "array", "items": {"type": "string"}}} }), Tool(name="git_commit", description="Commit changes", inputSchema={ "type": "object", "properties": {"message": {"type": "string"}}, "required": ["message"] }), Tool(name="git_log", description="Show git log", inputSchema={ "type": "object", "properties": {"limit": {"type": "integer", "default": 5}} }), Tool(name="llm_query", description="Ask Grok", inputSchema={ "type": "object", "properties": {"prompt": {"type": "string"}}, "required": ["prompt"] }), Tool(name="create_directory", description="Create directory (with parents) - P2-FIX", inputSchema={ "type": "object", "properties": {"path": {"type": "string"}}, "required": ["path"] }) ] # ---------- TOOL CALL DISPATCHER ---------- @self.server.call_tool() async def call_tool(name: str, arguments: dict): """Handle all tool calls with a dispatcher pattern""" # ---------- LIST DIRECTORY ---------- if name == "list_directory": rel_path = arguments.get("path", ".") path = self._sanitize(rel_path) # P2-FIX (MCP-CONN): Check if path exists before processing if not path.exists(): logger.warning(f"List failed: Directory does not exist: {path.resolve()}") return [TextContent(type="text", text=f"Error: Directory does not exist: {path.resolve()}")] if not path.is_dir(): logger.warning(f"List failed: Not a directory: {path.resolve()}") return [TextContent(type="text", text=f"Error: Not a directory: {path.resolve()}")] try: patterns = self._get_gitignore_patterns(self.watch_dir) files = [] filtered_hidden = 0 filtered_gitignore = 0 for p in path.iterdir(): if p.name.startswith('.'): filtered_hidden += 1 logger.debug(f"Filtered (hidden): {p.name}") continue if self._matches_gitignore(p, patterns): filtered_gitignore += 1 logger.debug(f"Filtered (gitignore): {p.name}") continue files.append({ "name": p.name, "is_dir": p.is_dir(), "size": p.stat().st_size if p.is_file() else 0 }) # P2-FIX (MCP-CONN): Include detailed metadata about filtering total_entries = len(files) + filtered_hidden + filtered_gitignore result = { "path": str(path.resolve()), "files": files, "file_count": len(files), "filtered_count": filtered_hidden + filtered_gitignore, "filtered_hidden": filtered_hidden, "filtered_gitignore": filtered_gitignore, "total_entries": total_entries, "note": f"Showing {len(files)} visible files ({filtered_hidden} hidden, {filtered_gitignore} gitignored)" } logger.info(f"πŸ“‚ Listed {path.resolve()}: {len(files)} visible, {filtered_hidden} hidden, {filtered_gitignore} gitignored") return [TextContent(type="text", text=json.dumps(result, indent=2))] except PermissionError: logger.error(f"List failed: Permission denied: {path.resolve()}") return [TextContent(type="text", text=f"Error: Permission denied accessing {path.resolve()}")] except Exception as e: logger.error(f"List failed: {path.resolve()}\nException: {e}\n{traceback.format_exc()}") return [TextContent(type="text", text=f"Error: Failed to list {path.resolve()}\nReason: {e}")] # ---------- READ FILE ---------- elif name == "read_file": try: path = self._sanitize(arguments["filepath"]) if path.stat().st_size > self.max_file_kb: return [TextContent(type="text", text="Error: File too large (>250KB)")] content = path.read_text(encoding='utf-8') stat = path.stat() meta = { "size_kb": round(stat.st_size / 1024, 2), "modified": datetime.fromtimestamp(stat.st_mtime).isoformat(), "lines": len(content.splitlines()) } self.memory.add_context("file_read", {"path": arguments["filepath"], "meta": meta}) return [TextContent(type="text", text=json.dumps({"content": content, "meta": meta}, indent=2))] except Exception as e: return [TextContent(type="text", text=f"Error: {e}")] # ---------- WRITE FILE ---------- elif name == "write_file": filepath = arguments["filepath"] content = arguments["content"] path = self._sanitize(filepath) try: # P2-FIX (MCP-CONN): Ensure parent directory exists before writing parent_dir = path.parent if not parent_dir.exists(): logger.info(f"πŸ“ Parent directory missing, creating: {parent_dir.resolve()}") parent_dir.mkdir(parents=True, exist_ok=True) logger.info(f"βœ… Parent directory created: {parent_dir.resolve()}") # Create backup if file exists backup = path.with_suffix('.bak') if path.exists(): shutil.copy2(path, backup) logger.info(f"πŸ“ Backup created: {backup.resolve()}") self.memory.add_context("backup", {"path": str(backup), "original": filepath}) # Write file logger.info(f"✍️ Writing file to: {path.resolve()}") path.write_text(content, encoding='utf-8') # P2-FIX (MCP-CONN): Comprehensive verification if path.exists(): size = path.stat().st_size # Verify content matches actual_content = path.read_text(encoding='utf-8') if actual_content == content: logger.info(f"βœ… Write verified: {path.resolve()} ({size} bytes, content matches)") self.memory.add_context("file_write", { "path": filepath, "resolved_path": str(path.resolve()), "size": size, "verified": True }) return [TextContent(type="text", text=f"""βœ… File Written Successfully Location: {path.resolve()} Size: {size} bytes Content: Verified Backup: {backup.name if backup.exists() else 'None'}""")] else: logger.error(f"❌ Write failed: Content mismatch at {path.resolve()}") return [TextContent(type="text", text=f"Error: File written but content verification failed at {path.resolve()}")] else: logger.error(f"❌ Write failed: File does not exist after write at {path.resolve()}") return [TextContent(type="text", text=f"Error: File was not created at {path.resolve()}")] except PermissionError as e: logger.error(f"❌ Permission denied writing to {filepath}: {e}") return [TextContent(type="text", text=f"Error: Permission denied\nPath: {path.resolve()}\nReason: {e}")] except OSError as e: # P2-FIX (MCP-CONN): Better error context context_info = f""" Path: {path.resolve()} Parent exists: {path.parent.exists()} Grandparent exists: {path.parent.parent.exists()} Watch dir: {self.watch_dir.resolve()} Within watch dir: {str(path).startswith(str(self.watch_dir))}""" logger.error(f"❌ OS error writing to {filepath}: {e}\nContext:{context_info}") return [TextContent(type="text", text=f"Error: Failed to write file\nPath: {path.resolve()}\nReason: {e}\nContext: Check if all parent directories can be created")] except Exception as e: logger.error(f"❌ Unexpected error writing {filepath}: {e}\n{traceback.format_exc()}") return [TextContent(type="text", text=f"Error: Unexpected error writing {filepath}\nReason: {e}\nType: {type(e).__name__}")] # ---------- EDIT FILE ---------- elif name == "edit_file": filepath = arguments["filepath"] old_text = arguments["old_text"] new_text = arguments["new_text"] confirm = arguments.get("confirm", False) path = self._sanitize(filepath) if not path.exists(): return [TextContent(type="text", text="File not found")] content = path.read_text(encoding='utf-8') if old_text not in content: return [TextContent(type="text", text="Old text not found")] if not confirm: preview = content.replace(old_text, new_text, 1) pending_edit_data = { "filepath": filepath, "old_text": old_text, "new_text": new_text, "diff": f"- {old_text}\n+ {new_text}", "preview": preview[:500], "timestamp": datetime.now().isoformat() } try: pending_edit_file = self._save_pending_edit(pending_edit_data) except OSError as exc: logger.error("❌ Failed to save pending edit: %s", exc) return [TextContent(type="text", text=f"Error: Unable to save pending edit ({exc})")] logger.info( "βœ… Pending edit saved: %s (%d bytes)", pending_edit_file.resolve(), pending_edit_file.stat().st_size, ) return [TextContent(type="text", text=json.dumps({ "action": "confirm_edit", "filepath": filepath, "diff": f"- {old_text}\n+ {new_text}", "preview": preview[:500], "message": "Edit pending approval - check GUI" }, indent=2))] try: new_content = content.replace(old_text, new_text, 1) backup = path.with_suffix('.bak') if path.exists(): shutil.copy2(path, backup) logger.info(f"πŸ“ Backup created: {backup.resolve()}") # DEBUG: Log actual write location logger.info(f"✍️ Applying confirmed edit to: {path.resolve()}") path.write_text(new_content, encoding='utf-8') # Verify write succeeded if path.exists(): size = path.stat().st_size logger.info(f"βœ… Edit applied and verified: {path.resolve()} ({size} bytes)") else: logger.error(f"❌ Edit failed: File does not exist after write: {path.resolve()}") self.memory.add_context("file_edit", {"path": filepath, "approved": True, "resolved_path": str(path.resolve())}) # Clean up pending edit file pending_edit_file = self.watch_dir / ".fgd_pending_edit.json" if pending_edit_file.exists(): pending_edit_file.unlink() logger.info(f"🧹 Cleaned up pending edit file") return [TextContent(type="text", text=f"βœ… Approved! File updated + backup: {backup.name}\nActual location: {path.resolve()}")] except Exception as e: logger.error(f"❌ Edit error for {filepath}: {e}") return [TextContent(type="text", text=f"Error: {e}")] # ---------- GIT DIFF ---------- elif name == "git_diff": git_error = self._check_git_available() if git_error: return [TextContent(type="text", text=f"Error: {git_error}")] files = arguments.get("files", []) try: result = subprocess.run( ["git", "diff", "--", *files], cwd=str(self.watch_dir), capture_output=True, text=True, timeout=30 ) diff = result.stdout or "No changes" self.memory.remember(f"diff_{datetime.now().isoformat()}", diff, "git_diffs") return [TextContent(type="text", text=diff)] except Exception as e: logger.error(f"Git diff failed: {e}") return [TextContent(type="text", text=f"Git error: {e}")] # ---------- GIT COMMIT ---------- elif name == "git_commit": git_error = self._check_git_available() if git_error: return [TextContent(type="text", text=f"Error: {git_error}")] message = arguments["message"] if not message or not message.strip(): return [TextContent(type="text", text="Error: Commit message cannot be empty")] try: # Check if there are changes to commit status_result = subprocess.run( ["git", "status", "--porcelain"], cwd=str(self.watch_dir), capture_output=True, text=True, timeout=10 ) if not status_result.stdout.strip(): return [TextContent(type="text", text="No changes to commit")] subprocess.run(["git", "add", "."], cwd=str(self.watch_dir), check=True, timeout=30) result = subprocess.run( ["git", "commit", "-m", message], cwd=str(self.watch_dir), capture_output=True, text=True, check=True, timeout=30 ) commit_hash = result.stdout.split()[1] if "commit" in result.stdout else "unknown" self.memory.remember(f"commit_{commit_hash}", message, "commits") return [TextContent(type="text", text=f"Committed: {commit_hash}\n{message}")] except subprocess.CalledProcessError as e: logger.error(f"Git commit failed: {e.stderr if e.stderr else e}") return [TextContent(type="text", text=f"Commit failed: {e.stderr if e.stderr else str(e)}")] except Exception as e: logger.error(f"Git commit error: {e}") return [TextContent(type="text", text=f"Commit failed: {e}")] # ---------- GIT LOG ---------- elif name == "git_log": git_error = self._check_git_available() if git_error: return [TextContent(type="text", text=f"Error: {git_error}")] limit = arguments.get("limit", 5) try: result = subprocess.run( ["git", "log", f"-{limit}", "--oneline"], cwd=str(self.watch_dir), capture_output=True, text=True, timeout=30 ) log_output = result.stdout if result.stdout else "No commits yet" return [TextContent(type="text", text=log_output)] except Exception as e: logger.error(f"Git log failed: {e}") return [TextContent(type="text", text=f"Git log error: {e}")] # ---------- LLM QUERY ---------- # ---------- CREATE DIRECTORY (P2-FIX) ---------- elif name == "create_directory": rel_path = arguments.get("path") if not rel_path: return [TextContent(type="text", text="Error: path argument required")] path = self._sanitize(rel_path) try: if path.exists(): if path.is_dir(): logger.info(f"πŸ“ Directory already exists: {path.resolve()}") return [TextContent(type="text", text=f"βœ… Directory already exists: {path.resolve()}")] else: logger.error(f"❌ Path exists but is not a directory: {path.resolve()}") return [TextContent(type="text", text=f"Error: Path exists but is not a directory: {path.resolve()}")] # Create directory with all parents path.mkdir(parents=True, exist_ok=True) logger.info(f"βœ… Created directory: {path.resolve()}") return [TextContent(type="text", text=f"βœ… Created directory: {path.resolve()}")] except PermissionError as e: logger.error(f"❌ Permission denied creating directory: {path.resolve()}") return [TextContent(type="text", text=f"Error: Permission denied\nPath: {path.resolve()}\nReason: {e}")] except Exception as e: logger.error(f"❌ Failed to create directory: {path.resolve()}\nException: {e}\n{traceback.format_exc()}") return [TextContent(type="text", text=f"Error: Failed to create directory\nPath: {path.resolve()}\nReason: {e}")] elif name == "llm_query": prompt = arguments["prompt"] # Build comprehensive context for the LLM context_parts = [] # Add MCP server status and capabilities context_parts.append(self._get_mcp_status_context()) # Add recent memory context recent_context = self.memory.get_context()[-5:] if recent_context: context_parts.append(f"\n=== RECENT ACTIVITY ===\n{json.dumps(recent_context, indent=2)}\n=== END RECENT ACTIVITY ===\n") context = "".join(context_parts) # P1 FIX (MCP-1): Use configured default provider instead of hardcoded "grok" provider = self.llm.default response = await self.llm.query(prompt, provider, context=context) # Save conversation as prompt + response pairs timestamp = datetime.now().isoformat() # P1 FIX (MEMORY-4): Use UUID for chat keys to prevent collisions chat_id = str(uuid.uuid4()) conversation_entry = { "id": chat_id, "prompt": prompt, "response": response, "provider": provider, # Use actual provider "timestamp": timestamp, "context_used": len(self.memory.get_context()) } # Store in conversations category for threading (using UUID instead of timestamp) self.memory.remember(f"chat_{chat_id}", conversation_entry, "conversations") # Also keep in llm category for backward compatibility self.memory.remember(f"{provider}_{timestamp}", response, "llm") logger.info(f"Chat saved: prompt={prompt[:50]}..., response={response[:50]}...") return [TextContent(type="text", text=response)] # ---------- UNKNOWN TOOL ---------- else: raise ValueError(f"Unknown tool: {name}") async def run(self): logger.info("=" * 60) logger.info("MCP Server starting with configuration:") logger.info(f" Watch dir: {self.watch_dir}") logger.info(f" LLM Provider: {self.llm.default}") logger.info(f" Grok API Key present: {bool(os.getenv('XAI_API_KEY'))}") logger.info("=" * 60) # Start approval monitor and store task reference for clean shutdown self._approval_task = asyncio.create_task(self._approval_monitor_loop()) logger.info("βœ… Approval monitor started") try: async with stdio_server() as (read, write): # The MCP library handles JSON-RPC validation errors internally # and logs them. It should continue running after errors. # If the server exits unexpectedly, the error will be caught here. try: await self.server.run(read, write, self.server.create_initialization_options()) except ValidationError as e: # Handle Pydantic validation errors from malformed JSON-RPC messages # This catches errors that escape the MCP library's internal handling logger.error("=" * 80) logger.error("JSON-RPC VALIDATION ERROR") logger.error("=" * 80) logger.error(f"Error: {e}") logger.error("The MCP server received invalid input that doesn't conform to JSON-RPC format.") logger.error("This usually happens when:") logger.error(" 1. A client sends malformed JSON") logger.error(" 2. Non-JSON data is sent to stdin") logger.error(" 3. The JSON structure doesn't match JSON-RPC 2.0 spec") logger.error("") logger.error("The MCP server expects messages in this format:") logger.error(' {"jsonrpc": "2.0", "method": "...", "params": {...}, "id": 1}') logger.error("=" * 80) except json.JSONDecodeError as e: # Handle JSON parsing errors that escape the MCP library logger.error("=" * 80) logger.error("JSON DECODE ERROR") logger.error("=" * 80) logger.error(f"Error: {e}") logger.error("Failed to parse JSON from stdin.") logger.error("Ensure all input is valid JSON format.") logger.error("=" * 80) except Exception as e: # Catch any other unexpected errors logger.error("=" * 80) logger.error("UNEXPECTED MCP SERVER ERROR") logger.error("=" * 80) logger.error(f"Error: {e}") logger.error("Stack trace:", exc_info=True) logger.error("=" * 80) raise except KeyboardInterrupt: logger.info("MCP Server interrupted by user (Ctrl+C)") except Exception as e: logger.error("=" * 80) logger.error("FATAL ERROR - MCP SERVER SHUTDOWN") logger.error("=" * 80) logger.error(f"Error: {e}") logger.error("Stack trace:", exc_info=True) logger.error("=" * 80) raise finally: # Clean shutdown of approval monitor if self._approval_task and not self._approval_task.done(): logger.info("Cancelling approval monitor...") self._approval_task.cancel() try: await self._approval_task except asyncio.CancelledError: pass logger.info("Approval monitor cancelled") def stop(self): if self.observer: logger.info("Stopping file watcher...") self.observer.stop() # Join with timeout to prevent hanging self.observer.join(timeout=5.0) if self.observer.is_alive(): logger.warning("File watcher thread did not stop cleanly") else: logger.info("File watcher stopped cleanly") self.observer = None # --------------------------------------------------------------------------- # # ------------------------------- ENTRYPOINT ------------------------------- # # --------------------------------------------------------------------------- # if __name__ == "__main__": import sys config_path = sys.argv[1] if len(sys.argv) > 1 else "fgd_config.yaml" server = FGDMCPServer(config_path) try: asyncio.run(server.run()) except KeyboardInterrupt: server.stop()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/mikeychann-hash/MCPM'

If you have feedback or need assistance with the MCP directory API, please join our Discord server