Skip to main content
Glama

MCP Server with LLM Integration

by MelaLitho
server.py (24.7 kB)
""" MCP Server Implementation Main entry point for the Model Context Protocol server with tool registration and request handling capabilities. """ import asyncio import json import logging import sys from typing import Any, Dict, List, Optional, Sequence import structlog from mcp import ClientSession, StdioServerParameters from mcp.server import Server from mcp.server.models import InitializationOptions from mcp.types import ( CallToolRequest, CallToolResult, ListToolsRequest, TextContent, Tool, ) from pydantic import BaseModel from chatmemorysystem import ChatMemorySystem from llmintegrationsystem import LLMIntegrationSystem from postgres_integration import PostgreSQLIntegration from vector_search import VectorSearchEngine logger = structlog.get_logger(__name__) class MCPServer: def __init__(self): self.server = Server("mcp-server") self.llm_integration = LLMIntegrationSystem() self.chat_memory = ChatMemorySystem() self.postgres_integration = None # Will be initialized with database engine self.vector_search = None # Will be initialized with database engine self._setup_handlers() def initialize_database(self, engine, db_key: str = "default"): """Initialize database integrations with a database engine""" self.postgres_integration = PostgreSQLIntegration(engine, db_key) self.vector_search = VectorSearchEngine(engine, db_key) def initialize_vector_search(self, engine, db_key: str = "default"): """Initialize vector search with a database engine (legacy method)""" if not self.postgres_integration: self.postgres_integration = PostgreSQLIntegration(engine, db_key) self.vector_search = VectorSearchEngine(engine, db_key) def _setup_handlers(self): @self.server.list_tools() async def handle_list_tools() -> List[Tool]: return [ Tool( name="echo", description="Echo back the input text", inputSchema={ "type": "object", "properties": { "text": { "type": "string", "description": "Text to echo back", } }, "required": ["text"], }, ), Tool( name="get_memory", description="Retrieve 
chat memory for a conversation", inputSchema={ "type": "object", "properties": { "conversation_id": { "type": "string", "description": "ID of the conversation", } }, "required": ["conversation_id"], }, ), Tool( name="store_memory", description="Store information in chat memory", inputSchema={ "type": "object", "properties": { "conversation_id": { "type": "string", "description": "ID of the conversation", }, "content": { "type": "string", "description": "Content to store", }, "metadata": { "type": "object", "description": "Optional metadata", "default": {}, }, }, "required": ["conversation_id", "content"], }, ), Tool( name="llm_chat", description="Send a message to the integrated LLM", inputSchema={ "type": "object", "properties": { "message": { "type": "string", "description": "Message to send to the LLM", }, "model": { "type": "string", "description": "LLM model to use", "default": "mistral:latest", }, }, "required": ["message"], }, ), Tool( name="semantic_search", description="Search document embeddings using vector similarity", inputSchema={ "type": "object", "properties": { "question": { "type": "string", "description": "Search query/question", }, "table_filter": { "type": "string", "description": "Optional table name to filter results", }, "fk_filter": { "type": "object", "description": "Optional foreign key filter as JSON object", }, "k": { "type": "integer", "description": "Number of results to return", "default": 10, }, }, "required": ["question"], }, ), Tool( name="safe_sql", description="Execute SQL with safety guardrails - only allows SELECT statements", inputSchema={ "type": "object", "properties": { "sql": { "type": "string", "description": "SQL query to execute (SELECT only)", }, "limit_safe": { "type": "boolean", "description": "Auto-inject LIMIT clause for safety", "default": True, }, }, "required": ["sql"], }, ), Tool( name="get_database_schema", description="Get comprehensive schema information for the database", inputSchema={ "type": "object", 
"properties": {}, }, ), Tool( name="get_table_schema", description="Get detailed schema information for a specific table", inputSchema={ "type": "object", "properties": { "table_name": { "type": "string", "description": "Name of the table to get schema for", }, }, "required": ["table_name"], }, ), Tool( name="test_db_connection", description="Test the database connection", inputSchema={ "type": "object", "properties": {}, }, ), Tool( name="get_database_stats", description="Get comprehensive database statistics and performance info", inputSchema={ "type": "object", "properties": {}, }, ), Tool( name="get_table_size", description="Get detailed size information for a specific table", inputSchema={ "type": "object", "properties": { "table_name": { "type": "string", "description": "Name of the table to get size information for", }, }, "required": ["table_name"], }, ), Tool( name="explain_query", description="Execute EXPLAIN ANALYZE for a SELECT query to analyze performance", inputSchema={ "type": "object", "properties": { "sql": { "type": "string", "description": "SELECT query to analyze (SELECT only)", }, }, "required": ["sql"], }, ), Tool( name="generate_sql", description="Generate SQL query from natural language question", inputSchema={ "type": "object", "properties": { "question": { "type": "string", "description": "Natural language question to convert to SQL", }, "schema_context": { "type": "object", "description": "Optional schema context for better SQL generation", "default": {}, }, }, "required": ["question"], }, ), ] @self.server.call_tool() async def handle_call_tool( name: str, arguments: Optional[Dict[str, Any]] ) -> CallToolResult: try: if name == "echo": text = arguments.get("text", "") if arguments else "" return CallToolResult(content=[TextContent(type="text", text=text)]) elif name == "get_memory": conversation_id = arguments.get("conversation_id") if arguments else None if not conversation_id: return CallToolResult( content=[TextContent(type="text", 
text="Missing conversation_id")] ) memory = await self.chat_memory.get_memory(conversation_id) return CallToolResult( content=[TextContent(type="text", text=json.dumps(memory, indent=2))] ) elif name == "store_memory": if not arguments: return CallToolResult( content=[TextContent(type="text", text="Missing arguments")] ) conversation_id = arguments.get("conversation_id") content = arguments.get("content") metadata = arguments.get("metadata", {}) if not conversation_id or not content: return CallToolResult( content=[TextContent(type="text", text="Missing required arguments")] ) await self.chat_memory.store_memory(conversation_id, content, metadata) return CallToolResult( content=[TextContent(type="text", text="Memory stored successfully")] ) elif name == "llm_chat": if not arguments: return CallToolResult( content=[TextContent(type="text", text="Missing arguments")] ) message = arguments.get("message") model = arguments.get("model", "mistral:latest") if not message: return CallToolResult( content=[TextContent(type="text", text="Missing message")] ) response = await self.llm_integration.chat(message, model) return CallToolResult( content=[TextContent(type="text", text=response)] ) elif name == "semantic_search": if not arguments: return CallToolResult( content=[TextContent(type="text", text="Missing arguments")] ) if not self.vector_search: return CallToolResult( content=[TextContent(type="text", text="Vector search not initialized. 
Please configure database connection first.")] ) question = arguments.get("question") if not question: return CallToolResult( content=[TextContent(type="text", text="Missing question parameter")] ) table_filter = arguments.get("table_filter") fk_filter = arguments.get("fk_filter") k = arguments.get("k", 10) results = self.vector_search.semantic_search(question, table_filter, fk_filter, k) response_data = { "success": True, "question": question, "results": results, "count": len(results) } return CallToolResult( content=[TextContent(type="text", text=json.dumps(response_data, indent=2))] ) elif name == "safe_sql": if not arguments: return CallToolResult( content=[TextContent(type="text", text="Missing arguments")] ) if not self.postgres_integration: return CallToolResult( content=[TextContent(type="text", text="PostgreSQL integration not initialized. Please configure database connection first.")] ) sql = arguments.get("sql") if not sql: return CallToolResult( content=[TextContent(type="text", text="Missing sql parameter")] ) limit_safe = arguments.get("limit_safe", True) result = self.postgres_integration.safe_run_sql(sql, limit_safe) return CallToolResult( content=[TextContent(type="text", text=json.dumps(result, indent=2))] ) elif name == "get_database_schema": if not self.postgres_integration: return CallToolResult( content=[TextContent(type="text", text="PostgreSQL integration not initialized. Please configure database connection first.")] ) result = self.postgres_integration.get_database_schema() return CallToolResult( content=[TextContent(type="text", text=json.dumps(result, indent=2))] ) elif name == "get_table_schema": if not arguments: return CallToolResult( content=[TextContent(type="text", text="Missing arguments")] ) if not self.postgres_integration: return CallToolResult( content=[TextContent(type="text", text="PostgreSQL integration not initialized. 
Please configure database connection first.")] ) table_name = arguments.get("table_name") if not table_name: return CallToolResult( content=[TextContent(type="text", text="Missing table_name parameter")] ) result = self.postgres_integration.get_table_schema(table_name) return CallToolResult( content=[TextContent(type="text", text=json.dumps(result, indent=2))] ) elif name == "test_db_connection": if not self.postgres_integration: return CallToolResult( content=[TextContent(type="text", text="PostgreSQL integration not initialized. Please configure database connection first.")] ) result = self.postgres_integration.test_connection() return CallToolResult( content=[TextContent(type="text", text=json.dumps(result, indent=2))] ) elif name == "get_database_stats": if not self.postgres_integration: return CallToolResult( content=[TextContent(type="text", text="PostgreSQL integration not initialized. Please configure database connection first.")] ) result = self.postgres_integration.get_database_stats() return CallToolResult( content=[TextContent(type="text", text=json.dumps(result, indent=2))] ) elif name == "get_table_size": if not arguments: return CallToolResult( content=[TextContent(type="text", text="Missing arguments")] ) if not self.postgres_integration: return CallToolResult( content=[TextContent(type="text", text="PostgreSQL integration not initialized. 
Please configure database connection first.")] ) table_name = arguments.get("table_name") if not table_name: return CallToolResult( content=[TextContent(type="text", text="Missing table_name parameter")] ) result = self.postgres_integration.get_table_size(table_name) return CallToolResult( content=[TextContent(type="text", text=json.dumps(result, indent=2))] ) elif name == "explain_query": if not arguments: return CallToolResult( content=[TextContent(type="text", text="Missing arguments")] ) if not self.postgres_integration: return CallToolResult( content=[TextContent(type="text", text="PostgreSQL integration not initialized. Please configure database connection first.")] ) sql = arguments.get("sql") if not sql: return CallToolResult( content=[TextContent(type="text", text="Missing sql parameter")] ) result = self.postgres_integration.execute_explain(sql) return CallToolResult( content=[TextContent(type="text", text=json.dumps(result, indent=2))] ) elif name == "generate_sql": if not arguments: return CallToolResult( content=[TextContent(type="text", text="Missing arguments")] ) if not self.postgres_integration: return CallToolResult( content=[TextContent(type="text", text="PostgreSQL integration not initialized. 
Please configure database connection first.")] ) question = arguments.get("question") if not question: return CallToolResult( content=[TextContent(type="text", text="Missing question parameter")] ) schema_context = arguments.get("schema_context", {}) generated_sql = self.postgres_integration.generate_sql(question, schema_context) result = { "success": True, "question": question, "generated_sql": generated_sql, "note": "Use safe_sql tool to execute this query" } return CallToolResult( content=[TextContent(type="text", text=json.dumps(result, indent=2))] ) else: return CallToolResult( content=[TextContent(type="text", text=f"Unknown tool: {name}")] ) except Exception as e: logger.error("Tool execution failed", tool=name, error=str(e)) return CallToolResult( content=[TextContent(type="text", text=f"Error: {str(e)}")] ) async def run(self, transport_type="stdio", host="localhost", port=3000): """Run the MCP server with specified transport""" if transport_type == "stdio": async with self.server.stdio_server() as streams: await self.server.run( streams[0], streams[1], InitializationOptions( server_name="mcp-server", server_version="0.1.0", capabilities=self.server.get_capabilities(), ), ) else: # For future HTTP/WebSocket transport if needed logger.info(f"Transport type {transport_type} not yet implemented") logger.info("MCP servers typically use stdio transport") async def main(): import argparse parser = argparse.ArgumentParser(description="MCP Server") parser.add_argument("--transport", default="stdio", choices=["stdio"], help="Transport type (currently only stdio supported)") parser.add_argument("--host", default="localhost", help="Host to bind to (for future HTTP transport)") parser.add_argument("--port", type=int, default=3000, help="Port to bind to (for future HTTP transport)") args = parser.parse_args() logging.basicConfig(level=logging.INFO) structlog.configure( processors=[ structlog.stdlib.filter_by_level, structlog.stdlib.add_logger_name, 
structlog.stdlib.add_log_level, structlog.stdlib.PositionalArgumentsFormatter(), structlog.processors.TimeStamper(fmt="iso"), structlog.processors.StackInfoRenderer(), structlog.processors.format_exc_info, structlog.processors.UnicodeDecoder(), structlog.processors.JSONRenderer(), ], context_class=dict, logger_factory=structlog.stdlib.LoggerFactory(), wrapper_class=structlog.stdlib.BoundLogger, cache_logger_on_first_use=True, ) server = MCPServer() await server.run(transport_type=args.transport, host=args.host, port=args.port) if __name__ == "__main__": asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/MelaLitho/MCPServer'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.