
MCP Server with LLM Integration

by MelaLitho
mcp_server_hybrid.py • 51.4 kB
#!/usr/bin/env python3
"""
Hybrid MCP Server supporting both stdio and HTTP transports
Provides MCP SDK compatibility while maintaining REST API access for ReactJS
"""

import asyncio
import json
import logging
import sys
from typing import Dict, Any, List, Optional
from contextlib import asynccontextmanager

# MCP SDK imports
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.server.models import InitializationOptions
import mcp.server.stdio
import mcp.types as types

# FastAPI for HTTP transport
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, Response, StreamingResponse
from datetime import datetime
import uvicorn

# Your existing services
from services.mcp_resources_service import MCPResourcesService
from services.mcp_tools_service import MCPToolsService
from services.mcp_prompts_service import MCPPromptsService

logger = logging.getLogger(__name__)


class HybridMCPServer:
    """MCP Server that supports both stdio and HTTP transports"""

    def __init__(self):
        # Initialize your existing services
        self.resources_service = MCPResourcesService({})
        self.tools_service = MCPToolsService({})
        self.prompts_service = MCPPromptsService({})

        # Create MCP SDK server
        self.mcp_server = Server("postgresql-mcp-server")
        self._register_mcp_handlers()

        # Create FastAPI app for HTTP transport
        self.http_app = self._create_http_app()

    def _register_mcp_handlers(self):
        """Register all MCP SDK handlers with async optimization"""

        # Register resource handlers
        @self.mcp_server.list_resources()
        async def list_resources() -> List[types.Resource]:
            # Ensure async compatibility and efficient processing
            resources = await asyncio.get_event_loop().run_in_executor(
                None, self.resources_service.list_resources
            )

            # Convert resources safely, handling any serialization issues
            mcp_resources = []
            for r in resources:
                try:
                    # Ensure all values are strings and properly formatted
                    uri = str(r["uri"]) if r["uri"] else ""
                    name = str(r["name"]) if r["name"] else ""
                    description = str(r.get("description", ""))
                    mime_type = str(r.get("mimeType", "application/json"))

                    mcp_resources.append(types.Resource(
                        uri=uri,
                        name=name,
                        description=description,
                        mimeType=mime_type
                    ))
                except Exception as e:
                    logger.error(f"Error converting resource {r}: {e}")
                    continue

            return mcp_resources

        @self.mcp_server.read_resource()
        async def read_resource(uri: str) -> List[types.TextContent]:
            result = await self.resources_service.read_resource(uri)

            # Convert any complex objects to JSON-serializable strings
            def make_serializable(obj):
                try:
                    # Handle common non-serializable types
                    if hasattr(obj, '__dict__'):
                        return str(obj)
                    elif hasattr(obj, '_url'):  # Handle AnyUrl objects
                        return str(obj)
                    elif hasattr(obj, 'isoformat'):  # Handle datetime objects
                        return obj.isoformat()
                    elif str(type(obj)).startswith('<class \'pydantic'):  # Pydantic models
                        return str(obj)
                    else:
                        return str(obj)
                except Exception:
                    return "<non-serializable object>"

            # Handle the result properly - the resource service returns text content
            content = ""
            if isinstance(result, dict):
                if "text" in result:
                    content = result["text"]
                elif "contents" in result:
                    # Handle old format with contents array
                    contents = result["contents"]
                    if isinstance(contents, list) and len(contents) > 0:
                        content = contents[0].get("text", "")
                else:
                    # Convert entire result to JSON
                    content = json.dumps(result, default=make_serializable, indent=2)
            else:
                content = str(result)

            # Ensure content is a string
            if isinstance(content, dict):
                content = json.dumps(content, default=make_serializable, indent=2)
            elif not isinstance(content, str):
                content = str(content)

            return [types.TextContent(type="text", text=content)]

        # Register tool handlers
        @self.mcp_server.list_tools()
        async def list_tools() -> List[types.Tool]:
            # Use async executor for potential I/O operations
            tool_data = await asyncio.get_event_loop().run_in_executor(
                None, self.tools_service.get_available_tools
            )

            tools = []
            # Use the "tools" list from the return structure
            for tool in tool_data["tools"]:
                tools.append(types.Tool(
                    name=tool["name"],
                    description=tool["description"],
                    inputSchema=tool["parameters"]
                ))
            return tools

        @self.mcp_server.call_tool()
        async def call_tool(name: str, arguments: Dict[str, Any]) -> List[types.TextContent]:
            result = await self.tools_service.call_tool(name, arguments)

            # Convert result to MCP TextContent
            if isinstance(result, dict):
                content = json.dumps(result, indent=2)
            else:
                content = str(result)

            return [types.TextContent(type="text", text=content)]

        # Register prompt handlers
        @self.mcp_server.list_prompts()
        async def list_prompts() -> List[types.Prompt]:
            # Use async executor for consistent async handling
            prompts = await asyncio.get_event_loop().run_in_executor(
                None, self.prompts_service.list_prompts
            )
            return [
                types.Prompt(
                    name=p["name"],
                    description=p["description"],
                    arguments=[
                        types.PromptArgument(
                            name=arg["name"],
                            description=arg["description"],
                            required=arg.get("required", False)
                        )
                        for arg in p.get("arguments", [])
                    ]
                )
                for p in prompts
            ]

        @self.mcp_server.get_prompt()
        async def get_prompt(name: str, arguments: Dict[str, str] = None) -> types.GetPromptResult:
            if arguments is None:
                arguments = {}

            prompt = await self.prompts_service.get_prompt(name, arguments)

            messages = []
            for msg in prompt["messages"]:
                # Handle different message role types safely
                role_str = msg.get("role", "user").lower()
                if role_str == "user":
                    role = types.Role.user
                elif role_str == "assistant":
                    role = types.Role.assistant
                elif role_str == "system":
                    role = types.Role.system
                else:
                    role = types.Role.user  # Default fallback

                content_text = msg.get("content", {}).get("text", "")

                messages.append(types.PromptMessage(
                    role=role,
                    content=types.TextContent(
                        type="text",
                        text=content_text
                    )
                ))

            return types.GetPromptResult(
                description=prompt["description"],
                messages=messages
            )

    def _create_http_app(self) -> FastAPI:
        """Create FastAPI app for HTTP transport"""

        @asynccontextmanager
        async def lifespan(app: FastAPI):
            logger.info("Starting HTTP transport for MCP server")
            yield
            logger.info("Shutting down HTTP transport")

        app = FastAPI(
            title="PostgreSQL MCP Server - HTTP Transport",
            description="HTTP REST API wrapper for MCP PostgreSQL server",
            version="1.0.0",
            lifespan=lifespan
        )

        # CORS middleware for ReactJS
        app.add_middleware(
            CORSMiddleware,
            allow_origins=["*"],
            allow_credentials=True,
            allow_methods=["*"],
            allow_headers=["*"],
        )

        # HTTP endpoints that mirror MCP functionality
        @app.get("/mcp/resources")
        async def http_list_resources():
            """HTTP endpoint for listing resources"""
            return self.resources_service.list_resources()

        @app.get("/mcp/resources/{resource_path:path}")
        async def http_read_resource(resource_path: str):
            """HTTP endpoint for reading resources"""
            # Reconstruct URI from path
            if resource_path.startswith("database/"):
                uri = resource_path.replace("database/", "database://", 1)
            elif resource_path.startswith("table/"):
                uri = resource_path.replace("table/", "table://", 1)
            else:
                uri = f"database://{resource_path}"

            result = await self.resources_service.read_resource(uri)
