Skip to main content
Glama
server.py36 kB
#!/usr/bin/env python3 """ OpenRouter MCP Server - Simple Synchronous Version This version uses simple synchronous stdin to avoid Docker permission issues. Enhanced with graceful shutdown protection for abrupt client disconnects. """ import json import sys import os import logging import signal import threading import time from typing import Dict, Set, Optional from dotenv import load_dotenv # Set up paths for both direct execution and module import try: from .conversation_manager import ConversationManager from .config import ( DEFAULT_MODEL, get_model_alias, OPENROUTER_API_KEY, DEFAULT_MAX_TOKENS, DEFAULT_TEMPERATURE, should_force_internet_search, ) except ImportError: from conversation_manager import ConversationManager from config import ( DEFAULT_MODEL, get_model_alias, OPENROUTER_API_KEY, DEFAULT_MAX_TOKENS, DEFAULT_TEMPERATURE, should_force_internet_search, ) # Simple logging setup logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", handlers=[ logging.StreamHandler(sys.stderr), logging.FileHandler("/tmp/openrouter_simple.log", mode="w"), ], ) logger = logging.getLogger("openrouter-simple") # Load environment load_dotenv() conversation_manager = ConversationManager() # Global state for graceful shutdown protection shutdown_requested = False active_requests: Dict[str, Dict] = {} # request_id -> request_info active_requests_lock = threading.Lock() logger.info("Simple OpenRouter MCP Server starting...") logger.info(f"API Key configured: {bool(OPENROUTER_API_KEY)}") class GracefulShutdownProtection: """Protects against abrupt client disconnects during active requests""" @staticmethod def register_request( request_id: str, request_type: str, continuation_id: Optional[str] = None ): """Register an active request for tracking""" with active_requests_lock: active_requests[request_id] = { "type": request_type, "start_time": time.time(), "continuation_id": continuation_id, "status": "active", } logger.info( f"PROTECTION: Registered active request {request_id} ({request_type})" ) @staticmethod def unregister_request(request_id: str): """Unregister a completed request""" with active_requests_lock: if request_id in active_requests: duration = time.time() - active_requests[request_id]["start_time"] logger.info( f"PROTECTION: Completed request {request_id} in {duration:.2f}s" ) del active_requests[request_id] @staticmethod def get_active_requests() -> Dict[str, Dict]: """Get snapshot of active requests""" with active_requests_lock: return active_requests.copy() @staticmethod def handle_shutdown(): """Handle graceful shutdown with active request protection""" global shutdown_requested shutdown_requested = True active = GracefulShutdownProtection.get_active_requests() if active: logger.warning( f"PROTECTION: Shutdown requested with {len(active)} active requests" ) for req_id, req_info in active.items(): duration = time.time() - req_info["start_time"] logger.warning( f"PROTECTION: Active request {req_id} ({req_info['type']}) running for {duration:.2f}s" ) # Give active requests time to complete max_wait = 30 # seconds wait_interval = 1 waited = 0 while waited < max_wait: active = GracefulShutdownProtection.get_active_requests() if not active: logger.info( "PROTECTION: All requests completed, proceeding with shutdown" ) break logger.info( f"PROTECTION: Waiting for {len(active)} active requests to complete... ({waited}s/{max_wait}s)" ) time.sleep(wait_interval) waited += wait_interval # Force cleanup remaining requests active = GracefulShutdownProtection.get_active_requests() if active: logger.warning( f"PROTECTION: Force shutdown with {len(active)} requests still active" ) for req_id, req_info in active.items(): if req_info.get("continuation_id"): logger.info( f"PROTECTION: Preserving conversation state for {req_info['continuation_id']}" ) else: logger.info("PROTECTION: Clean shutdown - no active requests") def signal_handler(signum, frame): """Handle shutdown signals gracefully""" logger.info( f"PROTECTION: Received signal {signum}, initiating graceful shutdown..." ) GracefulShutdownProtection.handle_shutdown() sys.exit(0) # Register signal handlers signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) def send_response(response_data): """Send JSON-RPC response to stdout with disconnect protection.""" try: # Check if shutdown is requested if shutdown_requested: logger.debug("PROTECTION: Skipping response send due to shutdown") return response_str = json.dumps(response_data) logger.info(f"Sending response: {response_str}") print(response_str, flush=True) sys.stdout.flush() except BrokenPipeError: logger.warning( "PROTECTION: Broken pipe while sending response, client disconnected" ) GracefulShutdownProtection.handle_shutdown() except OSError as e: if e.errno == 32: # Broken pipe logger.warning( "PROTECTION: Broken pipe (OSError 32) while sending response" ) GracefulShutdownProtection.handle_shutdown() else: logger.error(f"OSError sending response: {e}") except Exception as e: logger.error(f"Failed to send response: {e}") def process_files_and_images(prompt: str, files: list, images: list) -> str: """Process files and images to enhance the prompt with context.""" enhanced_prompt = prompt host_home = os.environ.get("HOST_HOME") if files: logger.info(f"Processing {len(files)} files") enhanced_prompt += "\n\n**Attached Files:**\n" for file_path in files: try: container_path = file_path # Only attempt translation if the file is under the home directory and we have HOST_HOME if host_home and file_path.startswith(host_home): container_path = file_path.replace( host_home, f"/host{host_home}", 1 ) elif file_path.startswith("/home/") and not host_home: logger.warning( "HOST_HOME not set but file is under /home/. Path translation may fail." ) logger.info(f"Reading file: {file_path} -> {container_path}") with open(container_path, "r", encoding="utf-8") as f: content = f.read() enhanced_prompt += ( f"\n**{os.path.basename(file_path)}:**\n```\n{content}\n```\n" ) except Exception as e: logger.error( f"Error reading file {file_path} (tried {container_path}): {e}" ) enhanced_prompt += ( f"\n**{os.path.basename(file_path)}:** Error reading file: {e}\n" ) if images: logger.info(f"Processing {len(images)} images") enhanced_prompt += "\n\n**Attached Images:**\n" for image_path in images: enhanced_prompt += f"- {os.path.basename(image_path)}\n" return enhanced_prompt def add_reasoning_config(data: dict, model: str, thinking_effort: str) -> dict: """Add reasoning configuration to request data based on model capabilities.""" reasoning_models = [ "thinking", "claude", "gemini", "glm", "deepseek", "grok", "qwen", ] clean_model = model.replace(":online", "") has_reasoning = any(keyword in clean_model.lower() for keyword in reasoning_models) if has_reasoning and thinking_effort in ["high", "medium", "low"]: from .config import DEFAULT_MAX_REASONING_TOKENS effort_ratios = {"high": 0.8, "medium": 0.5, "low": 0.2} reasoning_budget = int( DEFAULT_MAX_REASONING_TOKENS * effort_ratios[thinking_effort] ) reasoning_budget = min( reasoning_budget, 32000 if "claude" in clean_model.lower() else DEFAULT_MAX_REASONING_TOKENS, ) if "anthropic" in clean_model.lower() or "claude" in clean_model.lower(): data["thinking"] = {"budget_tokens": reasoning_budget} else: data["reasoning"] = { "effort": thinking_effort, "max_thinking_tokens": reasoning_budget, "exclude": False, } logger.info( f"Enabled reasoning for model {clean_model} with effort: {thinking_effort}, reasoning_budget: {reasoning_budget}" ) return data def _execute_chat_completion( req_id: str, arguments: dict, is_custom_model: bool = False ): """Unified handler for all chat completions.""" continuation_id = arguments.get("continuation_id") GracefulShutdownProtection.register_request(req_id, "chat", continuation_id) try: prompt = arguments.get("prompt") if not prompt: send_response( { "jsonrpc": "2.0", "id": req_id, "error": { "code": -32602, "message": "Missing required parameter: prompt", }, } ) return # Resolve model if is_custom_model: model_name = arguments.get("custom_model") if not model_name: send_response( { "jsonrpc": "2.0", "id": req_id, "error": { "code": -32602, "message": "Missing required parameter: custom_model", }, } ) return actual_model = model_name else: model_alias = arguments.get("model", DEFAULT_MODEL) actual_model = get_model_alias(model_alias, prompt) # Create or get conversation if not continuation_id: continuation_id = conversation_manager.create_conversation() # Check for shutdown request before proceeding if shutdown_requested: logger.warning( f"PROTECTION: Rejecting new request {req_id} due to shutdown" ) send_response( { "jsonrpc": "2.0", "id": req_id, "error": { "code": -32000, "message": "Server shutting down, request rejected", }, } ) return # Process files and images to add to prompt enhanced_prompt = process_files_and_images( prompt, arguments.get("files", []), arguments.get("images", []) ) # Prepare model with internet search if needed final_model = actual_model force_internet_search = arguments.get("force_internet_search", True) if ( force_internet_search and not is_custom_model and should_force_internet_search(actual_model) ): final_model = f"{actual_model}:online" logger.info(f"Enabling web search: {actual_model} -> {final_model}") # Add user message with enhanced content conversation_manager.add_message(continuation_id, "user", enhanced_prompt) messages = conversation_manager.get_conversation_history(continuation_id) # Debug logging logger.info(f"Enhanced prompt length: {len(enhanced_prompt)}") logger.info(f"Number of messages being sent: {len(messages)}") import httpx logger.info(f"Calling OpenRouter with model: {final_model}") headers = { "Authorization": f"Bearer {OPENROUTER_API_KEY}", "Content-Type": "application/json", "HTTP-Referer": "https://claude.ai", "X-Title": "OpenRouter MCP Server", } data = { "model": final_model, "messages": messages, "temperature": arguments.get("temperature", DEFAULT_TEMPERATURE), } # Add max_tokens if specified max_tokens = arguments.get("max_tokens") if max_tokens: data["max_tokens"] = max_tokens # Add reasoning configuration thinking_effort = arguments.get("thinking_effort", "high") data = add_reasoning_config(data, final_model, thinking_effort) # Set timeout based on model capabilities timeout = ( 180.0 if any( keyword in final_model.lower() for keyword in [ "thinking", "claude", "gemini", "glm", "deepseek", "grok", "qwen", ] ) else 60.0 ) with httpx.Client(timeout=timeout) as client: response = client.post( "https://openrouter.ai/api/v1/chat/completions", headers=headers, json=data, ) response.raise_for_status() # Parse JSON response with error handling try: response_text = response.text logger.info(f"Response size: {len(response_text)} characters") if len(response_text) > 1048576: # 1MB logger.warning( f"Very large response ({len(response_text)} chars), may cause parsing issues" ) result = response.json() except json.JSONDecodeError as e: logger.error(f"JSON decode error: {e}") send_response( { "jsonrpc": "2.0", "id": req_id, "error": { "code": -32603, "message": f"Failed to parse OpenRouter response: {e}", }, } ) return # Extract response, handling both regular content and reasoning tokens message = result["choices"][0]["message"] ai_response = message.get("content", "") # Check if model returned reasoning tokens reasoning = message.get("reasoning", "") if reasoning: ai_response = ( f"{reasoning}\n\n---\n\n{ai_response}" if ai_response else reasoning ) logger.info(f"Model returned reasoning tokens: {len(reasoning)} chars") # Add AI response to conversation conversation_manager.add_message(continuation_id, "assistant", ai_response) # Send result send_response( { "jsonrpc": "2.0", "id": req_id, "result": { "content": [ { "type": "text", "text": f"**{actual_model}**: {ai_response}\n\n*Conversation ID: {continuation_id}*", } ], "continuation_id": continuation_id, }, } ) except httpx.HTTPStatusError as e: logger.error(f"HTTP error in chat request: {e}") error_detail = e.response.text try: error_json = e.response.json() error_detail = error_json.get("error", {}).get("message", error_detail) except: pass send_response( { "jsonrpc": "2.0", "id": req_id, "error": { "code": -32603, "message": f"OpenRouter API error: {str(e)} - Details: {error_detail}", }, } ) except Exception as e: logger.error(f"Error calling OpenRouter: {e}") send_response( { "jsonrpc": "2.0", "id": req_id, "error": {"code": -32603, "message": f"OpenRouter API error: {str(e)}"}, } ) finally: # Always unregister the request when done GracefulShutdownProtection.unregister_request(req_id) def handle_initialize(req_id): """Handle initialize request.""" logger.info("Handling initialize request") send_response( { "jsonrpc": "2.0", "id": req_id, "result": { "protocolVersion": "2024-10-07", "capabilities": {"tools": {}}, "serverInfo": {"name": "openrouter-simple", "version": "1.0.0"}, }, } ) def handle_tools_list(req_id): """Handle tools/list request.""" logger.info("Handling tools/list request") tools = [ { "name": "chat", "description": "**PRIMARY TOOL** - Chat with OpenRouter AI models using intelligent model aliases. This is the DEFAULT tool for all standard model requests. Uses smart model resolution with aliases like 'gemini', 'deepseek', 'kimi', 'grok', 'qwen-max', 'qwen-coder', 'glm' that automatically map to the best available models. For best results: include relevant files using the 'files' parameter to provide context, maintain conversation history with continuation_id, and include images when working with visual content.", "inputSchema": { "type": "object", "properties": { "prompt": { "type": "string", "description": "User message with necessary context and background information", }, "model": { "type": "string", "description": "Model alias to use. PREFERRED ALIASES: 'gemini' (Google Gemini 2.5 Pro), 'deepseek' (DeepSeek R1), 'deepseek-v3.1' (DeepSeek Chat v3.1), 'kimi' (moonshotai/kimi-k2-thinking), 'grok' (Grok Code Fast 1), 'qwen-max' (Qwen3 Max), 'qwen-coder' (Qwen3 Coder Plus), 'glm' (GLM 4.5), 'gpt-5' (OpenAI GPT-5). These aliases automatically resolve to the correct OpenRouter model codes.", "default": DEFAULT_MODEL, }, "continuation_id": { "type": "string", "description": "UUID of an existing conversation to continue. Copy the exact UUID from previous responses to maintain conversation memory and context.", }, "files": { "type": "array", "items": {"type": "string"}, "description": "Absolute paths to files providing context. Essential for accurate responses, especially for code-related queries. Include all relevant source files, config files, documentation, etc.", }, "images": { "type": "array", "items": {"type": "string"}, "description": "Absolute paths to images for visual analysis. Required when working with screenshots, diagrams, charts, or any visual content. Supports .png, .jpg, .jpeg, .gif, .svg, .webp formats.", }, "force_internet_search": { "type": "boolean", "description": "Enable web search for models that support it (like Gemini) to get current information.", "default": True, }, "thinking_effort": { "type": "string", "enum": ["high", "medium", "low"], "description": "Controls reasoning depth for capable models. Use 'high' for complex problems, 'medium' for standard queries, 'low' for simple tasks.", "default": "high", }, }, "required": ["prompt"], }, }, { "name": "list_conversations", "description": "List all saved conversations", "inputSchema": {"type": "object", "properties": {}, "required": []}, }, { "name": "get_conversation", "description": "Get conversation history by ID", "inputSchema": { "type": "object", "properties": { "continuation_id": { "type": "string", "description": "Conversation ID to retrieve", } }, "required": ["continuation_id"], }, }, { "name": "delete_conversation", "description": "Delete a conversation by ID", "inputSchema": { "type": "object", "properties": { "continuation_id": { "type": "string", "description": "Conversation ID to delete", } }, "required": ["continuation_id"], }, }, { "name": "chat_with_custom_model", "description": "**ADVANCED TOOL - USE ONLY WHEN NEEDED** - Chat with OpenRouter models using exact model codes. WARNING: Only use this tool when you need a specific model code that is NOT available in the standard aliases (gemini, deepseek, kimi, grok, qwen-max, qwen-coder, glm, gpt-5). For 99% of requests, use the 'chat' tool instead. This tool bypasses intelligent model resolution and requires exact OpenRouter model codes like 'anthropic/claude-3-opus' or 'meta-llama/llama-3.3-70b-instruct'.", "inputSchema": { "type": "object", "properties": { "prompt": { "type": "string", "description": "User message with full context and background information", }, "custom_model": { "type": "string", "description": "The exact OpenRouter model code (e.g., 'anthropic/claude-3-opus', 'meta-llama/llama-3.3-70b-instruct'). MUST be the full model identifier as used by OpenRouter. Do NOT use aliases like 'gemini' here - use the 'chat' tool for aliases.", }, "continuation_id": { "type": "string", "description": "Optional UUID of existing conversation to continue. If not provided, a new conversation will be started.", }, "files": { "type": "array", "items": {"type": "string"}, "description": "Absolute paths to files providing context. Essential for accurate responses, especially for code-related queries.", }, "images": { "type": "array", "items": {"type": "string"}, "description": "Absolute paths to images for visual analysis", }, "max_tokens": { "type": "integer", "description": "Optional maximum tokens for the response. If not specified, will use model's default.", }, "temperature": { "type": "number", "description": "Optional temperature for response generation (0.0-2.0). Default is 0.7.", "minimum": 0.0, "maximum": 2.0, }, "thinking_effort": { "type": "string", "enum": ["high", "medium", "low"], "description": "Controls reasoning depth for capable models. Use 'high' for complex problems, 'medium' for standard queries, 'low' for simple tasks.", "default": "high", }, }, "required": ["prompt", "custom_model"], }, }, ] send_response({"jsonrpc": "2.0", "id": req_id, "result": {"tools": tools}}) def handle_chat_tool(arguments, req_id): """Handle chat tool call by deferring to the unified chat handler.""" logger.info(f"Handling chat tool: {arguments}") _execute_chat_completion(req_id, arguments, is_custom_model=False) def handle_list_conversations(req_id): """Handle list_conversations tool.""" logger.info("Handling list_conversations tool") GracefulShutdownProtection.register_request(req_id, "list_conversations") try: conversations = conversation_manager.list_conversations() if not conversations: result_text = "No conversations found." else: result_text = f"Found {len(conversations)} conversations:\n\n" for conv in conversations: result_text += f"• **ID**: `{conv['id']}`\n" result_text += f" Messages: {conv['message_count']}\n" result_text += f" Preview: {conv.get('first_message', 'No messages')[:100]}...\n\n" send_response( { "jsonrpc": "2.0", "id": req_id, "result": {"content": [{"type": "text", "text": result_text}]}, } ) except Exception as e: logger.error(f"Error listing conversations: {e}") send_response( { "jsonrpc": "2.0", "id": req_id, "error": { "code": -32603, "message": f"Error listing conversations: {str(e)}", }, } ) finally: GracefulShutdownProtection.unregister_request(req_id) def handle_get_conversation(arguments, req_id): """Handle get_conversation tool.""" logger.info(f"Handling get_conversation tool: {arguments}") continuation_id = arguments.get("continuation_id") GracefulShutdownProtection.register_request( req_id, "get_conversation", continuation_id ) try: if not continuation_id: send_response( { "jsonrpc": "2.0", "id": req_id, "error": { "code": -32602, "message": "Missing required parameter: continuation_id", }, } ) return history = conversation_manager.get_conversation_history(continuation_id) if not history: result_text = f"Conversation '{continuation_id}' not found." else: result_text = f"**Conversation ID**: `{continuation_id}`\n\n" for i, msg in enumerate(history, 1): role = msg.get("role", "unknown") content = msg.get("content", "") result_text += f"**{i}. {role.title()}**: {content}\n\n" send_response( { "jsonrpc": "2.0", "id": req_id, "result": {"content": [{"type": "text", "text": result_text}]}, } ) except Exception as e: logger.error(f"Error getting conversation: {e}") send_response( { "jsonrpc": "2.0", "id": req_id, "error": { "code": -32603, "message": f"Error getting conversation: {str(e)}", }, } ) finally: GracefulShutdownProtection.unregister_request(req_id) def handle_delete_conversation(arguments, req_id): """Handle delete_conversation tool.""" logger.info(f"Handling delete_conversation tool: {arguments}") continuation_id = arguments.get("continuation_id") GracefulShutdownProtection.register_request( req_id, "delete_conversation", continuation_id ) try: if not continuation_id: send_response( { "jsonrpc": "2.0", "id": req_id, "error": { "code": -32602, "message": "Missing required parameter: continuation_id", }, } ) return success = conversation_manager.delete_conversation(continuation_id) if success: result_text = f"✅ Conversation '{continuation_id}' deleted successfully." else: result_text = f"❌ Conversation '{continuation_id}' not found." send_response( { "jsonrpc": "2.0", "id": req_id, "result": {"content": [{"type": "text", "text": result_text}]}, } ) except Exception as e: logger.error(f"Error deleting conversation: {e}") send_response( { "jsonrpc": "2.0", "id": req_id, "error": { "code": -32603, "message": f"Error deleting conversation: {str(e)}", }, } ) finally: GracefulShutdownProtection.unregister_request(req_id) def handle_chat_with_custom_model(arguments, req_id): """Handle chat_with_custom_model tool call by deferring to the unified chat handler.""" logger.info(f"Handling chat_with_custom_model tool: {arguments}") _execute_chat_completion(req_id, arguments, is_custom_model=True) def handle_tools_call(params, req_id): """Handle tools/call request.""" tool_name = params.get("name") arguments = params.get("arguments", {}) logger.info(f"Tool call: {tool_name} with args: {arguments}") if tool_name == "chat": handle_chat_tool(arguments, req_id) elif tool_name == "list_conversations": handle_list_conversations(req_id) elif tool_name == "get_conversation": handle_get_conversation(arguments, req_id) elif tool_name == "delete_conversation": handle_delete_conversation(arguments, req_id) elif tool_name == "chat_with_custom_model": handle_chat_with_custom_model(arguments, req_id) else: send_response( { "jsonrpc": "2.0", "id": req_id, "error": {"code": -32601, "message": f"Unknown tool: {tool_name}"}, } ) def main(): """Main synchronous loop with graceful shutdown protection.""" logger.info("Starting main loop, reading from stdin...") # Track consecutive EOF reads to detect client disconnect eof_count = 0 max_eof_retries = 5 try: while not shutdown_requested: try: # Check if stdin is closed (client disconnected) if sys.stdin.closed: logger.warning("PROTECTION: stdin closed, client disconnected") GracefulShutdownProtection.handle_shutdown() break line = sys.stdin.readline() if not line: eof_count += 1 if eof_count >= max_eof_retries: logger.warning( f"PROTECTION: Received {eof_count} consecutive EOFs, client likely disconnected" ) GracefulShutdownProtection.handle_shutdown() break else: logger.debug( f"EOF received ({eof_count}/{max_eof_retries}), waiting for more input..." ) time.sleep(1.0) continue # Reset EOF counter on successful read eof_count = 0 line = line.strip() if not line: continue logger.info(f"Received: {line}") try: message = json.loads(line) method = message.get("method") params = message.get("params", {}) req_id = message.get("id") logger.info(f"Processing method: {method}, id: {req_id}") # Handle notifications (no response needed) if req_id is None: logger.info("Notification received, no response needed") continue # Handle requests if method == "initialize": handle_initialize(req_id) elif method == "tools/list": handle_tools_list(req_id) elif method == "tools/call": handle_tools_call(params, req_id) else: send_response( { "jsonrpc": "2.0", "id": req_id, "error": { "code": -32601, "message": f"Method not found: {method}", }, } ) except json.JSONDecodeError as e: logger.error(f"JSON decode error: {e}") except BrokenPipeError: logger.warning("PROTECTION: Broken pipe detected, client disconnected") GracefulShutdownProtection.handle_shutdown() break except OSError as e: if e.errno == 32: # Broken pipe logger.warning( "PROTECTION: Broken pipe (OSError 32), client disconnected" ) GracefulShutdownProtection.handle_shutdown() break else: logger.error(f"OSError in main loop: {e}") except Exception as e: logger.error(f"Error in main loop: {e}") except KeyboardInterrupt: logger.info("PROTECTION: KeyboardInterrupt received") GracefulShutdownProtection.handle_shutdown() except Exception as e: logger.error(f"Fatal error: {e}") GracefulShutdownProtection.handle_shutdown() logger.info("PROTECTION: Main loop exited, server shutdown complete") if __name__ == "__main__": main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/slyfox1186/claude-code-openrouter'

If you have feedback or need assistance with the MCP directory API, please join our Discord server