
MCP Server with LLM Integration

by MelaLitho
http_bridge.py (35.6 kB)
#!/usr/bin/env python3
"""
HTTP Bridge for MCP Server

Provides REST API endpoints to interact with the MCP server, making it
easy to integrate with React frontend applications.
"""

import asyncio
import json
import logging
from contextlib import asynccontextmanager
from typing import Any, Dict, Optional

import uvicorn
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

from chatmemorysystem import ChatMemorySystem
from llmintegrationsystem import LLMIntegrationSystem

# Import new architecture components
from presentation.mcp_server import MCPDatabaseServer
from services.mcp_resources_service import MCPResourcesService
from services.mcp_tools_service import MCPToolsService


# Request/Response Models
class ChatRequest(BaseModel):
    message: str
    model: Optional[str] = "mistral:latest"
    temperature: Optional[float] = 0.7
    max_tokens: Optional[int] = 1000


class ConversationChatRequest(BaseModel):
    message: str
    conversation_id: str
    model: Optional[str] = "mistral:latest"
    temperature: Optional[float] = 0.7
    max_tokens: Optional[int] = 1000
    include_context: Optional[bool] = True
    context_limit: Optional[int] = 10


class ChatResponse(BaseModel):
    response: str
    model: str
    provider: str
    metadata: Optional[Dict[str, Any]] = None


class EchoRequest(BaseModel):
    text: str


class EchoResponse(BaseModel):
    text: str


class StoreMemoryRequest(BaseModel):
    conversation_id: str
    content: str
    metadata: Optional[Dict[str, Any]] = None
    role: Optional[str] = "user"
    importance: Optional[float] = 1.0


class StoreMemoryResponse(BaseModel):
    memory_id: str
    success: bool


class GetMemoryRequest(BaseModel):
    conversation_id: str
    limit: Optional[int] = None
    min_importance: Optional[float] = 0.0


class MemoryEntry(BaseModel):
    id: str
    conversation_id: str
    content: str
    metadata: Dict[str, Any]
    timestamp: str
    role: str
    importance: float


class GetMemoryResponse(BaseModel):
    memories: list[MemoryEntry]
    count: int


class HealthResponse(BaseModel):
    status: str
    message: str
    available_models: Optional[list[str]] = None


# Initialize FastAPI app
app = FastAPI(
    title="MCP Server HTTP Bridge",
    description="HTTP API bridge for Model Context Protocol server",
    version="1.0.0"
)

# Configure CORS for React development
app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        "http://localhost:3000",   # Create React App
        "http://localhost:5173",   # Vite default
        "http://localhost:5174",   # Vite alternate
        "http://127.0.0.1:3000",   # Create React App alternate
        "http://127.0.0.1:5173",   # Vite alternate
        "http://localhost:8080",   # Vue/other
        "http://localhost:4173",   # Vite preview
        "*"                        # Allow all origins for development
    ],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Global instances
llm_integration: Optional[LLMIntegrationSystem] = None
chat_memory: Optional[ChatMemorySystem] = None
mcp_server: Optional[MCPDatabaseServer] = None
mcp_resources_service: Optional[MCPResourcesService] = None
mcp_tools_service: Optional[MCPToolsService] = None


@app.on_event("startup")
async def startup_event():
    """Initialize MCP server components"""
    global llm_integration, chat_memory, mcp_server, mcp_resources_service, mcp_tools_service

    llm_integration = LLMIntegrationSystem()
    chat_memory = ChatMemorySystem()

    # Initialize new MCP database server (optional - for database features)
    try:
        # Use your existing config or environment variable
        import os
        database_url = os.getenv('DATABASE_URL')
        if not database_url:
            # Use one of your configured databases
            try:
                from config import Config
                # Use your local postgres database from config.py
                database_url = Config.SQLALCHEMY_BINDS['db3']
            except ImportError:
                database_url = 'postgresql://postgres:postgres@localhost/postgres'

        config = {
            'database': {
                'connection_string': database_url
            },
            'llm': {
                'provider': 'ollama',
                'model': 'mistral:latest',
                'base_url': 'http://localhost:11434'
            }
        }

        # Skip old MCP server initialization - using new services instead
        logging.info("Skipping old MCP Database Server (using new services)")
        mcp_server = None

        # Initialize new MCP Resources and Tools services (separate from old server)
        try:
            mcp_resources_service = MCPResourcesService(config)
            mcp_tools_service = MCPToolsService(config)
            logging.info("MCP Resources and Tools services initialized")
        except Exception as e:
            logging.warning(f"MCP Services initialization failed: {e}")
            mcp_resources_service = None
            mcp_tools_service = None

    except Exception as e:
        logging.warning(f"MCP initialization failed: {e}")
        mcp_server = None
        mcp_resources_service = None
        mcp_tools_service = None

    logging.info("MCP HTTP Bridge initialized")