if "error" in result.get("text", ""): raise HTTPException(status_code=404, detail="Resource not found") # Return parsed JSON for HTTP clients try: return json.loads(result["text"]) except json.JSONDecodeError: return {"text": result["text"]} @app.get("/mcp/tools") async def http_list_tools(): """HTTP endpoint for listing tools""" return self.tools_service.get_available_tools() @app.post("/mcp/tools/{tool_name}") async def http_call_tool(tool_name: str, arguments: Dict[str, Any]): """HTTP endpoint for calling tools""" try: result = await self.tools_service.call_tool(tool_name, arguments) return result except Exception as e: raise HTTPException(status_code=400, detail=str(e)) @app.get("/mcp/prompts") async def http_list_prompts(): """HTTP endpoint for listing prompts""" return self.prompts_service.list_prompts() @app.post("/mcp/prompts/{prompt_name}") async def http_get_prompt(prompt_name: str, arguments: Dict[str, Any] = None): """HTTP endpoint for getting prompts with arguments""" if arguments is None: arguments = {} try: result = await self.prompts_service.get_prompt(prompt_name, arguments) return result except Exception as e: raise HTTPException(status_code=400, detail=str(e)) # Health and Status endpoints @app.get("/health") async def health_check(): return {"status": "healthy", "timestamp": str(datetime.utcnow())} @app.get("/status") async def status_check(): return { "status": "running", "server": "postgresql-mcp-server", "version": "1.0.0", "uptime": str(datetime.utcnow()) } @app.get("/models") async def available_models(): """Get actual available models from Ollama""" try: import requests # Fetch models from Ollama ollama_response = requests.get("http://localhost:11434/api/tags", timeout=10) if ollama_response.status_code == 200: ollama_data = ollama_response.json() models = [] # Extract model names from Ollama response for model in ollama_data.get('models', []): model_name = model.get('name', '') if model_name: # Clean model name (remove :latest suffix if present) clean_name = model_name.replace(':latest', '') models.append(model_name) # Keep full name for compatibility # Set default to mistral:latest if available, otherwise first model default_model = "mistral:latest" if models and default_model not in models: # Try to find mistral variant mistral_models = [m for m in models if 'mistral' in m.lower()] if mistral_models: default_model = mistral_models[0] else: default_model = models[0] return { "models": models, "default": default_model, "source": "ollama", "count": len(models) } else: # Fallback if Ollama is not available return { "models": ["mistral:latest"], "default": "mistral:latest", "source": "fallback", "error": f"Ollama not accessible (status: {ollama_response.status_code})" } except Exception as e: # Fallback if connection fails return { "models": ["mistral:latest"], "default": "mistral:latest", "source": "fallback", "error": f"Failed to connect to Ollama: {str(e)}" } @app.get("/api/models") async def api_available_models(): """API endpoint for available models (same as /models)""" return await available_models() @app.get("/tools") async def available_tools(): return self.tools_service.get_available_tools() # Streaming Chat endpoints @app.post("/api/conversation/chat/stream") async def conversation_chat_stream(request: Dict[str, Any]): """ Expected payload: { "question": "Your question here", "session_id": "optional-session-id", "database": "db3", "provider": "ollama" } """ try: # Import enhanced error handling from error_handling import validate_chat_request, 
                from error_handling import validate_chat_request, create_error_response, LLMError, ServiceError

                # Validate request
                validate_chat_request(request)

                question = request.get("question", "")
                session_id = request.get("session_id", "default")
                database = request.get("database", "db3")
                provider = request.get("provider", "ollama")

            except Exception as e:
                from error_handling import create_error_response
                error_response = create_error_response(e)
                raise HTTPException(
                    status_code=400,
                    detail=error_response
                )

            async def generate():
                try:
                    # Simple response for now - can be enhanced with actual LLM integration
                    yield f"data: {json.dumps({'type': 'status', 'content': 'Processing your question...'})}\n\n"

                    # Integrate with LLM for intelligent responses
                    try:
                        # Import your LLM integration
                        from llmintegrationsystem import LLMIntegrationSystem

                        # Initialize LLM system
                        llm = LLMIntegrationSystem()

                        # Use Ollama for streaming chat
                        model = "mistral:latest"
                        llm_response = ""

                        # Create a focused prompt for the LLM
                        focused_prompt = f"""You are a helpful assistant integrated with an MCP (Model Context Protocol) server.

Please provide a direct, concise response to the user's question. Keep your response brief and relevant.

User question: {question}

Response:"""

                        # Stream response from Ollama
                        async for chunk in llm.chat_stream(focused_prompt, model=model, provider=provider):
                            content = chunk.get('content', '')
                            llm_response += content

                            # Stream each chunk as it comes from LLM
                            if content:
                                chunk_data = {
                                    'content': content,
                                    'done': False
                                }
                                yield f"data: {json.dumps(chunk_data)}\n\n"

                        # Send completion signal
                        final_data = {
                            'done': True,
                            'model': model,
                            'provider': provider
                        }
                        yield f"data: {json.dumps(final_data)}\n\n"
                        return  # Exit here since we've already streamed the response

                    except Exception as llm_error:
                        # Log the error and provide detailed fallback
                        error_details = str(llm_error)
                        logger.error(f"LLM integration failed: {error_details}")

                        # Try direct Ollama call as fallback
                        try:
                            import requests
                            ollama_response = requests.post(
                                "http://localhost:11434/api/generate",
                                json={
                                    "model": "mistral:latest",
                                    "prompt": question,
                                    "stream": False
                                },
                                timeout=30
                            )

                            if ollama_response.status_code == 200:
                                ollama_data = ollama_response.json()
                                response_text = ollama_data.get('response', 'No response from Ollama')

                                # Stream the Ollama response word by word
                                words = response_text.split()
                                for i, word in enumerate(words):
                                    chunk_data = {
                                        'content': word + (' ' if i < len(words) - 1 else ''),
                                        'done': False
                                    }
                                    yield f"data: {json.dumps(chunk_data)}\n\n"

                                # Send completion signal
                                final_data = {
                                    'done': True,
                                    'model': 'mistral:latest',
                                    'provider': 'ollama-direct'
                                }
                                yield f"data: {json.dumps(final_data)}\n\n"
                                return
                        except Exception as direct_error:
                            logger.error(f"Direct Ollama call also failed: {str(direct_error)}")

                        # Final fallback
                        result = {"response": f"Both LLM integration and direct Ollama failed. LLM error: {error_details}. Question was: {question}"}

                    # Extract the actual response text
                    if isinstance(result, dict):
                        if "response" in result:
                            response_text = result["response"]
                        elif "error" in result:
                            response_text = f"Error: {result['error']}"
                        else:
                            response_text = str(result)
                    else:
                        response_text = str(result)

                    # Stream the response word by word
                    words = response_text.split()
                    for i, word in enumerate(words):
                        chunk_data = {
                            'content': word + (' ' if i < len(words) - 1 else ''),
                            'done': False
                        }
                        yield f"data: {json.dumps(chunk_data)}\n\n"

                        # Small delay to simulate streaming
                        import asyncio
                        await asyncio.sleep(0.1)

                    # Send completion signal
                    final_data = {
                        'done': True,
                        'model': 'conversation-model',
                        'provider': 'mcp'
                    }
                    yield f"data: {json.dumps(final_data)}\n\n"

                except Exception as e:
                    try:
                        from error_handling import create_error_response, handle_llm_error, handle_database_error

                        # Determine error type and create appropriate response
                        if "llm" in str(e).lower() or "model" in str(e).lower():
                            enhanced_error = handle_llm_error(e, provider=provider)
                            error_response = create_error_response(enhanced_error)
                        elif "database" in str(e).lower() or "sql" in str(e).lower():
                            enhanced_error = handle_database_error(e, database=database)
                            error_response = create_error_response(enhanced_error)
                        else:
                            error_response = create_error_response(e)

                        yield f"data: {json.dumps({'type': 'error', 'content': error_response})}\n\n"
                    except Exception as fallback_error:
                        # Fallback to simple error if enhanced handling fails
                        yield f"data: {json.dumps({'type': 'error', 'content': str(e)})}\n\n"

            return StreamingResponse(
                generate(),
                media_type="text/event-stream",
                headers={
                    "Cache-Control": "no-cache",
                    "Connection": "keep-alive",
                    "Access-Control-Allow-Origin": "*",
                    "Access-Control-Allow-Methods": "GET, POST, PUT, DELETE, OPTIONS",
                    "Access-Control-Allow-Headers": "*",
                }
            )

        @app.post("/api/chat/stream")
        async def chat_stream(request: Dict[str, Any]):
            """
            Simple chat stream endpoint
            Expected payload:
            {
                "message": "Your message here",
                "model": "mistral:latest",
                "temperature": 0.7,
                "max_tokens": 1000
            }
            """
            try:
                from error_handling import validate_chat_request

                # Validate request
                validate_chat_request(request)

                message = request.get("message", "")
                model = request.get("model", "mistral:latest")
                temperature = request.get("temperature", 0.7)
                max_tokens = request.get("max_tokens", 1000)

                async def simple_generate():
                    try:
                        # Send status
                        yield f"data: {json.dumps({'type': 'status', 'content': 'Processing your message...'})}\n\n"

                        # Integrate with LLM for actual responses
                        try:
                            import requests
                            ollama_response = requests.post(
                                "http://localhost:11434/api/generate",
                                json={
                                    "model": model,
                                    "prompt": message,
                                    "stream": False,
                                    "options": {
                                        "temperature": temperature,
                                        "num_predict": max_tokens
                                    }
                                },
                                timeout=30
                            )

                            if ollama_response.status_code == 200:
                                ollama_data = ollama_response.json()
                                response_text = ollama_data.get('response', 'No response from Ollama')
                            else:
                                response_text = f"Ollama error (status {ollama_response.status_code}). Fallback echo: {message}"
                        except Exception as llm_error:
                            response_text = f"LLM unavailable. Echo: {message} (Error: {str(llm_error)[:50]})"

                        # Stream the response in chunks
                        words = response_text.split()
                        for i, word in enumerate(words):
                            chunk_data = {
                                'content': word + (' ' if i < len(words) - 1 else ''),
                                'done': False
                            }
                            yield f"data: {json.dumps(chunk_data)}\n\n"

                            # Small delay to simulate streaming
                            import asyncio
                            await asyncio.sleep(0.1)

                        # Send final done message
                        final_data = {
                            'done': True,
                            'model': model,
                            'provider': 'ollama-direct'
                        }
                        yield f"data: {json.dumps(final_data)}\n\n"

                    except Exception as e:
                        error_data = {'error': str(e), 'done': True}
                        yield f"data: {json.dumps(error_data)}\n\n"

                return StreamingResponse(
                    simple_generate(),
                    media_type="text/event-stream",
                    headers={
                        "Cache-Control": "no-cache",
                        "Connection": "keep-alive",
                        "Access-Control-Allow-Origin": "*",
                        "Access-Control-Allow-Methods": "GET, POST, PUT, DELETE, OPTIONS",
                        "Access-Control-Allow-Headers": "*",
                    }
                )

            except Exception as e:
                from error_handling import create_error_response
                error_response = create_error_response(e)
                raise HTTPException(
                    status_code=400,
                    detail=error_response
                )

        # Non-streaming Chat endpoints
        @app.post("/api/chat")
        async def chat_non_streaming(request: Dict[str, Any]):
            """
            Simple non-streaming chat endpoint
            Expected payload:
            {
                "message": "Your message here",
                "model": "mistral:latest",
                "temperature": 0.7,
                "max_tokens": 1000
            }
            """
            try:
                from error_handling import validate_chat_request

                # Validate request
                validate_chat_request(request)

                message = request.get("message", "")
                model = request.get("model", "mistral:latest")
                temperature = request.get("temperature", 0.7)
                max_tokens = request.get("max_tokens", 1000)

                # Integrate with LLM for actual responses
                try:
                    import requests
                    ollama_response = requests.post(
                        "http://localhost:11434/api/generate",
                        json={
                            "model": model,
                            "prompt": message,
                            "stream": False,
                            "options": {
                                "temperature": temperature,
                                "num_predict": max_tokens
                            }
                        },
                        timeout=30
                    )

                    if ollama_response.status_code == 200:
                        ollama_data = ollama_response.json()
                        response_text = ollama_data.get('response', 'No response from Ollama')

                        return {
                            "response": response_text,
                            "model": model,
                            "provider": "ollama-direct",
                            "usage": {
                                "prompt_tokens": len(message.split()),
                                "completion_tokens": len(response_text.split()),
                                "total_tokens": len(message.split()) + len(response_text.split())
                            }
                        }
                    else:
                        return {
                            "error": f"Ollama error (status {ollama_response.status_code})",
                            "model": model,
                            "provider": "ollama-direct"
                        }
                except Exception as llm_error:
                    return {
                        "error": f"LLM unavailable: {str(llm_error)}",
                        "model": model,
                        "provider": "fallback"
                    }

            except Exception as e:
                from error_handling import create_error_response
                error_response = create_error_response(e)
                raise HTTPException(
                    status_code=400,
                    detail=error_response
                )

        @app.post("/api/conversation/chat")
        async def conversation_chat_non_streaming(request: Dict[str, Any]):
            """
            Conversation non-streaming chat endpoint with memory
            Expected payload:
            {
                "question": "Your question here",
                "session_id": "optional-session-id",
                "database": "db3",
                "provider": "ollama"
            }
            """
            try:
                from error_handling import validate_chat_request, create_error_response, LLMError, ServiceError

                # Validate request
                validate_chat_request(request)

                question = request.get("question", "")
                session_id = request.get("session_id", "default")
                database = request.get("database", "db3")
                provider = request.get("provider", "ollama")

                # Create a focused prompt for the LLM
                focused_prompt = f"""You are a helpful assistant integrated with an MCP (Model Context Protocol) server.

Please provide a direct, concise response to the user's question. Keep your response brief and relevant.

User question: {question}

Response:"""

                # Integrate with LLM for intelligent responses
                try:
                    import requests
                    ollama_response = requests.post(
                        "http://localhost:11434/api/generate",
                        json={
                            "model": "mistral:latest",
                            "prompt": focused_prompt,
                            "stream": False
                        },
                        timeout=30
                    )

                    if ollama_response.status_code == 200:
                        ollama_data = ollama_response.json()
                        response_text = ollama_data.get('response', 'No response from Ollama')

                        return {
                            "response": response_text,
                            "model": "mistral:latest",
                            "provider": "ollama",
                            "session_id": session_id,
                            "database": database,
                            "usage": {
                                "prompt_tokens": len(focused_prompt.split()),
                                "completion_tokens": len(response_text.split()),
                                "total_tokens": len(focused_prompt.split()) + len(response_text.split())
                            }
                        }
                    else:
                        return {
                            "error": f"Ollama error (status {ollama_response.status_code})",
                            "model": "mistral:latest",
                            "provider": "ollama",
                            "session_id": session_id
                        }
                except Exception as llm_error:
                    return {
                        "error": f"LLM integration failed: {str(llm_error)}",
                        "model": "mistral:latest",
                        "provider": "fallback",
                        "session_id": session_id
                    }

            except Exception as e:
                from error_handling import create_error_response
                error_response = create_error_response(e)
                raise HTTPException(
                    status_code=400,
                    detail=error_response
                )

        # Legacy compatibility endpoints
        @app.get("/api/resources")
        async def legacy_resources():
            return await http_list_resources()

        @app.get("/api/capabilities/dynamic")
        async def legacy_capabilities():
            resources = self.resources_service.list_resources()
            tools = self.tools_service.get_available_tools()
            prompts = self.prompts_service.list_prompts()

            return {
                "resources": resources,
                "tools": tools,
                "prompts": prompts,
                "server_info": {
                    "name": "postgresql-mcp-server",
                    "version": "1.0.0",
                    "transports": ["stdio", "http"],
                    "capabilities": ["resources", "tools", "prompts"]
                }
            }

        @app.get("/api/capabilities")
        async def api_capabilities():
            return await legacy_capabilities()

        @app.get("/api/info")
        async def api_info():
            return {
                "name": "postgresql-mcp-server",
                "version": "1.0.0",
                "description": "PostgreSQL MCP Server with HTTP API",
                "endpoints": [
                    "/mcp/resources",
                    "/mcp/tools",
                    "/mcp/prompts",
                    "/api/conversation/chat/stream",
                    "/health",
                    "/status"
                ]
            }

        @app.get("/api/status")
        async def api_status():
            return await status_check()

        @app.get("/api/health")
        async def api_health():
            return await health_check()

        @app.get("/api/docs")
        async def api_documentation():
            """Complete API documentation endpoint"""
            return {
                "server_info": {
                    "name": "PostgreSQL MCP Server",
                    "version": "1.0.0",
                    "description": "Hybrid MCP server with HTTP API and streaming support",
                    "ports": {
                        "http": 8002,
                        "stdio": "Claude Desktop integration"
                    }
                },
                "base_urls": [
                    "http://192.168.0.71:8002",
                    "http://localhost:8002"
                ],
                "endpoints": {
                    "health_status": {
                        "GET /health": {
                            "description": "Server health check",
                            "response": {"status": "healthy", "timestamp": "ISO timestamp"}
                        },
                        "GET /status": {
                            "description": "Server status and uptime",
                            "response": {"status": "running", "server": "postgresql-mcp-server", "version": "1.0.0"}
                        }
                    },
                    "models_tools": {
                        "GET /models": {
                            "description": "Available AI models",
                            "response": {"models": ["mistral", "gpt-3.5-turbo", "gpt-4", "llama3.2"], "default": "mistral"}
                        },
                        "GET /tools": {
                            "description": "Available tools (same as /mcp/tools)",
                            "response": {"tools": "array of tool objects", "categories": "tool categories object"}
                        }
                    },
                    "chat_streaming": {
                        "POST /api/conversation/chat/stream": {
                            "description": "Primary streaming chat endpoint",
                            "content_type": "application/json",
                            "payload": {
                                "question": "Your question here (required)",
                                "session_id": "optional-session-id",
                                "database": "db3 (db1/db2/db3)",
                                "provider": "ollama"
                            },
                            "response_type": "text/event-stream",
                            "response_format": {
                                "status": "data: {\"type\": \"status\", \"content\": \"Processing...\"}",
                                "response": "data: {\"type\": \"response\", \"content\": \"...\"}",
                                "end": "data: [DONE]"
                            }
                        },
                        "POST /api/chat/stream": {
                            "description": "Alternative streaming chat endpoint",
                            "payload": {
                                "message": "Your message here",
                                "model": "mistral",
                                "stream": True
                            }
                        }
                    },
                    "mcp_protocol": {
                        "GET /mcp/resources": {
                            "description": "List all MCP resources",
                            "response": "Array of resource objects with URI, name, description"
                        },
                        "GET /mcp/resources/{resource_path}": {
                            "description": "Read specific resource by path",
                            "example": "/mcp/resources/database/tables"
                        },
                        "GET /mcp/tools": {
                            "description": "List all MCP tools",
                            "response": {"tools": "array of tool objects", "categories": "tool categories object"}
                        },
                        "POST /mcp/tools/{tool_name}": {
                            "description": "Execute a specific tool",
                            "available_tools": [
                                "execute_safe_sql",
                                "validate_sql_syntax",
                                "search_tables_for_concept",
                                "find_related_data",
                                "analyze_query_performance",
                                "analyze_data_patterns",
                                "get_postgresql_logs",
                                "read_resource"
                            ]
                        },
                        "POST /mcp/tools/read_resource": {
                            "description": "Read resource by URI",
                            "payload": {
                                "uri": "database://tables or table://products/schema"
                            }
                        },
                        "GET /mcp/prompts": {
                            "description": "List all MCP prompts"
                        },
                        "POST /mcp/prompts/{prompt_name}": {
                            "description": "Execute a specific prompt"
                        }
                    },
                    "api_info": {
                        "GET /api/capabilities": {
                            "description": "Server capabilities (same as /api/capabilities/dynamic)"
                        },
                        "GET /api/capabilities/dynamic": {
                            "description": "Dynamic server capabilities with resources, tools, prompts"
                        },
                        "GET /api/info": {
                            "description": "Server information and endpoint list"
                        },
                        "GET /api/status": {
                            "description": "API status (same as /status)"
                        },
                        "GET /api/health": {
                            "description": "API health (same as /health)"
                        },
                        "GET /api/resources": {
                            "description": "Legacy resources endpoint (same as /mcp/resources)"
                        }
                    },
                    "documentation": {
                        "GET /docs": {
                            "description": "FastAPI Swagger UI documentation"
                        },
                        "GET /openapi.json": {
                            "description": "OpenAPI specification"
                        },
                        "GET /api/docs": {
                            "description": "This comprehensive API documentation"
                        }
                    }
                },
                "cors_configuration": {
                    "enabled": True,
                    "allow_origins": "*",
                    "allow_methods": ["GET", "POST", "PUT", "DELETE", "OPTIONS"],
                    "allow_headers": "*"
                },
                "authentication": {
                    "required": False,
                    "type": "None (open access)"
                },
                "examples": {
                    "streaming_chat": {
                        "url": "http://192.168.0.71:8002/api/conversation/chat/stream",
                        "method": "POST",
                        "headers": {"Content-Type": "application/json"},
                        "body": {
                            "question": "What tables are in the database?",
                            "session_id": "user-123",
                            "database": "db3"
                        }
                    },
                    "execute_sql": {
                        "url": "http://192.168.0.71:8002/mcp/tools/execute_safe_sql",
                        "method": "POST",
                        "body": {
                            "sql_query": "SELECT * FROM products LIMIT 10",
                            "database": "db3"
                        }
                    },
                    "read_resource": {
                        "url": "http://192.168.0.71:8002/mcp/tools/read_resource",
                        "method": "POST",
                        "body": {
                            "uri": "database://tables"
                        }
                    }
                }
            }

        return app

    def _cleanup_port(self, port: int):
        """Kill any existing processes using the specified port"""
        try:
            import subprocess
            import sys

            logger.info(f"🧹 Cleaning up port {port}...")

            if sys.platform == "win32":
                # Windows
                try:
                    # Find processes using the port
                    result = subprocess.run(
                        ["netstat", "-ano"],
                        capture_output=True,
                        text=True,
                        timeout=10
                    )

                    for line in result.stdout.split('\n'):
                        if f":{port}" in line and "LISTENING" in line:
                            parts = line.split()
                            if len(parts) >= 5:
                                pid = parts[-1]
                                try:
                                    logger.info(f"🔪 Killing process {pid} on port {port}")
                                    subprocess.run(
                                        ["taskkill", "/PID", pid, "/F"],
                                        capture_output=True,
                                        timeout=5
                                    )
                                except:
                                    pass

                    # Wait a moment for the port to be released
                    import time
                    time.sleep(1)
                    logger.info(f"✅ Port {port} cleanup complete")

                except Exception as e:
                    logger.warning(f"⚠️ Port cleanup failed: {e}")
            else:
                # Unix/Linux/Mac
                try:
                    result = subprocess.run(
                        ["lsof", "-ti", f":{port}"],
                        capture_output=True,
                        text=True,
                        timeout=10
                    )

                    pids = result.stdout.strip().split('\n')
                    for pid in pids:
                        if pid:
                            logger.info(f"🔪 Killing process {pid} on port {port}")
                            subprocess.run(["kill", "-9", pid], timeout=5)

                    import time
                    time.sleep(1)
                    logger.info(f"✅ Port {port} cleanup complete")

                except Exception as e:
                    logger.warning(f"⚠️ Port cleanup failed: {e}")

        except Exception as e:
            logger.warning(f"⚠️ Could not cleanup port {port}: {e}")

    async def run_stdio(self):
        """Run MCP server with stdio transport (for Claude Desktop)"""
        # Comprehensive startup logging for stdio mode
        logger.info("=" * 60)
        logger.info("🚀 MCP SERVER STARTUP")
        logger.info("=" * 60)
        logger.info(f"📡 Server: PostgreSQL MCP Server v1.0.0")
        logger.info(f"🌐 Mode: STDIO Transport (Claude Desktop)")
        logger.info(f"🔗 Connection: Standard Input/Output")
        logger.info("⚙️ Configuration:")
        logger.info(f"   • Resources: Enabled")
        logger.info(f"   • Tools: 8 tools available")
        logger.info(f"   • Prompts: Enabled")
        logger.info(f"   • Databases: db1, db2, db3")
        logger.info("=" * 60)
        logger.info("✅ Starting stdio server...")
        logger.info("=" * 60)

        async with stdio_server() as (read_stream, write_stream):
            await self.mcp_server.run(
                read_stream,
                write_stream,
                InitializationOptions(
                    server_name="postgresql-mcp-server",
                    server_version="1.0.0",
                    capabilities=types.ServerCapabilities(
                        resources=types.ResourcesCapability(subscribe=False),
                        tools=types.ToolsCapability(),
                        prompts=types.PromptsCapability(listChanged=False),
                    ),
                )
            )

    def run_http(self, host: str = "0.0.0.0", port: int = 8000):
        """Run HTTP transport (for ReactJS and other web clients)"""
        # Kill any existing processes on the target port
        self._cleanup_port(port)

        # Comprehensive startup logging
        logger.info("=" * 60)
        logger.info("🚀 MCP SERVER STARTUP")
        logger.info("=" * 60)
        logger.info(f"📡 Server: PostgreSQL MCP Server v1.0.0")
        logger.info(f"🌐 Mode: HTTP Transport")
        logger.info(f"📍 Host: {host}")
        logger.info(f"🔌 Port: {port}")
        logger.info(f"🔗 URL: http://{host}:{port}")

        # Network URLs
        try:
            import socket
            hostname = socket.gethostname()
            local_ip = socket.gethostbyname(hostname)
            logger.info(f"🌐 Network: http://{local_ip}:{port}")
        except:
            logger.info(f"🌐 Network: Check your IP address")

        logger.info("📚 Documentation:")
        logger.info(f"   • API Docs: http://{host}:{port}/api/docs")
        logger.info(f"   • Swagger: http://{host}:{port}/docs")
        logger.info(f"   • OpenAPI: http://{host}:{port}/openapi.json")
        logger.info("🔧 Key Endpoints:")
        logger.info(f"   • Chat Stream: POST http://{host}:{port}/api/conversation/chat/stream")
        logger.info(f"   • Tools: GET http://{host}:{port}/mcp/tools")
        logger.info(f"   • Resources: GET http://{host}:{port}/mcp/resources")
        logger.info(f"   • Health: GET http://{host}:{port}/health")
        logger.info("⚙️ Configuration:")
        logger.info(f"   • CORS: Enabled (Allow All Origins)")
        logger.info(f"   • Authentication: None")
Enabled") logger.info(f" โ€ข Databases: db1, db2, db3") logger.info("="*60) logger.info("โœ… Starting server...") logger.info("="*60) uvicorn.run(self.http_app, host=host, port=port, log_level="info") async def main(): """Main entry point - determines transport mode""" if len(sys.argv) > 1 and sys.argv[1] == "--http": # HTTP mode for web access port = int(sys.argv[2]) if len(sys.argv) > 2 else 8000 server = HybridMCPServer() server.run_http(port=port) else: # stdio mode for Claude Desktop server = HybridMCPServer() await server.run_stdio() if __name__ == "__main__": if len(sys.argv) > 1 and sys.argv[1] == "--http": # Run HTTP server synchronously port = int(sys.argv[2]) if len(sys.argv) > 2 else 8000 server = HybridMCPServer() server.run_http(port=port) else: # Run stdio server asynchronously asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/MelaLitho/MCPServer'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.