Skip to main content
Glama

Claude Code MCP Controller

by mostafa-drz
server.py•16.3 kB
""" Claude-Code MCP Controller Server FastMCP-based server that provides MCP tools for controlling Claude-Code sessions remotely via ChatGPT. Communicates with the local supervisor to manage sessions. Built following FastMCP 2.0 documentation and best practices. """ import asyncio import logging from datetime import datetime from typing import Dict, List, Optional import aiohttp from fastmcp import FastMCP from config import config # Configure logging logging.basicConfig(level=getattr(logging, config.LOG_LEVEL)) logger = logging.getLogger(__name__) # Initialize FastMCP server (no authentication for now - until OpenAI fixes OAuth) mcp = FastMCP("Claude-Code Controller šŸš€") # HTTP session for supervisor communication http_session: Optional[aiohttp.ClientSession] = None async def get_http_session() -> aiohttp.ClientSession: """Get or create HTTP session for supervisor communication.""" global http_session if http_session is None or http_session.closed: # Disable SSL verification for ngrok/local development import ssl ssl_context = ssl.create_default_context() ssl_context.check_hostname = False ssl_context.verify_mode = ssl.CERT_NONE connector = aiohttp.TCPConnector(ssl=ssl_context) http_session = aiohttp.ClientSession( timeout=aiohttp.ClientTimeout(total=config.SUPERVISOR_TIMEOUT), connector=connector ) return http_session async def supervisor_request(method: str, endpoint: str, **kwargs) -> Dict: """Make a request to the supervisor HTTP API.""" try: session = await get_http_session() url = f"{config.SUPERVISOR_URL}{endpoint}" logger.info(f"Making {method} request to supervisor: {url}") async with session.request(method, url, **kwargs) as response: if response.status >= 400: error_text = await response.text() raise RuntimeError(f"Supervisor error {response.status}: {error_text}") return await response.json() except aiohttp.ClientError as e: logger.error(f"Failed to connect to supervisor: {e}") raise RuntimeError(f"Supervisor communication failed: {str(e)}") @mcp.tool async def list_sessions() -> Dict: """List all active Claude-Code sessions with their status.""" try: result = await supervisor_request("GET", "/sessions") sessions = result.get("sessions", []) logger.info(f"Retrieved {len(sessions)} sessions") # CHATGPT OPTIMIZATION: Format response for conversation if len(sessions) == 0: return { "message": "No active Claude-Code sessions found.", "suggestion": "Create a new session by saying: 'Create a session for my project'", "sessions": sessions } # Format sessions for ChatGPT display session_summary = [] for session in sessions: summary = { "name": session.get("session_id", "unknown").split("_")[0], # Cleaner display name "status": session.get("status", "unknown"), "created": session.get("created_at", "unknown"), "directory": session.get("working_dir", "").split("/")[-1] if session.get("working_dir") else "unknown", # Just folder name "full_id": session.get("session_id") # Keep full ID for operations } session_summary.append(summary) return { "message": f"Found {len(sessions)} active Claude-Code session{'s' if len(sessions) != 1 else ''}", "sessions": session_summary, "total_count": len(sessions) } except Exception as e: logger.error(f"Error listing sessions: {e}") return { "error": "Failed to retrieve sessions", "details": str(e), "suggestion": "Check if the supervisor is running and try again" } @mcp.tool async def create_session(name: Optional[str] = None, working_dir: Optional[str] = None) -> Dict: """ Create a new Claude-Code session optimized for ChatGPT responsiveness. Args: name: Optional name for the session (default: auto-generated) working_dir: Optional working directory for the session (default: current dir) """ try: payload = {} if name: payload["name"] = name if working_dir: payload["working_dir"] = working_dir result = await supervisor_request("POST", "/sessions", json=payload) session_id = result.get('session_id') logger.info(f"Created session: {session_id}") # CHATGPT OPTIMIZATION: Return user-friendly response return { "message": f"āœ… Created new Claude-Code session '{session_id.split('_')[0] if session_id else name or 'session'}'", "session_name": session_id.split('_')[0] if session_id else name or 'session', "full_session_id": session_id, "working_directory": working_dir or "default", "status": "starting up", "next_steps": "You can now send commands to this session or check its logs" } except Exception as e: logger.error(f"Error creating session: {e}") return { "error": "Failed to create Claude-Code session", "details": str(e), "suggestion": "Make sure the supervisor is running and the working directory exists" } @mcp.tool async def send_message(session_id: str, message: str) -> Dict: """ Send a message to a Claude-Code session. Args: session_id: The ID of the session to send the message to message: The message to send to Claude-Code """ try: payload = {"message": message} result = await supervisor_request("POST", f"/sessions/{session_id}/message", json=payload) logger.info(f"Sent message to session {session_id}: {message[:50]}...") return result except Exception as e: logger.error(f"Error sending message to session {session_id}: {e}") raise @mcp.tool async def get_logs(session_id: str, lines: int = 50, mobile_format: bool = True) -> Dict: """ Get recent logs from a Claude-Code session, formatted for ChatGPT display. Args: session_id: The ID of the session to get logs from lines: Number of recent log lines to retrieve (default: 50) mobile_format: Format logs for mobile ChatGPT app (default: True) """ try: # Use mobile_friendly parameter for better ChatGPT mobile display params = {"lines": lines, "mobile": "true" if mobile_format else "false"} result = await supervisor_request("GET", f"/sessions/{session_id}/logs", params=params) logs = result.get('logs', []) logger.info(f"Retrieved {len(logs)} log lines from session {session_id}") # CHATGPT OPTIMIZATION: Format for conversation if not logs: return { "message": f"No recent activity in session '{session_id.split('_')[0]}'", "suggestion": "Send a command to the session to see some activity", "logs": [] } # Format logs for ChatGPT with markdown for better mobile display formatted_logs = "Recent activity:\n```\n" + "\n".join(logs[-10:]) + "\n```" return { "message": f"Showing last {min(len(logs), 10)} lines from session '{session_id.split('_')[0]}'", "formatted_logs": formatted_logs, "raw_logs": logs, "session_id": session_id.split('_')[0], # Clean name for display "total_lines": len(logs) } except Exception as e: logger.error(f"Error getting logs from session {session_id}: {e}") return { "error": f"Could not retrieve logs from session '{session_id.split('_')[0] if '_' in session_id else session_id}'", "details": str(e), "suggestion": "Check if the session exists with 'list_sessions' tool" } @mcp.tool async def get_session_status(session_id: str) -> Dict: """ Get detailed status information for a specific session. Args: session_id: The ID of the session to check """ try: result = await supervisor_request("GET", f"/sessions/{session_id}/status") logger.info(f"Retrieved status for session {session_id}") return result except Exception as e: logger.error(f"Error getting status for session {session_id}: {e}") raise @mcp.tool async def terminate_session(session_id: str) -> Dict: """ Terminate a Claude-Code session. Args: session_id: The ID of the session to terminate """ try: result = await supervisor_request("DELETE", f"/sessions/{session_id}") logger.info(f"Terminated session {session_id}") return result except Exception as e: logger.error(f"Error terminating session {session_id}: {e}") raise @mcp.tool async def check_prompts() -> List[Dict]: """Check all sessions for interactive prompts that need user attention.""" try: result = await supervisor_request("GET", "/prompts") prompts = result.get("prompts", []) logger.info(f"Found {len(prompts)} pending prompts") return prompts except Exception as e: logger.error(f"Error checking for prompts: {e}") raise @mcp.tool async def respond_to_prompt(session_id: str, response: str) -> Dict: """ Respond to an interactive prompt in a Claude-Code session. Args: session_id: The ID of the session with the prompt response: The response to send (e.g., "y", "n", "yes", "no") """ try: payload = {"response": response} result = await supervisor_request("POST", f"/sessions/{session_id}/respond", json=payload) logger.info(f"Sent prompt response to session {session_id}: {response}") return result except Exception as e: logger.error(f"Error responding to prompt in session {session_id}: {e}") raise @mcp.tool async def search(query: str, limit: int = 10) -> Dict: """ Search Claude-Code sessions and logs for relevant content. This is a required tool for ChatGPT MCP connector compatibility. Args: query: Search query to find relevant sessions or content limit: Maximum number of results to return (default: 10) """ try: # Get all sessions sessions_result = await supervisor_request("GET", "/sessions") sessions = sessions_result.get("sessions", []) matching_sessions = [] for session in sessions[:limit]: session_id = session.get("session_id", "") working_dir = session.get("working_dir", "") # Simple text matching for now if (query.lower() in session_id.lower() or query.lower() in working_dir.lower()): matching_sessions.append({ "session_id": session_id, "status": session.get("status"), "working_dir": working_dir, "match_reason": f"Found '{query}' in session ID or directory" }) return { "query": query, "results": matching_sessions, "total_matches": len(matching_sessions), "message": f"Found {len(matching_sessions)} sessions matching '{query}'" } except Exception as e: logger.error(f"Error searching sessions: {e}") return { "query": query, "results": [], "total_matches": 0, "error": f"Search failed: {str(e)}" } @mcp.tool async def fetch(url: str) -> Dict: """ Fetch content from Claude-Code session logs or status. This is a required tool for ChatGPT MCP connector compatibility. Args: url: Session ID or resource identifier to fetch content from """ try: # If URL looks like a session ID, fetch session details if "_" in url and len(url.split("_")) == 2: # Try to get session status try: status_result = await supervisor_request("GET", f"/sessions/{url}/status") logs_result = await supervisor_request("GET", f"/sessions/{url}/logs?lines=20") return { "url": url, "content": { "status": status_result, "recent_logs": logs_result.get("logs", []) }, "content_type": "session_data", "message": f"Fetched data for session {url}" } except: pass # Fallback: try to list sessions and find matches sessions_result = await supervisor_request("GET", "/sessions") sessions = sessions_result.get("sessions", []) matching_session = None for session in sessions: if url in session.get("session_id", ""): matching_session = session break if matching_session: return { "url": url, "content": matching_session, "content_type": "session_info", "message": f"Found session information for {url}" } else: return { "url": url, "content": None, "content_type": "not_found", "message": f"No session found matching {url}" } except Exception as e: logger.error(f"Error fetching content for {url}: {e}") return { "url": url, "content": None, "content_type": "error", "error": f"Fetch failed: {str(e)}" } @mcp.resource("health://supervisor") async def supervisor_health() -> Dict: """Get health status of the supervisor and all sessions.""" try: result = await supervisor_request("GET", "/health") return { "supervisor_status": "connected", "supervisor_health": result, "mcp_server_version": "0.1.0", "last_check": datetime.now().isoformat() } except Exception as e: logger.error(f"Error checking supervisor health: {e}") return { "supervisor_status": "disconnected", "error": str(e), "mcp_server_version": "0.1.0", "last_check": datetime.now().isoformat() } @mcp.resource("sessions://active") async def active_sessions() -> Dict: """Get a summary of all active sessions.""" try: sessions = await list_sessions() summary = { "total_sessions": len(sessions), "active_count": len([s for s in sessions if s.get("status") == "active"]), "sessions": sessions, "last_updated": datetime.now().isoformat() } return summary except Exception as e: logger.error(f"Error getting active sessions: {e}") return { "error": str(e), "last_updated": datetime.now().isoformat() } # Cleanup function for HTTP session async def cleanup(): """Cleanup resources when server shuts down.""" global http_session if http_session and not http_session.closed: await http_session.close() logger.info("HTTP session closed") if __name__ == "__main__": try: logger.info(f"Starting Claude-Code MCP Controller") logger.info(f"Supervisor URL: {config.SUPERVISOR_URL}") # Register cleanup function import atexit atexit.register(lambda: asyncio.run(cleanup()) if asyncio.get_event_loop().is_running() else None) # Use FastMCP's run method with default stdio transport mcp.run() except KeyboardInterrupt: logger.info("Server stopped by user") except Exception as e: logger.error(f"Server error: {e}") raise finally: asyncio.run(cleanup())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/mostafa-drz/claude-code-mcp-controller'

If you have feedback or need assistance with the MCP directory API, please join our Discord server