#!/usr/bin/env python3
"""
Hybrid MCP Server supporting both stdio and HTTP transports
Provides MCP SDK compatibility while maintaining REST API access for ReactJS
"""
import asyncio
import json
import logging
import sys
from typing import Dict, Any, List, Optional
from contextlib import asynccontextmanager
# MCP SDK imports
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.server.models import InitializationOptions
import mcp.types as types
# FastAPI for HTTP transport
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, Response, StreamingResponse
from datetime import datetime
import uvicorn
# Your existing services
from services.mcp_resources_service import MCPResourcesService
from services.mcp_tools_service import MCPToolsService
from services.mcp_prompts_service import MCPPromptsService
logger = logging.getLogger(__name__)
class HybridMCPServer:
"""MCP Server that supports both stdio and HTTP transports"""
def __init__(self):
# Initialize your existing services
self.resources_service = MCPResourcesService({})
self.tools_service = MCPToolsService({})
self.prompts_service = MCPPromptsService({})
# Create MCP SDK server
self.mcp_server = Server("postgresql-mcp-server")
self._register_mcp_handlers()
# Create FastAPI app for HTTP transport
self.http_app = self._create_http_app()
def _register_mcp_handlers(self):
"""Register all MCP SDK handlers with async optimization"""
# Register resource handlers
@self.mcp_server.list_resources()
async def list_resources() -> List[types.Resource]:
# Ensure async compatibility and efficient processing
resources = await asyncio.get_event_loop().run_in_executor(
None, self.resources_service.list_resources
)
# Convert resources safely, handling any serialization issues
mcp_resources = []
for r in resources:
try:
# Ensure all values are strings and properly formatted
uri = str(r["uri"]) if r["uri"] else ""
name = str(r["name"]) if r["name"] else ""
description = str(r.get("description", ""))
mime_type = str(r.get("mimeType", "application/json"))
mcp_resources.append(types.Resource(
uri=uri,
name=name,
description=description,
mimeType=mime_type
))
except Exception as e:
logger.error(f"Error converting resource {r}: {e}")
continue
return mcp_resources
@self.mcp_server.read_resource()
async def read_resource(uri: str) -> List[types.TextContent]:
result = await self.resources_service.read_resource(uri)
# Convert any complex objects to JSON-serializable strings
def make_serializable(obj):
try:
# Handle common non-serializable types
if hasattr(obj, '__dict__'):
return str(obj)
elif hasattr(obj, '_url'): # Handle AnyUrl objects
return str(obj)
elif hasattr(obj, 'isoformat'): # Handle datetime objects
return obj.isoformat()
elif str(type(obj)).startswith('<class \'pydantic'): # Pydantic models
return str(obj)
else:
return str(obj)
except Exception:
return "<non-serializable object>"
# Handle the result properly - the resource service returns text content
content = ""
if isinstance(result, dict):
if "text" in result:
content = result["text"]
elif "contents" in result:
# Handle old format with contents array
contents = result["contents"]
if isinstance(contents, list) and len(contents) > 0:
content = contents[0].get("text", "")
else:
# Convert entire result to JSON
content = json.dumps(result, default=make_serializable, indent=2)
else:
content = str(result)
# Ensure content is a string
if isinstance(content, dict):
content = json.dumps(content, default=make_serializable, indent=2)
elif not isinstance(content, str):
content = str(content)
return [types.TextContent(type="text", text=content)]
# Register tool handlers
@self.mcp_server.list_tools()
async def list_tools() -> List[types.Tool]:
# Use async executor for potential I/O operations
tool_data = await asyncio.get_event_loop().run_in_executor(
None, self.tools_service.get_available_tools
)
tools = []
# Use the "tools" list from the return structure
for tool in tool_data["tools"]:
tools.append(types.Tool(
name=tool["name"],
description=tool["description"],
inputSchema=tool["parameters"]
))
return tools
@self.mcp_server.call_tool()
async def call_tool(name: str, arguments: Dict[str, Any]) -> List[types.TextContent]:
result = await self.tools_service.call_tool(name, arguments)
# Convert result to MCP TextContent
if isinstance(result, dict):
content = json.dumps(result, indent=2)
else:
content = str(result)
return [types.TextContent(type="text", text=content)]
# Register prompt handlers
@self.mcp_server.list_prompts()
async def list_prompts() -> List[types.Prompt]:
# Use async executor for consistent async handling
prompts = await asyncio.get_event_loop().run_in_executor(
None, self.prompts_service.list_prompts
)
return [
types.Prompt(
name=p["name"],
description=p["description"],
arguments=[
types.PromptArgument(
name=arg["name"],
description=arg["description"],
required=arg.get("required", False)
)
for arg in p.get("arguments", [])
]
)
for p in prompts
]
@self.mcp_server.get_prompt()
async def get_prompt(name: str, arguments: Dict[str, str] = None) -> types.GetPromptResult:
if arguments is None:
arguments = {}
prompt = await self.prompts_service.get_prompt(name, arguments)
messages = []
for msg in prompt["messages"]:
# Handle different message role types safely
role_str = msg.get("role", "user").lower()
if role_str == "user":
role = types.Role.user
elif role_str == "assistant":
role = types.Role.assistant
elif role_str == "system":
role = types.Role.system
else:
role = types.Role.user # Default fallback
content_text = msg.get("content", {}).get("text", "")
messages.append(types.PromptMessage(
role=role,
content=types.TextContent(
type="text",
text=content_text
)
))
return types.GetPromptResult(
description=prompt["description"],
messages=messages
)
def _create_http_app(self) -> FastAPI:
"""Create FastAPI app for HTTP transport"""
@asynccontextmanager
async def lifespan(app: FastAPI):
logger.info("Starting HTTP transport for MCP server")
yield
logger.info("Shutting down HTTP transport")
app = FastAPI(
title="PostgreSQL MCP Server - HTTP Transport",
description="HTTP REST API wrapper for MCP PostgreSQL server",
version="1.0.0",
lifespan=lifespan
)
# CORS middleware for ReactJS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# HTTP endpoints that mirror MCP functionality
@app.get("/mcp/resources")
async def http_list_resources():
"""HTTP endpoint for listing resources"""
return self.resources_service.list_resources()
@app.get("/mcp/resources/{resource_path:path}")
async def http_read_resource(resource_path: str):
"""HTTP endpoint for reading resources"""
# Reconstruct URI from path
if resource_path.startswith("database/"):
uri = resource_path.replace("database/", "database://", 1)
elif resource_path.startswith("table/"):
uri = resource_path.replace("table/", "table://", 1)
else:
uri = f"database://{resource_path}"
result = await self.resources_service.read_resource(uri)
if "error" in result.get("text", ""):
raise HTTPException(status_code=404, detail="Resource not found")
# Return parsed JSON for HTTP clients
try:
return json.loads(result["text"])
except json.JSONDecodeError:
return {"text": result["text"]}
@app.get("/mcp/tools")
async def http_list_tools():
"""HTTP endpoint for listing tools"""
return self.tools_service.get_available_tools()
@app.post("/mcp/tools/{tool_name}")
async def http_call_tool(tool_name: str, arguments: Dict[str, Any]):
"""HTTP endpoint for calling tools"""
try:
result = await self.tools_service.call_tool(tool_name, arguments)
return result
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
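# Example client call for the tool endpoint above (sketch; the tool name and payload
# mirror the /api/docs examples below, and the port is assumed to be 8002):
#
#   import requests
#   r = requests.post(
#       "http://localhost:8002/mcp/tools/execute_safe_sql",
#       json={"sql_query": "SELECT * FROM products LIMIT 10", "database": "db3"},
#       timeout=30,
#   )
#   print(r.json())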
@app.get("/mcp/prompts")
async def http_list_prompts():
"""HTTP endpoint for listing prompts"""
return self.prompts_service.list_prompts()
@app.post("/mcp/prompts/{prompt_name}")
async def http_get_prompt(prompt_name: str, arguments: Dict[str, Any] = None):
"""HTTP endpoint for getting prompts with arguments"""
if arguments is None:
arguments = {}
try:
result = await self.prompts_service.get_prompt(prompt_name, arguments)
return result
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
# Health and Status endpoints
@app.get("/health")
async def health_check():
return {"status": "healthy", "timestamp": str(datetime.utcnow())}
@app.get("/status")
async def status_check():
return {
"status": "running",
"server": "postgresql-mcp-server",
"version": "1.0.0",
"uptime": str(datetime.utcnow())
}
@app.get("/models")
async def available_models():
"""Get actual available models from Ollama"""
try:
import requests
# Fetch models from Ollama
ollama_response = requests.get("http://localhost:11434/api/tags", timeout=10)
if ollama_response.status_code == 200:
ollama_data = ollama_response.json()
models = []
# Extract model names from Ollama response
for model in ollama_data.get('models', []):
model_name = model.get('name', '')
if model_name:
# Keep the full model name (including any :latest suffix) for compatibility
models.append(model_name)
# Set default to mistral:latest if available, otherwise first model
default_model = "mistral:latest"
if models and default_model not in models:
# Try to find mistral variant
mistral_models = [m for m in models if 'mistral' in m.lower()]
if mistral_models:
default_model = mistral_models[0]
else:
default_model = models[0]
return {
"models": models,
"default": default_model,
"source": "ollama",
"count": len(models)
}
else:
# Fallback if Ollama is not available
return {
"models": ["mistral:latest"],
"default": "mistral:latest",
"source": "fallback",
"error": f"Ollama not accessible (status: {ollama_response.status_code})"
}
except Exception as e:
# Fallback if connection fails
return {
"models": ["mistral:latest"],
"default": "mistral:latest",
"source": "fallback",
"error": f"Failed to connect to Ollama: {str(e)}"
}
@app.get("/api/models")
async def api_available_models():
"""API endpoint for available models (same as /models)"""
return await available_models()
@app.get("/tools")
async def available_tools():
return self.tools_service.get_available_tools()
# Streaming Chat endpoints
@app.post("/api/conversation/chat/stream")
async def conversation_chat_stream(request: Dict[str, Any]):
"""
Expected payload:
{
"question": "Your question here",
"session_id": "optional-session-id",
"database": "db3",
"provider": "ollama"
}
"""
try:
# Import enhanced error handling
from error_handling import validate_chat_request, create_error_response, LLMError, ServiceError
# Validate request
validate_chat_request(request)
question = request.get("question", "")
session_id = request.get("session_id", "default")
database = request.get("database", "db3")
provider = request.get("provider", "ollama")
except Exception as e:
from error_handling import create_error_response
error_response = create_error_response(e)
raise HTTPException(
status_code=400,
detail=error_response
)
async def generate():
try:
# Simple response for now - can be enhanced with actual LLM integration
yield f"data: {json.dumps({'type': 'status', 'content': 'Processing your question...'})}\n\n"
# Integrate with LLM for intelligent responses
try:
# Import your LLM integration
from llmintegrationsystem import LLMIntegrationSystem
# Initialize LLM system
llm = LLMIntegrationSystem()
# Use Ollama for streaming chat
model = "mistral:latest"
llm_response = ""
# Create a focused prompt for the LLM
focused_prompt = f"""You are a helpful assistant integrated with an MCP (Model Context Protocol) server. Please provide a direct, concise response to the user's question. Keep your response brief and relevant.
User question: {question}
Response:"""
# Stream response from Ollama
async for chunk in llm.chat_stream(focused_prompt, model=model, provider=provider):
content = chunk.get('content', '')
llm_response += content
# Stream each chunk as it comes from LLM
if content:
chunk_data = {
'content': content,
'done': False
}
yield f"data: {json.dumps(chunk_data)}\n\n"
# Send completion signal
final_data = {
'done': True,
'model': model,
'provider': provider
}
yield f"data: {json.dumps(final_data)}\n\n"
return # Exit here since we've already streamed the response
except Exception as llm_error:
# Log the error and provide detailed fallback
error_details = str(llm_error)
logger.error(f"LLM integration failed: {error_details}")
# Try direct Ollama call as fallback
try:
import requests
ollama_response = requests.post(
"http://localhost:11434/api/generate",
json={
"model": "mistral:latest",
"prompt": question,
"stream": False
},
timeout=30
)
if ollama_response.status_code == 200:
ollama_data = ollama_response.json()
response_text = ollama_data.get('response', 'No response from Ollama')
# Stream the Ollama response word by word
words = response_text.split()
for i, word in enumerate(words):
chunk_data = {
'content': word + (' ' if i < len(words) - 1 else ''),
'done': False
}
yield f"data: {json.dumps(chunk_data)}\n\n"
# Send completion signal
final_data = {
'done': True,
'model': 'mistral:latest',
'provider': 'ollama-direct'
}
yield f"data: {json.dumps(final_data)}\n\n"
return
except Exception as direct_error:
logger.error(f"Direct Ollama call also failed: {str(direct_error)}")
# Final fallback
result = {"response": f"Both LLM integration and direct Ollama failed. LLM error: {error_details}. Question was: {question}"}
# Extract the actual response text
if isinstance(result, dict):
if "response" in result:
response_text = result["response"]
elif "error" in result:
response_text = f"Error: {result['error']}"
else:
response_text = str(result)
else:
response_text = str(result)
# Stream the response word by word
words = response_text.split()
for i, word in enumerate(words):
chunk_data = {
'content': word + (' ' if i < len(words) - 1 else ''),
'done': False
}
yield f"data: {json.dumps(chunk_data)}\n\n"
# Small delay to simulate streaming (asyncio is imported at module level)
await asyncio.sleep(0.1)
# Send completion signal
final_data = {
'done': True,
'model': 'conversation-model',
'provider': 'mcp'
}
yield f"data: {json.dumps(final_data)}\n\n"
except Exception as e:
try:
from error_handling import create_error_response, handle_llm_error, handle_database_error
# Determine error type and create appropriate response
if "llm" in str(e).lower() or "model" in str(e).lower():
enhanced_error = handle_llm_error(e, provider=provider)
error_response = create_error_response(enhanced_error)
elif "database" in str(e).lower() or "sql" in str(e).lower():
enhanced_error = handle_database_error(e, database=database)
error_response = create_error_response(enhanced_error)
else:
error_response = create_error_response(e)
yield f"data: {json.dumps({'type': 'error', 'content': error_response})}\n\n"
except Exception as fallback_error:
# Fallback to simple error if enhanced handling fails
yield f"data: {json.dumps({'type': 'error', 'content': str(e)})}\n\n"
return StreamingResponse(
generate(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "GET, POST, PUT, DELETE, OPTIONS",
"Access-Control-Allow-Headers": "*",
}
)
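# Example SSE consumption for the streaming endpoint above (sketch; assumes the server
# is reachable on localhost:8002 and uses the chunk format emitted by generate()):
#
#   import json, requests
#   resp = requests.post(
#       "http://localhost:8002/api/conversation/chat/stream",
#       json={"question": "What tables are in the database?", "database": "db3"},
#       stream=True,
#   )
#   for line in resp.iter_lines():
#       if line.startswith(b"data: "):
#           chunk = json.loads(line[len(b"data: "):])
#           print(chunk.get("content", ""), end="", flush=True)
#           if chunk.get("done"):
#               break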
@app.post("/api/chat/stream")
async def chat_stream(request: Dict[str, Any]):
"""
Simple chat stream endpoint
Expected payload:
{
"message": "Your message here",
"model": "mistral:latest",
"temperature": 0.7,
"max_tokens": 1000
}
"""
try:
from error_handling import validate_chat_request
# Validate request
validate_chat_request(request)
message = request.get("message", "")
model = request.get("model", "mistral:latest")
temperature = request.get("temperature", 0.7)
max_tokens = request.get("max_tokens", 1000)
async def simple_generate():
try:
# Send status
yield f"data: {json.dumps({'type': 'status', 'content': 'Processing your message...'})}\n\n"
# Integrate with LLM for actual responses
try:
import requests
ollama_response = requests.post(
"http://localhost:11434/api/generate",
json={
"model": model,
"prompt": message,
"stream": False,
"options": {
"temperature": temperature,
"num_predict": max_tokens
}
},
timeout=30
)
if ollama_response.status_code == 200:
ollama_data = ollama_response.json()
response_text = ollama_data.get('response', 'No response from Ollama')
else:
response_text = f"Ollama error (status {ollama_response.status_code}). Fallback echo: {message}"
except Exception as llm_error:
response_text = f"LLM unavailable. Echo: {message} (Error: {str(llm_error)[:50]})"
# Stream the response in chunks
words = response_text.split()
for i, word in enumerate(words):
chunk_data = {
'content': word + (' ' if i < len(words) - 1 else ''),
'done': False
}
yield f"data: {json.dumps(chunk_data)}\n\n"
# Small delay to simulate streaming (asyncio is imported at module level)
await asyncio.sleep(0.1)
# Send final done message
final_data = {
'done': True,
'model': model,
'provider': 'ollama-direct'
}
yield f"data: {json.dumps(final_data)}\n\n"
except Exception as e:
error_data = {'error': str(e), 'done': True}
yield f"data: {json.dumps(error_data)}\n\n"
return StreamingResponse(
simple_generate(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "GET, POST, PUT, DELETE, OPTIONS",
"Access-Control-Allow-Headers": "*",
}
)
except Exception as e:
from error_handling import create_error_response
error_response = create_error_response(e)
raise HTTPException(
status_code=400,
detail=error_response
)
# Non-streaming Chat endpoints
@app.post("/api/chat")
async def chat_non_streaming(request: Dict[str, Any]):
"""
Simple non-streaming chat endpoint
Expected payload:
{
"message": "Your message here",
"model": "mistral:latest",
"temperature": 0.7,
"max_tokens": 1000
}
"""
try:
from error_handling import validate_chat_request
# Validate request
validate_chat_request(request)
message = request.get("message", "")
model = request.get("model", "mistral:latest")
temperature = request.get("temperature", 0.7)
max_tokens = request.get("max_tokens", 1000)
# Integrate with LLM for actual responses
try:
import requests
ollama_response = requests.post(
"http://localhost:11434/api/generate",
json={
"model": model,
"prompt": message,
"stream": False,
"options": {
"temperature": temperature,
"num_predict": max_tokens
}
},
timeout=30
)
if ollama_response.status_code == 200:
ollama_data = ollama_response.json()
response_text = ollama_data.get('response', 'No response from Ollama')
return {
"response": response_text,
"model": model,
"provider": "ollama-direct",
"usage": {
"prompt_tokens": len(message.split()),
"completion_tokens": len(response_text.split()),
"total_tokens": len(message.split()) + len(response_text.split())
}
}
else:
return {
"error": f"Ollama error (status {ollama_response.status_code})",
"model": model,
"provider": "ollama-direct"
}
except Exception as llm_error:
return {
"error": f"LLM unavailable: {str(llm_error)}",
"model": model,
"provider": "fallback"
}
except Exception as e:
from error_handling import create_error_response
error_response = create_error_response(e)
raise HTTPException(
status_code=400,
detail=error_response
)
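# Example non-streaming call to the /api/chat endpoint above (sketch; port assumed 8002):
#
#   import requests
#   reply = requests.post(
#       "http://localhost:8002/api/chat",
#       json={"message": "Hello", "model": "mistral:latest", "temperature": 0.7, "max_tokens": 200},
#       timeout=60,
#   ).json()
#   print(reply.get("response") or reply.get("error"))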
@app.post("/api/conversation/chat")
async def conversation_chat_non_streaming(request: Dict[str, Any]):
"""
Conversation non-streaming chat endpoint with memory
Expected payload:
{
"question": "Your question here",
"session_id": "optional-session-id",
"database": "db3",
"provider": "ollama"
}
"""
try:
from error_handling import validate_chat_request, create_error_response, LLMError, ServiceError
# Validate request
validate_chat_request(request)
question = request.get("question", "")
session_id = request.get("session_id", "default")
database = request.get("database", "db3")
provider = request.get("provider", "ollama")
# Create a focused prompt for the LLM
focused_prompt = f"""You are a helpful assistant integrated with an MCP (Model Context Protocol) server. Please provide a direct, concise response to the user's question. Keep your response brief and relevant.
User question: {question}
Response:"""
# Integrate with LLM for intelligent responses
try:
import requests
ollama_response = requests.post(
"http://localhost:11434/api/generate",
json={
"model": "mistral:latest",
"prompt": focused_prompt,
"stream": False
},
timeout=30
)
if ollama_response.status_code == 200:
ollama_data = ollama_response.json()
response_text = ollama_data.get('response', 'No response from Ollama')
return {
"response": response_text,
"model": "mistral:latest",
"provider": "ollama",
"session_id": session_id,
"database": database,
"usage": {
"prompt_tokens": len(focused_prompt.split()),
"completion_tokens": len(response_text.split()),
"total_tokens": len(focused_prompt.split()) + len(response_text.split())
}
}
else:
return {
"error": f"Ollama error (status {ollama_response.status_code})",
"model": "mistral:latest",
"provider": "ollama",
"session_id": session_id
}
except Exception as llm_error:
return {
"error": f"LLM integration failed: {str(llm_error)}",
"model": "mistral:latest",
"provider": "fallback",
"session_id": session_id
}
except Exception as e:
from error_handling import create_error_response
error_response = create_error_response(e)
raise HTTPException(
status_code=400,
detail=error_response
)
# Legacy compatibility endpoints
@app.get("/api/resources")
async def legacy_resources():
return await http_list_resources()
@app.get("/api/capabilities/dynamic")
async def legacy_capabilities():
resources = self.resources_service.list_resources()
tools = self.tools_service.get_available_tools()
prompts = self.prompts_service.list_prompts()
return {
"resources": resources,
"tools": tools,
"prompts": prompts,
"server_info": {
"name": "postgresql-mcp-server",
"version": "1.0.0",
"transports": ["stdio", "http"],
"capabilities": ["resources", "tools", "prompts"]
}
}
@app.get("/api/capabilities")
async def api_capabilities():
return await legacy_capabilities()
@app.get("/api/info")
async def api_info():
return {
"name": "postgresql-mcp-server",
"version": "1.0.0",
"description": "PostgreSQL MCP Server with HTTP API",
"endpoints": [
"/mcp/resources", "/mcp/tools", "/mcp/prompts",
"/api/conversation/chat/stream", "/health", "/status"
]
}
@app.get("/api/status")
async def api_status():
return await status_check()
@app.get("/api/health")
async def api_health():
return await health_check()
@app.get("/api/docs")
async def api_documentation():
"""Complete API documentation endpoint"""
return {
"server_info": {
"name": "PostgreSQL MCP Server",
"version": "1.0.0",
"description": "Hybrid MCP server with HTTP API and streaming support",
"ports": {
"http": 8002,
"stdio": "Claude Desktop integration"
}
},
"base_urls": [
"http://192.168.0.71:8002",
"http://localhost:8002"
],
"endpoints": {
"health_status": {
"GET /health": {
"description": "Server health check",
"response": {"status": "healthy", "timestamp": "ISO timestamp"}
},
"GET /status": {
"description": "Server status and uptime",
"response": {"status": "running", "server": "postgresql-mcp-server", "version": "1.0.0"}
}
},
"models_tools": {
"GET /models": {
"description": "Available AI models",
"response": {"models": ["mistral", "gpt-3.5-turbo", "gpt-4", "llama3.2"], "default": "mistral"}
},
"GET /tools": {
"description": "Available tools (same as /mcp/tools)",
"response": {"tools": "array of tool objects", "categories": "tool categories object"}
}
},
"chat_streaming": {
"POST /api/conversation/chat/stream": {
"description": "Primary streaming chat endpoint",
"content_type": "application/json",
"payload": {
"question": "Your question here (required)",
"session_id": "optional-session-id",
"database": "db3 (db1/db2/db3)",
"provider": "ollama"
},
"response_type": "text/event-stream",
"response_format": {
"status": "data: {\"type\": \"status\", \"content\": \"Processing...\"}",
"response": "data: {\"type\": \"response\", \"content\": \"...\"}",
"end": "data: [DONE]"
}
},
"POST /api/chat/stream": {
"description": "Alternative streaming chat endpoint",
"payload": {
"message": "Your message here",
"model": "mistral",
"stream": True
}
}
},
"mcp_protocol": {
"GET /mcp/resources": {
"description": "List all MCP resources",
"response": "Array of resource objects with URI, name, description"
},
"GET /mcp/resources/{resource_path}": {
"description": "Read specific resource by path",
"example": "/mcp/resources/database/tables"
},
"GET /mcp/tools": {
"description": "List all MCP tools",
"response": {"tools": "array of tool objects", "categories": "tool categories object"}
},
"POST /mcp/tools/{tool_name}": {
"description": "Execute a specific tool",
"available_tools": [
"execute_safe_sql",
"validate_sql_syntax",
"search_tables_for_concept",
"find_related_data",
"analyze_query_performance",
"analyze_data_patterns",
"get_postgresql_logs",
"read_resource"
]
},
"POST /mcp/tools/read_resource": {
"description": "Read resource by URI",
"payload": {
"uri": "database://tables or table://products/schema"
}
},
"GET /mcp/prompts": {
"description": "List all MCP prompts"
},
"POST /mcp/prompts/{prompt_name}": {
"description": "Execute a specific prompt"
}
},
"api_info": {
"GET /api/capabilities": {
"description": "Server capabilities (same as /api/capabilities/dynamic)"
},
"GET /api/capabilities/dynamic": {
"description": "Dynamic server capabilities with resources, tools, prompts"
},
"GET /api/info": {
"description": "Server information and endpoint list"
},
"GET /api/status": {
"description": "API status (same as /status)"
},
"GET /api/health": {
"description": "API health (same as /health)"
},
"GET /api/resources": {
"description": "Legacy resources endpoint (same as /mcp/resources)"
}
},
"documentation": {
"GET /docs": {
"description": "FastAPI Swagger UI documentation"
},
"GET /openapi.json": {
"description": "OpenAPI specification"
},
"GET /api/docs": {
"description": "This comprehensive API documentation"
}
}
},
"cors_configuration": {
"enabled": True,
"allow_origins": "*",
"allow_methods": ["GET", "POST", "PUT", "DELETE", "OPTIONS"],
"allow_headers": "*"
},
"authentication": {
"required": False,
"type": "None (open access)"
},
"examples": {
"streaming_chat": {
"url": "http://192.168.0.71:8002/api/conversation/chat/stream",
"method": "POST",
"headers": {"Content-Type": "application/json"},
"body": {
"question": "What tables are in the database?",
"session_id": "user-123",
"database": "db3"
}
},
"execute_sql": {
"url": "http://192.168.0.71:8002/mcp/tools/execute_safe_sql",
"method": "POST",
"body": {
"sql_query": "SELECT * FROM products LIMIT 10",
"database": "db3"
}
},
"read_resource": {
"url": "http://192.168.0.71:8002/mcp/tools/read_resource",
"method": "POST",
"body": {
"uri": "database://tables"
}
}
}
}
return app
def _cleanup_port(self, port: int):
"""Kill any existing processes using the specified port"""
try:
import subprocess
logger.info(f"Cleaning up port {port}...")
if sys.platform == "win32":
# Windows
try:
# Find processes using the port
result = subprocess.run(
["netstat", "-ano"],
capture_output=True,
text=True,
timeout=10
)
for line in result.stdout.split('\n'):
if f":{port}" in line and "LISTENING" in line:
parts = line.split()
if len(parts) >= 5:
pid = parts[-1]
try:
logger.info(f"๐ช Killing process {pid} on port {port}")
subprocess.run(
["taskkill", "/PID", pid, "/F"],
capture_output=True,
timeout=5
)
except Exception:
pass
# Wait a moment for the port to be released
import time
time.sleep(1)
logger.info(f"โ
Port {port} cleanup complete")
except Exception as e:
logger.warning(f"โ ๏ธ Port cleanup failed: {e}")
else:
# Unix/Linux/Mac
try:
result = subprocess.run(
["lsof", "-ti", f":{port}"],
capture_output=True,
text=True,
timeout=10
)
pids = result.stdout.strip().split('\n')
for pid in pids:
if pid:
logger.info(f"๐ช Killing process {pid} on port {port}")
subprocess.run(["kill", "-9", pid], timeout=5)
import time
time.sleep(1)
logger.info(f"โ
Port {port} cleanup complete")
except Exception as e:
logger.warning(f"โ ๏ธ Port cleanup failed: {e}")
except Exception as e:
logger.warning(f"โ ๏ธ Could not cleanup port {port}: {e}")
async def run_stdio(self):
"""Run MCP server with stdio transport (for Claude Desktop)"""
# Comprehensive startup logging for stdio mode
logger.info("="*60)
logger.info("๐ MCP SERVER STARTUP")
logger.info("="*60)
logger.info(f"๐ก Server: PostgreSQL MCP Server v1.0.0")
logger.info(f"๐ Mode: STDIO Transport (Claude Desktop)")
logger.info(f"๐ Connection: Standard Input/Output")
logger.info("โ๏ธ Configuration:")
logger.info(f" โข Resources: Enabled")
logger.info(f" โข Tools: 8 tools available")
logger.info(f" โข Prompts: Enabled")
logger.info(f" โข Databases: db1, db2, db3")
logger.info("="*60)
logger.info("โ
Starting stdio server...")
logger.info("="*60)
async with stdio_server() as (read_stream, write_stream):
await self.mcp_server.run(
read_stream,
write_stream,
InitializationOptions(
server_name="postgresql-mcp-server",
server_version="1.0.0",
capabilities=types.ServerCapabilities(
resources=types.ResourcesCapability(subscribe=False),
tools=types.ToolsCapability(),
prompts=types.PromptsCapability(listChanged=False),
),
)
)
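# A typical Claude Desktop configuration entry for this stdio mode might look like the
# following (sketch; the Python executable and file path are placeholders you would
# adjust in your own claude_desktop_config.json):
#
#   {
#     "mcpServers": {
#       "postgresql-mcp-server": {
#         "command": "python",
#         "args": ["/path/to/mcp_server_hybrid.py"]
#       }
#     }
#   }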
def run_http(self, host: str = "0.0.0.0", port: int = 8000):
"""Run HTTP transport (for ReactJS and other web clients)"""
# Kill any existing processes on the target port
self._cleanup_port(port)
# Comprehensive startup logging
logger.info("="*60)
logger.info("๐ MCP SERVER STARTUP")
logger.info("="*60)
logger.info(f"๐ก Server: PostgreSQL MCP Server v1.0.0")
logger.info(f"๐ Mode: HTTP Transport")
logger.info(f"๐ Host: {host}")
logger.info(f"๐ Port: {port}")
logger.info(f"๐ URL: http://{host}:{port}")
# Network URLs
try:
import socket
hostname = socket.gethostname()
local_ip = socket.gethostbyname(hostname)
logger.info(f"๐ Network: http://{local_ip}:{port}")
except:
logger.info(f"๐ Network: Check your IP address")
logger.info("๐ Documentation:")
logger.info(f" โข API Docs: http://{host}:{port}/api/docs")
logger.info(f" โข Swagger: http://{host}:{port}/docs")
logger.info(f" โข OpenAPI: http://{host}:{port}/openapi.json")
logger.info("๐ง Key Endpoints:")
logger.info(f" โข Chat Stream: POST http://{host}:{port}/api/conversation/chat/stream")
logger.info(f" โข Tools: GET http://{host}:{port}/mcp/tools")
logger.info(f" โข Resources: GET http://{host}:{port}/mcp/resources")
logger.info(f" โข Health: GET http://{host}:{port}/health")
logger.info("โ๏ธ Configuration:")
logger.info(f" โข CORS: Enabled (Allow All Origins)")
logger.info(f" โข Authentication: None")
logger.info(f" โข Streaming: Enabled")
logger.info(f" โข Databases: db1, db2, db3")
logger.info("="*60)
logger.info("โ
Starting server...")
logger.info("="*60)
uvicorn.run(self.http_app, host=host, port=port, log_level="info")
async def main():
"""Async entry point for stdio transport (HTTP mode is dispatched synchronously in __main__)"""
server = HybridMCPServer()
await server.run_stdio()
if __name__ == "__main__":
if len(sys.argv) > 1 and sys.argv[1] == "--http":
# Run HTTP server synchronously
port = int(sys.argv[2]) if len(sys.argv) > 2 else 8000
server = HybridMCPServer()
server.run_http(port=port)
else:
# Run stdio server asynchronously
asyncio.run(main())