@app.on_event("shutdown")
async def shutdown_event():
    """Cleanup resources"""
    if llm_integration:
        await llm_integration.close()


# Health check endpoint
@app.get("/health", response_model=HealthResponse)
async def health_check():
    """Check if the server is healthy and return available models"""
    try:
        if not llm_integration:
            raise HTTPException(status_code=503, detail="LLM integration not initialized")

        # Try to get Ollama models
        try:
            ollama_models = await llm_integration.get_ollama_models()
        except Exception:
            ollama_models = []

        return HealthResponse(
            status="healthy",
            message="MCP HTTP Bridge is running",
            available_models=ollama_models
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Health check failed: {str(e)}")


# Echo endpoint (for testing)
@app.post("/api/echo", response_model=EchoResponse)
async def echo(request: EchoRequest):
    """Simple echo endpoint for testing connectivity"""
    return EchoResponse(text=request.text)


# Chat endpoint
@app.post("/api/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    """Send a message to the LLM and get a response"""
    try:
        if not llm_integration:
            raise HTTPException(status_code=503, detail="LLM integration not initialized")

        response = await llm_integration.chat(
            message=request.message,
            model=request.model,
            temperature=request.temperature,
            max_tokens=request.max_tokens
        )

        # Get provider info
        provider = llm_integration._get_provider_for_model(request.model)

        return ChatResponse(
            response=response,
            model=request.model,
            provider=provider,
            metadata={
                "temperature": request.temperature,
                "max_tokens": request.max_tokens
            }
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Chat failed: {str(e)}")


# Memory-aware conversation chat endpoint
@app.post("/api/conversation/chat", response_model=ChatResponse)
async def conversation_chat(request: ConversationChatRequest):
    """Chat with automatic memory storage and context retrieval"""
    try:
        if not llm_integration or not chat_memory:
            raise HTTPException(status_code=503, detail="Services not initialized")

        # 1. Store user message
        await chat_memory.store_memory(
            conversation_id=request.conversation_id,
            content=request.message,
            role="user"
        )

        # 2. Get conversation context if requested
        context_messages = []
        if request.include_context:
            memories = await chat_memory.get_memory(
                conversation_id=request.conversation_id,
                limit=request.context_limit
            )
            # Format as conversation history
            context_messages = [
                f"{memory.role}: {memory.content}"
                for memory in reversed(memories[:-1])  # Exclude the message we just stored
            ]

        # 3. Build context-aware prompt
        if context_messages:
            context_prompt = "Previous conversation:\n" + "\n".join(context_messages)
            full_message = f"{context_prompt}\n\nCurrent message: {request.message}"
        else:
            full_message = request.message

        # 4. Get LLM response
        response = await llm_integration.chat(
            message=full_message,
            model=request.model,
            temperature=request.temperature,
            max_tokens=request.max_tokens
        )

        # 5. Store assistant response
        await chat_memory.store_memory(
            conversation_id=request.conversation_id,
            content=response,
            role="assistant"
        )

        # Get provider safely
        try:
            provider = llm_integration._get_provider_for_model(request.model)
        except Exception:
            provider = "unknown"

        return ChatResponse(
            response=response,
            model=request.model,
            provider=provider,
            metadata={
                "conversation_id": request.conversation_id,
                "context_included": request.include_context,
                "context_messages_count": len(context_messages),
                "temperature": request.temperature,
                "max_tokens": request.max_tokens
            }
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Conversation chat failed: {str(e)}")


# Memory-aware conversation chat streaming endpoint
@app.post("/api/conversation/chat/stream")
async def conversation_chat_stream(request: ConversationChatRequest):
    """Stream chat responses with automatic memory storage and context retrieval"""
    try:
        if not llm_integration or not chat_memory:
            raise HTTPException(status_code=503, detail="Services not initialized")

        # 1. Store user message
        await chat_memory.store_memory(
            conversation_id=request.conversation_id,
            content=request.message,
            role="user"
        )

        # 2. Get conversation context if requested
        context_messages = []
        if request.include_context:
            memories = await chat_memory.get_memory(
                conversation_id=request.conversation_id,
                limit=request.context_limit
            )
            # Format as conversation history
            context_messages = [
                f"{memory.role}: {memory.content}"
                for memory in reversed(memories[:-1])  # Exclude the message we just stored
            ]

        # 3. Build context-aware prompt
        if context_messages:
            context_prompt = "Previous conversation:\n" + "\n".join(context_messages)
            full_message = f"{context_prompt}\n\nCurrent message: {request.message}"
        else:
            full_message = request.message

        # 4. Stream LLM response and collect it for storage
        collected_response = ""

        async def generate_stream():
            nonlocal collected_response
            try:
                async for chunk in llm_integration.chat_stream(
                    message=full_message,
                    model=request.model,
                    temperature=request.temperature,
                    max_tokens=request.max_tokens
                ):
                    # Collect the response for storage
                    if isinstance(chunk, dict) and 'content' in chunk:
                        collected_response += chunk['content']
                    elif isinstance(chunk, str):
                        collected_response += chunk

                    # Format as Server-Sent Events (SSE)
                    sse_data = json.dumps(chunk)
                    yield f"data: {sse_data}\n\n"

                # Send metadata with final done event
                try:
                    provider = llm_integration._get_provider_for_model(request.model)
                except Exception:
                    provider = "unknown"

                final_data = {
                    "done": True,
                    "metadata": {
                        "conversation_id": request.conversation_id,
                        "context_included": request.include_context,
                        "context_messages_count": len(context_messages),
                        "temperature": request.temperature,
                        "max_tokens": request.max_tokens,
                        "model": request.model,
                        "provider": provider
                    }
                }
                yield f"data: {json.dumps(final_data)}\n\n"

                # 5. Store assistant response in memory
                if collected_response:
                    await chat_memory.store_memory(
                        conversation_id=request.conversation_id,
                        content=collected_response,
                        role="assistant"
                    )
            except Exception as e:
                error_data = json.dumps({"error": str(e), "done": True})
                yield f"data: {error_data}\n\n"

        return StreamingResponse(
            generate_stream(),
            media_type="text/plain",
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
                "Access-Control-Allow-Origin": "*",
                "Access-Control-Allow-Headers": "*",
            }
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Streaming conversation chat failed: {str(e)}")


# Simple test streaming endpoint
@app.get("/api/test/stream")
async def test_stream():
    """Simple test streaming endpoint"""
    async def generate():
        for i in range(5):
            yield f"data: {{\"chunk\": {i}, \"content\": \"Test chunk {i}\"}}\n\n"
            await asyncio.sleep(0.5)
        yield "data: {\"done\": true}\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/plain",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        }
    )


# Chat streaming endpoint
@app.post("/api/chat/stream")
async def chat_stream(request: ChatRequest):
    """Stream chat responses from the LLM"""
    try:
        if not llm_integration:
            raise HTTPException(status_code=503, detail="LLM integration not initialized")

        async def generate_stream():
            try:
                async for chunk in llm_integration.chat_stream(
                    message=request.message,
                    model=request.model,
                    temperature=request.temperature,
                    max_tokens=request.max_tokens
                ):
                    # Format as Server-Sent Events (SSE)
                    sse_data = json.dumps(chunk)
                    yield f"data: {sse_data}\n\n"

                # Send final done event
                yield "data: {\"done\": true}\n\n"
            except Exception as e:
                error_data = json.dumps({"error": str(e), "done": True})
                yield f"data: {error_data}\n\n"

        return StreamingResponse(
            generate_stream(),
            media_type="text/plain",
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
                "Access-Control-Allow-Origin": "*",
                "Access-Control-Allow-Headers": "*",
            }
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Streaming chat failed: {str(e)}")


# Memory endpoints
@app.post("/api/memory/store", response_model=StoreMemoryResponse)
async def store_memory(request: StoreMemoryRequest):
    """Store a memory in the chat memory system"""
    try:
        if not chat_memory:
            raise HTTPException(status_code=503, detail="Chat memory not initialized")

        memory_id = await chat_memory.store_memory(
            conversation_id=request.conversation_id,
            content=request.content,
            metadata=request.metadata or {},
            role=request.role,
            importance=request.importance
        )

        return StoreMemoryResponse(memory_id=memory_id, success=True)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Memory storage failed: {str(e)}")


@app.post("/api/memory/get", response_model=GetMemoryResponse)
async def get_memory(request: GetMemoryRequest):
    """Retrieve memories from the chat memory system"""
    try:
        if not chat_memory:
            raise HTTPException(status_code=503, detail="Chat memory not initialized")

        memories = await chat_memory.get_memory(
            conversation_id=request.conversation_id,
            limit=request.limit,
            min_importance=request.min_importance
        )

        # Convert to response format
        memory_entries = [
            MemoryEntry(
                id=memory.id or "",
                conversation_id=memory.conversation_id,
                content=memory.content,
                metadata=memory.metadata,
                timestamp=memory.timestamp.isoformat(),
                role=memory.role,
                importance=memory.importance
            )
            for memory in memories
        ]

        return GetMemoryResponse(memories=memory_entries, count=len(memory_entries))
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Memory retrieval failed: {str(e)}")


# Get MCP capabilities/tools
@app.get("/api/capabilities")
async def get_capabilities():
    """Get all available MCP tools and capabilities"""
    try:
        capabilities = {
            "tools": [
                {
                    "name": "echo",
                    "description": "Echo back the input text",
                    "input_schema": {
                        "type": "object",
                        "properties": {
                            "text": {"type": "string", "description": "Text to echo back"}
                        },
                        "required": ["text"]
                    }
                },
                {
                    "name": "llm_chat",
                    "description": "Send a message to the integrated LLM",
                    "input_schema": {
                        "type": "object",
                        "properties": {
                            "message": {"type": "string", "description": "Message to send to the LLM"},
                            "model": {"type": "string", "description": "LLM model to use", "default": "mistral:latest"},
                            "temperature": {"type": "number", "description": "Temperature for response creativity", "default": 0.7},
                            "max_tokens": {"type": "number", "description": "Maximum tokens in response", "default": 1000}
                        },
                        "required": ["message"]
                    }
                },
                {
                    "name": "store_memory",
                    "description": "Store information in chat memory",
                    "input_schema": {
                        "type": "object",
                        "properties": {
                            "conversation_id": {"type": "string", "description": "ID of the conversation"},
                            "content": {"type": "string", "description": "Content to store"},
                            "metadata": {"type": "object", "description": "Optional metadata", "default": {}},
                            "role": {"type": "string", "description": "Role of the message sender", "default": "user"},
                            "importance": {"type": "number", "description": "Importance score", "default": 1.0}
                        },
                        "required": ["conversation_id", "content"]
                    }
                },
                {
                    "name": "get_memory",
                    "description": "Retrieve chat memory for a conversation",
                    "input_schema": {
                        "type": "object",
                        "properties": {
                            "conversation_id": {"type": "string", "description": "ID of the conversation"},
                            "limit": {"type": "number", "description": "Maximum number of memories to retrieve"},
                            "min_importance": {"type": "number", "description": "Minimum importance score", "default": 0.0}
                        },
                        "required": ["conversation_id"]
                    }
                },
                {
                    "name": "search_memory",
                    "description": "Search through stored memories",
                    "input_schema": {
                        "type": "object",
                        "properties": {
                            "query": {"type": "string", "description": "Search query"},
                            "conversation_id": {"type": "string", "description": "Limit search to specific conversation"},
                            "limit": {"type": "number", "description": "Maximum results", "default": 10}
                        },
                        "required": ["query"]
                    }
                },
                {
                    "name": "conversation_chat",
                    "description": "Chat with automatic memory storage and context retrieval",
                    "input_schema": {
                        "type": "object",
                        "properties": {
                            "message": {"type": "string", "description": "Message to send"},
                            "conversation_id": {"type": "string", "description": "ID of the conversation"},
                            "model": {"type": "string", "description": "LLM model to use", "default": "mistral:latest"},
                            "temperature": {"type": "number", "description": "Temperature for response creativity", "default": 0.7},
                            "max_tokens": {"type": "number", "description": "Maximum tokens in response", "default": 1000},
                            "include_context": {"type": "boolean", "description": "Include conversation history in prompt", "default": True},
                            "context_limit": {"type": "number", "description": "Maximum number of previous messages to include", "default": 10}
                        },
                        "required": ["message", "conversation_id"]
                    }
                }
            ],
            "server_info": {
                "name": "MCP Server HTTP Bridge",
                "version": "1.0.0",
                "description": "HTTP API bridge for Model Context Protocol server with Ollama integration"
            },
            "features": {
                "chat": True,
                "memory": True,
                "search": True,
                "models": True,
                "ollama_integration": True,
                "conversation_chat": True
            }
        }

        # Add available models to capabilities
        if llm_integration:
            try:
                all_models = llm_integration.get_available_models()
                ollama_models = await llm_integration.get_ollama_models()
                if ollama_models:
                    all_models["ollama"] = ollama_models
                capabilities["available_models"] = all_models
            except Exception:
                pass

        return capabilities
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Capabilities retrieval failed: {str(e)}")


# Get dynamic MCP capabilities including database tools and resources
@app.get("/api/capabilities/dynamic")
async def get_dynamic_capabilities():
    """Get all available capabilities including dynamic database tools and resources"""
    try:
        # Start with base HTTP bridge tools
        tools = [
            {
                "name": "echo",
                "description": "Echo back the input text",
                "input_schema": {
                    "type": "object",
                    "properties": {
                        "text": {"type": "string", "description": "Text to echo back"}
                    },
                    "required": ["text"]
                }
            },
            {
                "name": "llm_chat",
                "description": "Send a message to the integrated LLM",
                "input_schema": {
                    "type": "object",
                    "properties": {
                        "message": {"type": "string", "description": "Message to send to the LLM"},
                        "model": {"type": "string", "description": "LLM model to use", "default": "mistral:latest"},
                        "temperature": {"type": "number", "description": "Temperature for response creativity", "default": 0.7},
                        "max_tokens": {"type": "number", "description": "Maximum tokens in response", "default": 1000}
                    },
                    "required": ["message"]
                }
            }
        ]

        # Add dynamic database tools from MCP Tools Service
        if mcp_tools_service:
            try:
                tool_info = mcp_tools_service.get_available_tools()
                database_tools = tool_info.get("tools", [])
                for tool in database_tools:
                    tools.append({
                        "name": tool["name"],
                        "description": tool["description"],
                        "input_schema": tool["parameters"],
                        "category": tool.get("category", "database")
                    })
                logging.info(f"Added {len(database_tools)} database tools to capabilities")
            except Exception as e:
                logging.warning(f"Could not load database tools: {e}")

        # Get dynamic resources from MCP Resources Service
        resources = []
        if mcp_resources_service:
            try:
                available_resources = mcp_resources_service.list_resources()
                for resource in available_resources:
                    resources.append({
                        "uri": resource["uri"],
                        "name": resource["name"],
                        "description": resource["description"],
                        "mimeType": resource["mimeType"]
                    })
                logging.info(f"Added {len(resources)} database resources to capabilities")
            except Exception as e:
                logging.warning(f"Could not load database resources: {e}")

        capabilities = {
            "tools": tools,
            "resources": resources,
            "server_info": {
                "name": "MCP Server HTTP Bridge",
                "version": "1.0.0",
                "description": "HTTP API bridge for Model Context Protocol server with database integration"
            },
            "features": {
                "chat": True,
                "memory": True,
                "search": True,
                "models": True,
                "ollama_integration": True,
                "conversation_chat": True,
                "database_tools": mcp_tools_service is not None,
                "database_resources": mcp_resources_service is not None
            }
        }

        # Add available models to capabilities
        try:
            if llm_integration:
                all_models = await llm_integration.get_available_models()
                try:
                    import aiohttp
                    async with aiohttp.ClientSession() as session:
                        async with session.get("http://localhost:11434/api/tags") as response:
                            if response.status == 200:
                                data = await response.json()
                                ollama_models = [model["name"] for model in data.get("models", [])]
                                all_models["ollama"] = ollama_models
                except Exception:
                    pass
                capabilities["available_models"] = all_models
        except Exception:
            pass

        return capabilities
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Dynamic capabilities retrieval failed: {str(e)}")


# Get available models
@app.get("/api/models")
async def get_models():
    """Get all available models from all providers"""
    try:
        if not llm_integration:
            raise HTTPException(status_code=503, detail="LLM integration not initialized")

        all_models = llm_integration.get_available_models()

        # Also get current Ollama models
        try:
            ollama_models = await llm_integration.get_ollama_models()
            if ollama_models:
                all_models["ollama"] = ollama_models
        except Exception:
            pass

        return {
            "models": all_models,
            "default_model": "mistral:latest",
            "recommended_provider": "ollama"
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Model retrieval failed: {str(e)}")


# Search memories
@app.get("/api/memory/search")
async def search_memory(
    query: str,
    conversation_id: Optional[str] = None,
    limit: int = 10
):
    """Search through stored memories"""
    try:
        if not chat_memory:
            raise HTTPException(status_code=503, detail="Chat memory not initialized")

        memories = await chat_memory.search_memory(
            query=query,
            conversation_id=conversation_id,
            limit=limit
        )

        # Convert to response format
        memory_entries = [
            MemoryEntry(
                id=memory.id or "",
                conversation_id=memory.conversation_id,
                content=memory.content,
                metadata=memory.metadata,
                timestamp=memory.timestamp.isoformat(),
                role=memory.role,
                importance=memory.importance
            )
            for memory in memories
        ]

        return GetMemoryResponse(memories=memory_entries, count=len(memory_entries))
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Memory search failed: {str(e)}")


# Import and add database endpoints
try:
    from database_endpoints import add_database_endpoints

    # Add database endpoints if MCP server is available
    if mcp_server:
        add_database_endpoints(app, mcp_server)
except ImportError as e:
    logging.warning(f"Database endpoints not available: {e}")


# MCP Resource endpoints
@app.get("/api/mcp/resources")
async def list_mcp_resources():
    """List all available MCP resources"""
    try:
        if not mcp_resources_service:
            raise HTTPException(status_code=503, detail="MCP Resources service not initialized")

        resources = mcp_resources_service.list_resources()
        return {"resources": resources, "count": len(resources)}
    except Exception as e:
        logging.error(f"Error listing MCP resources: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/api/mcp/resources/read")
async def read_mcp_resource(uri: str):
    """Read a specific MCP resource by URI"""
    try:
        if not mcp_resources_service:
            raise HTTPException(status_code=503, detail="MCP Resources service not initialized")

        resource_data = await mcp_resources_service.read_resource(uri)
        return resource_data
    except Exception as e:
        logging.error(f"Error reading MCP resource {uri}: {e}")
        raise HTTPException(status_code=500, detail=str(e))


# Compatibility endpoint for frontend
@app.get("/api/resources")
async def read_resource_compat(uri: str):
    """Compatibility endpoint - Read a specific MCP resource by URI"""
    try:
        if not mcp_resources_service:
            raise HTTPException(status_code=503, detail="MCP Resources service not initialized")

        resource_data = await mcp_resources_service.read_resource(uri)
        return resource_data
    except Exception as e:
        logging.error(f"Error reading resource {uri}: {e}")
        raise HTTPException(status_code=500, detail=str(e))


# MCP Tool endpoints
@app.get("/api/mcp/tools")
async def list_mcp_tools():
    """List all available MCP tools"""
    try:
        if not mcp_tools_service:
            raise HTTPException(status_code=503, detail="MCP Tools service not initialized")

        tools_info = mcp_tools_service.get_available_tools()
        return tools_info
    except Exception as e:
        logging.error(f"Error listing MCP tools: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/api/mcp/tools/{tool_name}")
async def call_mcp_tool(tool_name: str, arguments: dict):
    """Execute an MCP tool with given arguments"""
    try:
        if not mcp_tools_service:
            raise HTTPException(status_code=503, detail="MCP Tools service not initialized")

        result = await mcp_tools_service.handle_tool_call(tool_name, arguments)
        return result
    except Exception as e:
        logging.error(f"Error calling MCP tool {tool_name}: {e}")
        raise HTTPException(status_code=500, detail=str(e))


# Add the original answer API endpoint
@app.post("/api/answer")
async def answer_question(request: dict):
    """Answer natural language questions using your original smart search"""
    try:
        if not request.get('question'):
            raise HTTPException(status_code=400, detail="Missing 'question' in request body")

        question = request['question']
        db_key = request.get('db', 'db3')  # Default to your local postgres

        # Use your existing router
        from llmDatabaseRouter import DatabaseRouter
        from config import Config

        # Get connection string from your config
        connection_string = Config.SQLALCHEMY_BINDS[db_key]

        # Initialize router with your config
        router = DatabaseRouter(connection_string)

        # Use the answer_question method
        result = await router.answer_question(question)

        return {
            "success": True,
            "database": db_key,
            **result
        }
    except Exception as e:
        logging.error(f"Answer API failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    print("Starting MCP HTTP Bridge...")
    print("API will be available at: http://localhost:8000")
    print("Interactive docs at: http://localhost:8000/docs")
    print("Health check: http://localhost:8000/health")

    uvicorn.run(
        "http_bridge:app",
        host="0.0.0.0",
        port=8000,
        reload=True,
        log_level="info"
    )

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/MelaLitho/MCPServer'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.