Skip to main content
Glama
main.py•10.7 kB
"""
Supervisor Main Module

Entry point for the local supervisor that manages Claude-Code sessions.
Provides a simple HTTP API for the MCP server to communicate with.
"""

import asyncio
import logging
import signal
import sys
import os
from typing import Dict, Optional
import json

from aiohttp import web, WSMsgType, WSCloseCode
import aiohttp_cors

# The parent directory must be on sys.path before ``config`` can be imported.
sys.path.append(os.path.dirname(os.path.dirname(__file__)))
from config import config
from .session_manager import SessionManager

# Configure logging
logging.basicConfig(
    level=getattr(logging, config.LOG_LEVEL),
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class _BadRequest(Exception):
    """Raised internally when a request body is not a usable JSON object.

    Deliberately NOT a ``ValueError`` subclass: the handlers map ``ValueError``
    to HTTP 404, and a malformed body must be reported as HTTP 400 instead.
    """


def _json_error(message: str, status: int) -> web.Response:
    """Build the ``{"error": ...}`` JSON response used by every handler."""
    return web.json_response({"error": message}, status=status)


async def _read_json_object(request: web.Request) -> dict:
    """Parse the request body as JSON and require it to be an object.

    Raises:
        _BadRequest: if the body is not valid JSON or is not a JSON object.
    """
    try:
        data = await request.json()
    except ValueError as exc:
        # json.JSONDecodeError (and UnicodeDecodeError) are ValueError subclasses.
        raise _BadRequest("Request body must be valid JSON") from exc
    if not isinstance(data, dict):
        raise _BadRequest("Request body must be a JSON object")
    return data


class SupervisorServer:
    """HTTP/WebSocket server for supervisor communication."""

    def __init__(self, host: Optional[str] = None, port: Optional[int] = None):
        """Create the server, falling back to ``config`` for host and port."""
        self.host = host or config.SUPERVISOR_HOST
        self.port = port or config.SUPERVISOR_PORT
        self.session_manager = SessionManager()
        self.app = web.Application()
        # Set by start(); kept so shutdown() can release the listening socket.
        self._runner: Optional[web.AppRunner] = None
        # Open WebSocket connections, closed explicitly during shutdown().
        self._websockets = set()
        self.setup_routes()

    def setup_routes(self):
        """Setup HTTP routes and WebSocket handlers."""
        router = self.app.router

        # Health check
        router.add_get('/health', self.health_check)

        # Session management
        router.add_post('/sessions', self.create_session)
        router.add_get('/sessions', self.list_sessions)
        router.add_get('/sessions/{session_id}/status', self.get_session_status)
        router.add_post('/sessions/{session_id}/message', self.send_message)
        router.add_get('/sessions/{session_id}/logs', self.get_logs)
        router.add_delete('/sessions/{session_id}', self.terminate_session)

        # Prompt handling
        router.add_get('/prompts', self.check_prompts)
        router.add_post('/sessions/{session_id}/respond', self.respond_to_prompt)

        # WebSocket for real-time communication
        router.add_get('/ws', self.websocket_handler)

        # CORS setup
        # NOTE(review): this allows every origin, header and method *with*
        # credentials. Left unchanged to preserve behavior, but it should be
        # restricted to known origins if the supervisor is ever reachable
        # from a browser on an untrusted network.
        cors = aiohttp_cors.setup(self.app, defaults={
            "*": aiohttp_cors.ResourceOptions(
                allow_credentials=True,
                expose_headers="*",
                allow_headers="*",
                allow_methods="*"
            )
        })

        # Add CORS to all routes (iterate over a snapshot of the route table).
        for route in list(router.routes()):
            cors.add(route)

    async def health_check(self, request: web.Request) -> web.Response:
        """Health check endpoint."""
        health = await self.session_manager.check_session_health()
        return web.json_response({
            "status": "healthy",
            "supervisor_version": "0.1.0",
            "session_health": health
        })

    async def create_session(self, request: web.Request) -> web.Response:
        """Create a new Claude-Code session.

        Reads optional ``name`` and ``working_dir`` from a JSON body; a
        request without a JSON content type is treated as an empty body.
        Responds 201 on success and 400 on any failure.
        """
        try:
            data = await request.json() if request.content_type == 'application/json' else {}
            name = data.get('name')
            working_dir = data.get('working_dir')

            session_id = await self.session_manager.create_session(name, working_dir)
            return web.json_response({
                "session_id": session_id,
                "status": "created"
            }, status=201)
        except Exception as e:
            logger.error(f"Error creating session: {e}")
            return _json_error(str(e), 400)

    async def list_sessions(self, request: web.Request) -> web.Response:
        """List all active sessions."""
        try:
            sessions = await self.session_manager.list_sessions()
            return web.json_response({"sessions": sessions})
        except Exception as e:
            logger.error(f"Error listing sessions: {e}")
            return _json_error(str(e), 500)

    async def get_session_status(self, request: web.Request) -> web.Response:
        """Get status of a specific session (404 when the manager returns nothing)."""
        try:
            session_id = request.match_info['session_id']
            status = await self.session_manager.get_session_status(session_id)
            if not status:
                return _json_error("Session not found", 404)
            return web.json_response(status)
        except Exception as e:
            logger.error(f"Error getting session status: {e}")
            return _json_error(str(e), 500)

    async def send_message(self, request: web.Request) -> web.Response:
        """Send a message to a session.

        Expects a JSON object with a non-empty ``message``. Responds 400 for a
        malformed body or missing message, 404 when the session manager raises
        ``ValueError``, and 500 for any other failure.
        """
        try:
            session_id = request.match_info['session_id']
            data = await _read_json_object(request)
            message = data.get('message', '')
            if not message:
                return _json_error("Message is required", 400)

            result = await self.session_manager.send_message(session_id, message)
            return web.json_response(result)
        except _BadRequest as e:
            return _json_error(str(e), 400)
        except ValueError as e:
            return _json_error(str(e), 404)
        except Exception as e:
            logger.error(f"Error sending message: {e}")
            return _json_error(str(e), 500)

    async def get_logs(self, request: web.Request) -> web.Response:
        """Get logs from a session with mobile optimization.

        Query parameters: ``lines`` (integer, default 50) and ``mobile``
        (``true`` enables the mobile-friendly format). A non-integer ``lines``
        yields 400; a ``ValueError`` from the session manager yields 404.
        """
        # Validate the query string first so a bad ``lines`` value is reported
        # as a client error (400) rather than as a missing session (404).
        try:
            lines = int(request.query.get('lines', 50))
        except ValueError:
            return _json_error("Query parameter 'lines' must be an integer", 400)

        try:
            session_id = request.match_info['session_id']
            mobile = request.query.get('mobile', 'false').lower() == 'true'

            logs = await self.session_manager.get_logs(session_id, lines, mobile_friendly=mobile)
            return web.json_response({
                "logs": logs,
                "session_id": session_id,
                "mobile_optimized": mobile
            })
        except ValueError as e:
            return _json_error(str(e), 404)
        except Exception as e:
            logger.error(f"Error getting logs: {e}")
            return _json_error(str(e), 500)

    async def terminate_session(self, request: web.Request) -> web.Response:
        """Terminate a session; responds 500 when the manager reports failure."""
        try:
            session_id = request.match_info['session_id']
            success = await self.session_manager.terminate_session(session_id)
            if success:
                return web.json_response({"status": "terminated", "session_id": session_id})
            return _json_error("Failed to terminate session", 500)
        except Exception as e:
            logger.error(f"Error terminating session: {e}")
            return _json_error(str(e), 500)

    async def check_prompts(self, request: web.Request) -> web.Response:
        """Check for interactive prompts across all sessions."""
        try:
            prompts = await self.session_manager.check_for_prompts()
            return web.json_response({"prompts": prompts})
        except Exception as e:
            logger.error(f"Error checking prompts: {e}")
            return _json_error(str(e), 500)

    async def respond_to_prompt(self, request: web.Request) -> web.Response:
        """Respond to an interactive prompt.

        Expects a JSON object with a non-empty ``response``. Responds 400 for
        a malformed body or missing response, 404 when the session manager
        raises ``ValueError``, and 500 for any other failure.
        """
        try:
            session_id = request.match_info['session_id']
            data = await _read_json_object(request)
            response = data.get('response', '')
            if not response:
                return _json_error("Response is required", 400)

            success = await self.session_manager.respond_to_prompt(session_id, response)
            if success:
                return web.json_response({"status": "response_sent", "session_id": session_id})
            return _json_error("Failed to send response", 500)
        except _BadRequest as e:
            return _json_error(str(e), 400)
        except ValueError as e:
            return _json_error(str(e), 404)
        except Exception as e:
            logger.error(f"Error responding to prompt: {e}")
            return _json_error(str(e), 500)

    async def websocket_handler(self, request: web.Request) -> web.WebSocketResponse:
        """WebSocket handler for real-time communication.

        Each text frame must be a JSON object with a ``command`` key.
        Supported commands are ``list_sessions`` and ``check_prompts``; any
        failure while handling a frame is reported back as an ``error``
        message. Unknown commands are logged and otherwise ignored.
        """
        ws = web.WebSocketResponse()
        await ws.prepare(request)
        # Track the connection so shutdown() can close it explicitly.
        self._websockets.add(ws)
        logger.info("WebSocket connection established")

        try:
            async for msg in ws:
                if msg.type == WSMsgType.TEXT:
                    try:
                        data = json.loads(msg.data)
                        command = data.get('command')

                        if command == 'list_sessions':
                            sessions = await self.session_manager.list_sessions()
                            await ws.send_text(json.dumps({
                                "type": "sessions",
                                "data": sessions
                            }))
                        elif command == 'check_prompts':
                            prompts = await self.session_manager.check_for_prompts()
                            await ws.send_text(json.dumps({
                                "type": "prompts",
                                "data": prompts
                            }))
                        else:
                            # Nothing is sent to the client (as before), but the
                            # drop is now visible in the supervisor log.
                            logger.warning("Ignoring unknown WebSocket command: %r", command)
                    except Exception as e:
                        await ws.send_text(json.dumps({
                            "type": "error",
                            "message": str(e)
                        }))
                elif msg.type == WSMsgType.ERROR:
                    logger.error(f"WebSocket error: {ws.exception()}")
        finally:
            self._websockets.discard(ws)

        logger.info("WebSocket connection closed")
        return ws

    async def start(self):
        """Start the supervisor server."""
        logger.info(f"Starting supervisor server on {self.host}:{self.port}")

        runner = web.AppRunner(self.app)
        await runner.setup()
        # Remember the runner before binding, so a failed bind is still
        # cleaned up by shutdown().
        self._runner = runner

        site = web.TCPSite(runner, self.host, self.port)
        await site.start()

        logger.info(f"Supervisor server running on http://{self.host}:{self.port}")

    async def shutdown(self):
        """Shutdown the supervisor gracefully.

        Closes open WebSocket connections, stops the HTTP server started by
        ``start()`` (if any), then shuts down all managed sessions. HTTP
        teardown errors are logged and do not prevent session shutdown.
        """
        logger.info("Shutting down supervisor...")

        try:
            # Close WebSockets first so the runner is not left waiting on
            # long-lived connections while it cleans up.
            for ws in list(self._websockets):
                await ws.close(code=WSCloseCode.GOING_AWAY, message=b"Server shutdown")

            runner, self._runner = self._runner, None
            if runner is not None:
                await runner.cleanup()
        except Exception as e:
            logger.error(f"Error stopping HTTP server: {e}")

        await self.session_manager.shutdown_all()
        logger.info("Supervisor shutdown complete")


async def main():
    """Main entry point for the supervisor.

    Starts the server, waits until SIGINT or SIGTERM is received, then shuts
    the supervisor down and returns so the process can exit.
    """
    supervisor = SupervisorServer()
    loop = asyncio.get_running_loop()
    stop_requested = asyncio.Event()

    def request_shutdown():
        logger.info("Received shutdown signal")
        stop_requested.set()

    # Register signal handlers. The event-loop API runs the callback inside
    # the loop; where it is unavailable (e.g. Windows event loops) fall back
    # to signal.signal and hop back onto the loop thread-safely.
    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(sig, request_shutdown)
        except NotImplementedError:
            signal.signal(sig, lambda s, f: loop.call_soon_threadsafe(request_shutdown))

    try:
        await supervisor.start()
        # Keep the server running until a shutdown signal arrives.
        await stop_requested.wait()
    except KeyboardInterrupt:
        logger.info("Keyboard interrupt received")
    except Exception as e:
        logger.error(f"Supervisor error: {e}")
    finally:
        await supervisor.shutdown()


if __name__ == "__main__":
    asyncio.run(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/mostafa-drz/claude-code-mcp-controller'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.